1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
12 
13 namespace ROCKSDB_NAMESPACE {
14 
15 static int cfilter_count = 0;
16 static int cfilter_skips = 0;
17 
18 // This is a static filter used for filtering
19 // kvs during the compaction process.
20 static std::string NEW_VALUE = "NewValue";
21 
22 class DBTestCompactionFilter : public DBTestBase {
23  public:
DBTestCompactionFilter()24   DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {}
25 };
26 
27 // Param variant of DBTestBase::ChangeCompactOptions
28 class DBTestCompactionFilterWithCompactParam
29     : public DBTestCompactionFilter,
30       public ::testing::WithParamInterface<DBTestBase::OptionConfig> {
31  public:
DBTestCompactionFilterWithCompactParam()32   DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() {
33     option_config_ = GetParam();
34     Destroy(last_options_);
35     auto options = CurrentOptions();
36     if (option_config_ == kDefault || option_config_ == kUniversalCompaction ||
37         option_config_ == kUniversalCompactionMultiLevel) {
38       options.create_if_missing = true;
39     }
40     if (option_config_ == kLevelSubcompactions ||
41         option_config_ == kUniversalSubcompactions) {
42       assert(options.max_subcompactions > 1);
43     }
44     TryReopen(options);
45   }
46 };
47 
48 #ifndef ROCKSDB_VALGRIND_RUN
49 INSTANTIATE_TEST_CASE_P(
50     CompactionFilterWithOption, DBTestCompactionFilterWithCompactParam,
51     ::testing::Values(DBTestBase::OptionConfig::kDefault,
52                       DBTestBase::OptionConfig::kUniversalCompaction,
53                       DBTestBase::OptionConfig::kUniversalCompactionMultiLevel,
54                       DBTestBase::OptionConfig::kLevelSubcompactions,
55                       DBTestBase::OptionConfig::kUniversalSubcompactions));
56 #else
57 // Run fewer cases in valgrind
58 INSTANTIATE_TEST_CASE_P(CompactionFilterWithOption,
59                         DBTestCompactionFilterWithCompactParam,
60                         ::testing::Values(DBTestBase::OptionConfig::kDefault));
61 #endif  // ROCKSDB_VALGRIND_RUN
62 
63 class KeepFilter : public CompactionFilter {
64  public:
Filter(int,const Slice &,const Slice &,std::string *,bool *) const65   bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
66               std::string* /*new_value*/,
67               bool* /*value_changed*/) const override {
68     cfilter_count++;
69     return false;
70   }
71 
Name() const72   const char* Name() const override { return "KeepFilter"; }
73 };
74 
75 class DeleteFilter : public CompactionFilter {
76  public:
Filter(int,const Slice &,const Slice &,std::string *,bool *) const77   bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
78               std::string* /*new_value*/,
79               bool* /*value_changed*/) const override {
80     cfilter_count++;
81     return true;
82   }
83 
Name() const84   const char* Name() const override { return "DeleteFilter"; }
85 };
86 
87 class DeleteISFilter : public CompactionFilter {
88  public:
Filter(int,const Slice & key,const Slice &,std::string *,bool *) const89   bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
90               std::string* /*new_value*/,
91               bool* /*value_changed*/) const override {
92     cfilter_count++;
93     int i = std::stoi(key.ToString());
94     if (i > 5 && i <= 105) {
95       return true;
96     }
97     return false;
98   }
99 
IgnoreSnapshots() const100   bool IgnoreSnapshots() const override { return true; }
101 
Name() const102   const char* Name() const override { return "DeleteFilter"; }
103 };
104 
105 // Skip x if floor(x/10) is even, use range skips. Requires that keys are
106 // zero-padded to length 10.
107 class SkipEvenFilter : public CompactionFilter {
108  public:
FilterV2(int,const Slice & key,ValueType,const Slice &,std::string *,std::string * skip_until) const109   Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/,
110                     const Slice& /*existing_value*/, std::string* /*new_value*/,
111                     std::string* skip_until) const override {
112     cfilter_count++;
113     int i = std::stoi(key.ToString());
114     if (i / 10 % 2 == 0) {
115       char key_str[100];
116       snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10);
117       *skip_until = key_str;
118       ++cfilter_skips;
119       return Decision::kRemoveAndSkipUntil;
120     }
121     return Decision::kKeep;
122   }
123 
IgnoreSnapshots() const124   bool IgnoreSnapshots() const override { return true; }
125 
Name() const126   const char* Name() const override { return "DeleteFilter"; }
127 };
128 
129 class DelayFilter : public CompactionFilter {
130  public:
DelayFilter(DBTestBase * d)131   explicit DelayFilter(DBTestBase* d) : db_test(d) {}
Filter(int,const Slice &,const Slice &,std::string *,bool *) const132   bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
133               std::string* /*new_value*/,
134               bool* /*value_changed*/) const override {
135     db_test->env_->addon_time_.fetch_add(1000);
136     return true;
137   }
138 
Name() const139   const char* Name() const override { return "DelayFilter"; }
140 
141  private:
142   DBTestBase* db_test;
143 };
144 
145 class ConditionalFilter : public CompactionFilter {
146  public:
ConditionalFilter(const std::string * filtered_value)147   explicit ConditionalFilter(const std::string* filtered_value)
148       : filtered_value_(filtered_value) {}
Filter(int,const Slice &,const Slice & value,std::string *,bool *) const149   bool Filter(int /*level*/, const Slice& /*key*/, const Slice& value,
150               std::string* /*new_value*/,
151               bool* /*value_changed*/) const override {
152     return value.ToString() == *filtered_value_;
153   }
154 
Name() const155   const char* Name() const override { return "ConditionalFilter"; }
156 
157  private:
158   const std::string* filtered_value_;
159 };
160 
161 class ChangeFilter : public CompactionFilter {
162  public:
ChangeFilter()163   explicit ChangeFilter() {}
164 
Filter(int,const Slice &,const Slice &,std::string * new_value,bool * value_changed) const165   bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
166               std::string* new_value, bool* value_changed) const override {
167     assert(new_value != nullptr);
168     *new_value = NEW_VALUE;
169     *value_changed = true;
170     return false;
171   }
172 
Name() const173   const char* Name() const override { return "ChangeFilter"; }
174 };
175 
176 class KeepFilterFactory : public CompactionFilterFactory {
177  public:
KeepFilterFactory(bool check_context=false,bool check_context_cf_id=false)178   explicit KeepFilterFactory(bool check_context = false,
179                              bool check_context_cf_id = false)
180       : check_context_(check_context),
181         check_context_cf_id_(check_context_cf_id),
182         compaction_filter_created_(false) {}
183 
CreateCompactionFilter(const CompactionFilter::Context & context)184   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
185       const CompactionFilter::Context& context) override {
186     if (check_context_) {
187       EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
188       EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
189     }
190     if (check_context_cf_id_) {
191       EXPECT_EQ(expect_cf_id_.load(), context.column_family_id);
192     }
193     compaction_filter_created_ = true;
194     return std::unique_ptr<CompactionFilter>(new KeepFilter());
195   }
196 
compaction_filter_created() const197   bool compaction_filter_created() const { return compaction_filter_created_; }
198 
Name() const199   const char* Name() const override { return "KeepFilterFactory"; }
200   bool check_context_;
201   bool check_context_cf_id_;
202   std::atomic_bool expect_full_compaction_;
203   std::atomic_bool expect_manual_compaction_;
204   std::atomic<uint32_t> expect_cf_id_;
205   bool compaction_filter_created_;
206 };
207 
208 class DeleteFilterFactory : public CompactionFilterFactory {
209  public:
CreateCompactionFilter(const CompactionFilter::Context & context)210   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
211       const CompactionFilter::Context& context) override {
212     if (context.is_manual_compaction) {
213       return std::unique_ptr<CompactionFilter>(new DeleteFilter());
214     } else {
215       return std::unique_ptr<CompactionFilter>(nullptr);
216     }
217   }
218 
Name() const219   const char* Name() const override { return "DeleteFilterFactory"; }
220 };
221 
222 // Delete Filter Factory which ignores snapshots
223 class DeleteISFilterFactory : public CompactionFilterFactory {
224  public:
CreateCompactionFilter(const CompactionFilter::Context & context)225   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
226       const CompactionFilter::Context& context) override {
227     if (context.is_manual_compaction) {
228       return std::unique_ptr<CompactionFilter>(new DeleteISFilter());
229     } else {
230       return std::unique_ptr<CompactionFilter>(nullptr);
231     }
232   }
233 
Name() const234   const char* Name() const override { return "DeleteFilterFactory"; }
235 };
236 
237 class SkipEvenFilterFactory : public CompactionFilterFactory {
238  public:
CreateCompactionFilter(const CompactionFilter::Context & context)239   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
240       const CompactionFilter::Context& context) override {
241     if (context.is_manual_compaction) {
242       return std::unique_ptr<CompactionFilter>(new SkipEvenFilter());
243     } else {
244       return std::unique_ptr<CompactionFilter>(nullptr);
245     }
246   }
247 
Name() const248   const char* Name() const override { return "SkipEvenFilterFactory"; }
249 };
250 
251 class DelayFilterFactory : public CompactionFilterFactory {
252  public:
DelayFilterFactory(DBTestBase * d)253   explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
CreateCompactionFilter(const CompactionFilter::Context &)254   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
255       const CompactionFilter::Context& /*context*/) override {
256     return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
257   }
258 
Name() const259   const char* Name() const override { return "DelayFilterFactory"; }
260 
261  private:
262   DBTestBase* db_test;
263 };
264 
265 class ConditionalFilterFactory : public CompactionFilterFactory {
266  public:
ConditionalFilterFactory(const Slice & filtered_value)267   explicit ConditionalFilterFactory(const Slice& filtered_value)
268       : filtered_value_(filtered_value.ToString()) {}
269 
CreateCompactionFilter(const CompactionFilter::Context &)270   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
271       const CompactionFilter::Context& /*context*/) override {
272     return std::unique_ptr<CompactionFilter>(
273         new ConditionalFilter(&filtered_value_));
274   }
275 
Name() const276   const char* Name() const override { return "ConditionalFilterFactory"; }
277 
278  private:
279   std::string filtered_value_;
280 };
281 
282 class ChangeFilterFactory : public CompactionFilterFactory {
283  public:
ChangeFilterFactory()284   explicit ChangeFilterFactory() {}
285 
CreateCompactionFilter(const CompactionFilter::Context &)286   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
287       const CompactionFilter::Context& /*context*/) override {
288     return std::unique_ptr<CompactionFilter>(new ChangeFilter());
289   }
290 
Name() const291   const char* Name() const override { return "ChangeFilterFactory"; }
292 };
293 
294 #ifndef ROCKSDB_LITE
TEST_F(DBTestCompactionFilter,CompactionFilter)295 TEST_F(DBTestCompactionFilter, CompactionFilter) {
296   Options options = CurrentOptions();
297   options.max_open_files = -1;
298   options.num_levels = 3;
299   options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
300   options = CurrentOptions(options);
301   CreateAndReopenWithCF({"pikachu"}, options);
302 
303   // Write 100K keys, these are written to a few files in L0.
304   const std::string value(10, 'x');
305   for (int i = 0; i < 100000; i++) {
306     char key[100];
307     snprintf(key, sizeof(key), "B%010d", i);
308     Put(1, key, value);
309   }
310   ASSERT_OK(Flush(1));
311 
312   // Push all files to the highest level L2. Verify that
313   // the compaction is each level invokes the filter for
314   // all the keys in that level.
315   cfilter_count = 0;
316   dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
317   ASSERT_EQ(cfilter_count, 100000);
318   cfilter_count = 0;
319   dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
320   ASSERT_EQ(cfilter_count, 100000);
321 
322   ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
323   ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
324   ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
325   cfilter_count = 0;
326 
327   // All the files are in the lowest level.
328   // Verify that all but the 100001st record
329   // has sequence number zero. The 100001st record
330   // is at the tip of this snapshot and cannot
331   // be zeroed out.
332   int count = 0;
333   int total = 0;
334   Arena arena;
335   {
336     InternalKeyComparator icmp(options.comparator);
337     ReadRangeDelAggregator range_del_agg(&icmp,
338                                          kMaxSequenceNumber /* upper_bound */);
339     ScopedArenaIterator iter(dbfull()->NewInternalIterator(
340         &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
341     iter->SeekToFirst();
342     ASSERT_OK(iter->status());
343     while (iter->Valid()) {
344       ParsedInternalKey ikey(Slice(), 0, kTypeValue);
345       ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
346       total++;
347       if (ikey.sequence != 0) {
348         count++;
349       }
350       iter->Next();
351     }
352   }
353   ASSERT_EQ(total, 100000);
354   ASSERT_EQ(count, 0);
355 
356   // overwrite all the 100K keys once again.
357   for (int i = 0; i < 100000; i++) {
358     char key[100];
359     snprintf(key, sizeof(key), "B%010d", i);
360     ASSERT_OK(Put(1, key, value));
361   }
362   ASSERT_OK(Flush(1));
363 
364   // push all files to the highest level L2. This
365   // means that all keys should pass at least once
366   // via the compaction filter
367   cfilter_count = 0;
368   dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
369   ASSERT_EQ(cfilter_count, 100000);
370   cfilter_count = 0;
371   dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
372   ASSERT_EQ(cfilter_count, 100000);
373   ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
374   ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
375   ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
376 
377   // create a new database with the compaction
378   // filter in such a way that it deletes all keys
379   options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
380   options.create_if_missing = true;
381   DestroyAndReopen(options);
382   CreateAndReopenWithCF({"pikachu"}, options);
383 
384   // write all the keys once again.
385   for (int i = 0; i < 100000; i++) {
386     char key[100];
387     snprintf(key, sizeof(key), "B%010d", i);
388     ASSERT_OK(Put(1, key, value));
389   }
390   ASSERT_OK(Flush(1));
391   ASSERT_NE(NumTableFilesAtLevel(0, 1), 0);
392   ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
393   ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0);
394 
395   // Push all files to the highest level L2. This
396   // triggers the compaction filter to delete all keys,
397   // verify that at the end of the compaction process,
398   // nothing is left.
399   cfilter_count = 0;
400   dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
401   ASSERT_EQ(cfilter_count, 100000);
402   cfilter_count = 0;
403   dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
404   ASSERT_EQ(cfilter_count, 0);
405   ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
406   ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
407 
408   {
409     // Scan the entire database to ensure that nothing is left
410     std::unique_ptr<Iterator> iter(
411         db_->NewIterator(ReadOptions(), handles_[1]));
412     iter->SeekToFirst();
413     count = 0;
414     while (iter->Valid()) {
415       count++;
416       iter->Next();
417     }
418     ASSERT_EQ(count, 0);
419   }
420 
421   // The sequence number of the remaining record
422   // is not zeroed out even though it is at the
423   // level Lmax because this record is at the tip
424   count = 0;
425   {
426     InternalKeyComparator icmp(options.comparator);
427     ReadRangeDelAggregator range_del_agg(&icmp,
428                                          kMaxSequenceNumber /* upper_bound */);
429     ScopedArenaIterator iter(dbfull()->NewInternalIterator(
430         &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
431     iter->SeekToFirst();
432     ASSERT_OK(iter->status());
433     while (iter->Valid()) {
434       ParsedInternalKey ikey(Slice(), 0, kTypeValue);
435       ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
436       ASSERT_NE(ikey.sequence, (unsigned)0);
437       count++;
438       iter->Next();
439     }
440     ASSERT_EQ(count, 0);
441   }
442 }
443 
444 // Tests the edge case where compaction does not produce any output -- all
445 // entries are deleted. The compaction should create bunch of 'DeleteFile'
446 // entries in VersionEdit, but none of the 'AddFile's.
TEST_F(DBTestCompactionFilter,CompactionFilterDeletesAll)447 TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
448   Options options = CurrentOptions();
449   options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
450   options.disable_auto_compactions = true;
451   options.create_if_missing = true;
452   DestroyAndReopen(options);
453 
454   // put some data
455   for (int table = 0; table < 4; ++table) {
456     for (int i = 0; i < 10 + table; ++i) {
457       Put(ToString(table * 100 + i), "val");
458     }
459     Flush();
460   }
461 
462   // this will produce empty file (delete compaction filter)
463   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
464   ASSERT_EQ(0U, CountLiveFiles());
465 
466   Reopen(options);
467 
468   Iterator* itr = db_->NewIterator(ReadOptions());
469   itr->SeekToFirst();
470   // empty db
471   ASSERT_TRUE(!itr->Valid());
472 
473   delete itr;
474 }
475 #endif  // ROCKSDB_LITE
476 
TEST_P(DBTestCompactionFilterWithCompactParam,CompactionFilterWithValueChange)477 TEST_P(DBTestCompactionFilterWithCompactParam,
478        CompactionFilterWithValueChange) {
479   Options options = CurrentOptions();
480   options.num_levels = 3;
481   options.compaction_filter_factory = std::make_shared<ChangeFilterFactory>();
482   CreateAndReopenWithCF({"pikachu"}, options);
483 
484   // Write 100K+1 keys, these are written to a few files
485   // in L0. We do this so that the current snapshot points
486   // to the 100001 key.The compaction filter is  not invoked
487   // on keys that are visible via a snapshot because we
488   // anyways cannot delete it.
489   const std::string value(10, 'x');
490   for (int i = 0; i < 100001; i++) {
491     char key[100];
492     snprintf(key, sizeof(key), "B%010d", i);
493     Put(1, key, value);
494   }
495 
496   // push all files to  lower levels
497   ASSERT_OK(Flush(1));
498   if (option_config_ != kUniversalCompactionMultiLevel &&
499       option_config_ != kUniversalSubcompactions) {
500     dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
501     dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
502   } else {
503     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
504                            nullptr);
505   }
506 
507   // re-write all data again
508   for (int i = 0; i < 100001; i++) {
509     char key[100];
510     snprintf(key, sizeof(key), "B%010d", i);
511     Put(1, key, value);
512   }
513 
514   // push all files to  lower levels. This should
515   // invoke the compaction filter for all 100000 keys.
516   ASSERT_OK(Flush(1));
517   if (option_config_ != kUniversalCompactionMultiLevel &&
518       option_config_ != kUniversalSubcompactions) {
519     dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
520     dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
521   } else {
522     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
523                            nullptr);
524   }
525 
526   // verify that all keys now have the new value that
527   // was set by the compaction process.
528   for (int i = 0; i < 100001; i++) {
529     char key[100];
530     snprintf(key, sizeof(key), "B%010d", i);
531     std::string newvalue = Get(1, key);
532     ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
533   }
534 }
535 
TEST_F(DBTestCompactionFilter,CompactionFilterWithMergeOperator)536 TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) {
537   std::string one, two, three, four;
538   PutFixed64(&one, 1);
539   PutFixed64(&two, 2);
540   PutFixed64(&three, 3);
541   PutFixed64(&four, 4);
542 
543   Options options = CurrentOptions();
544   options.create_if_missing = true;
545   options.merge_operator = MergeOperators::CreateUInt64AddOperator();
546   options.num_levels = 3;
547   // Filter out keys with value is 2.
548   options.compaction_filter_factory =
549       std::make_shared<ConditionalFilterFactory>(two);
550   DestroyAndReopen(options);
551 
552   // In the same compaction, a value type needs to be deleted based on
553   // compaction filter, and there is a merge type for the key. compaction
554   // filter result is ignored.
555   ASSERT_OK(db_->Put(WriteOptions(), "foo", two));
556   ASSERT_OK(Flush());
557   ASSERT_OK(db_->Merge(WriteOptions(), "foo", one));
558   ASSERT_OK(Flush());
559   std::string newvalue = Get("foo");
560   ASSERT_EQ(newvalue, three);
561   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
562   newvalue = Get("foo");
563   ASSERT_EQ(newvalue, three);
564 
565   // value key can be deleted based on compaction filter, leaving only
566   // merge keys.
567   ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
568   ASSERT_OK(Flush());
569   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
570   newvalue = Get("bar");
571   ASSERT_EQ("NOT_FOUND", newvalue);
572   ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
573   ASSERT_OK(Flush());
574   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
575   newvalue = Get("bar");
576   ASSERT_EQ(two, two);
577 
578   // Compaction filter never applies to merge keys.
579   ASSERT_OK(db_->Put(WriteOptions(), "foobar", one));
580   ASSERT_OK(Flush());
581   ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two));
582   ASSERT_OK(Flush());
583   newvalue = Get("foobar");
584   ASSERT_EQ(newvalue, three);
585   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
586   newvalue = Get("foobar");
587   ASSERT_EQ(newvalue, three);
588 
589   // In the same compaction, both of value type and merge type keys need to be
590   // deleted based on compaction filter, and there is a merge type for the key.
591   // For both keys, compaction filter results are ignored.
592   ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two));
593   ASSERT_OK(Flush());
594   ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two));
595   ASSERT_OK(Flush());
596   newvalue = Get("barfoo");
597   ASSERT_EQ(newvalue, four);
598   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
599   newvalue = Get("barfoo");
600   ASSERT_EQ(newvalue, four);
601 }
602 
603 #ifndef ROCKSDB_LITE
TEST_F(DBTestCompactionFilter,CompactionFilterContextManual)604 TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
605   KeepFilterFactory* filter = new KeepFilterFactory(true, true);
606 
607   Options options = CurrentOptions();
608   options.compaction_style = kCompactionStyleUniversal;
609   options.compaction_filter_factory.reset(filter);
610   options.compression = kNoCompression;
611   options.level0_file_num_compaction_trigger = 8;
612   Reopen(options);
613   int num_keys_per_file = 400;
614   for (int j = 0; j < 3; j++) {
615     // Write several keys.
616     const std::string value(10, 'x');
617     for (int i = 0; i < num_keys_per_file; i++) {
618       char key[100];
619       snprintf(key, sizeof(key), "B%08d%02d", i, j);
620       Put(key, value);
621     }
622     dbfull()->TEST_FlushMemTable();
623     // Make sure next file is much smaller so automatic compaction will not
624     // be triggered.
625     num_keys_per_file /= 2;
626   }
627   dbfull()->TEST_WaitForCompact();
628 
629   // Force a manual compaction
630   cfilter_count = 0;
631   filter->expect_manual_compaction_.store(true);
632   filter->expect_full_compaction_.store(true);
633   filter->expect_cf_id_.store(0);
634   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
635   ASSERT_EQ(cfilter_count, 700);
636   ASSERT_EQ(NumSortedRuns(0), 1);
637   ASSERT_TRUE(filter->compaction_filter_created());
638 
639   // Verify total number of keys is correct after manual compaction.
640   {
641     int count = 0;
642     int total = 0;
643     Arena arena;
644     InternalKeyComparator icmp(options.comparator);
645     ReadRangeDelAggregator range_del_agg(&icmp,
646                                          kMaxSequenceNumber /* snapshots */);
647     ScopedArenaIterator iter(dbfull()->NewInternalIterator(
648         &arena, &range_del_agg, kMaxSequenceNumber));
649     iter->SeekToFirst();
650     ASSERT_OK(iter->status());
651     while (iter->Valid()) {
652       ParsedInternalKey ikey(Slice(), 0, kTypeValue);
653       ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
654       total++;
655       if (ikey.sequence != 0) {
656         count++;
657       }
658       iter->Next();
659     }
660     ASSERT_EQ(total, 700);
661     ASSERT_EQ(count, 0);
662   }
663 }
664 #endif  // ROCKSDB_LITE
665 
TEST_F(DBTestCompactionFilter,CompactionFilterContextCfId)666 TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) {
667   KeepFilterFactory* filter = new KeepFilterFactory(false, true);
668   filter->expect_cf_id_.store(1);
669 
670   Options options = CurrentOptions();
671   options.compaction_filter_factory.reset(filter);
672   options.compression = kNoCompression;
673   options.level0_file_num_compaction_trigger = 2;
674   CreateAndReopenWithCF({"pikachu"}, options);
675 
676   int num_keys_per_file = 400;
677   for (int j = 0; j < 3; j++) {
678     // Write several keys.
679     const std::string value(10, 'x');
680     for (int i = 0; i < num_keys_per_file; i++) {
681       char key[100];
682       snprintf(key, sizeof(key), "B%08d%02d", i, j);
683       Put(1, key, value);
684     }
685     Flush(1);
686     // Make sure next file is much smaller so automatic compaction will not
687     // be triggered.
688     num_keys_per_file /= 2;
689   }
690   dbfull()->TEST_WaitForCompact();
691 
692   ASSERT_TRUE(filter->compaction_filter_created());
693 }
694 
695 #ifndef ROCKSDB_LITE
696 // Compaction filters aplies to all records, regardless snapshots.
TEST_F(DBTestCompactionFilter,CompactionFilterIgnoreSnapshot)697 TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
698   std::string five = ToString(5);
699   Options options = CurrentOptions();
700   options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>();
701   options.disable_auto_compactions = true;
702   options.create_if_missing = true;
703   DestroyAndReopen(options);
704 
705   // Put some data.
706   const Snapshot* snapshot = nullptr;
707   for (int table = 0; table < 4; ++table) {
708     for (int i = 0; i < 10; ++i) {
709       Put(ToString(table * 100 + i), "val");
710     }
711     Flush();
712 
713     if (table == 0) {
714       snapshot = db_->GetSnapshot();
715     }
716   }
717   assert(snapshot != nullptr);
718 
719   cfilter_count = 0;
720   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
721   // The filter should delete 40 records.
722   ASSERT_EQ(40, cfilter_count);
723 
724   {
725     // Scan the entire database as of the snapshot to ensure
726     // that nothing is left
727     ReadOptions read_options;
728     read_options.snapshot = snapshot;
729     std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
730     iter->SeekToFirst();
731     int count = 0;
732     while (iter->Valid()) {
733       count++;
734       iter->Next();
735     }
736     ASSERT_EQ(count, 6);
737     read_options.snapshot = nullptr;
738     std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options));
739     iter1->SeekToFirst();
740     count = 0;
741     while (iter1->Valid()) {
742       count++;
743       iter1->Next();
744     }
745     // We have deleted 10 keys from 40 using the compaction filter
746     //  Keys 6-9 before the snapshot and 100-105 after the snapshot
747     ASSERT_EQ(count, 30);
748   }
749 
750   // Release the snapshot and compact again -> now all records should be
751   // removed.
752   db_->ReleaseSnapshot(snapshot);
753 }
754 #endif  // ROCKSDB_LITE
755 
TEST_F(DBTestCompactionFilter,SkipUntil)756 TEST_F(DBTestCompactionFilter, SkipUntil) {
757   Options options = CurrentOptions();
758   options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
759   options.disable_auto_compactions = true;
760   options.create_if_missing = true;
761   DestroyAndReopen(options);
762 
763   // Write 100K keys, these are written to a few files in L0.
764   for (int table = 0; table < 4; ++table) {
765     // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371].
766     for (int i = table * 6; i < 39 + table * 11; ++i) {
767       char key[100];
768       snprintf(key, sizeof(key), "%010d", table * 100 + i);
769       Put(key, std::to_string(table * 1000 + i));
770     }
771     Flush();
772   }
773 
774   cfilter_skips = 0;
775   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
776   // Number of skips in tables: 2, 3, 3, 3.
777   ASSERT_EQ(11, cfilter_skips);
778 
779   for (int table = 0; table < 4; ++table) {
780     for (int i = table * 6; i < 39 + table * 11; ++i) {
781       int k = table * 100 + i;
782       char key[100];
783       snprintf(key, sizeof(key), "%010d", table * 100 + i);
784       auto expected = std::to_string(table * 1000 + i);
785       std::string val;
786       Status s = db_->Get(ReadOptions(), key, &val);
787       if (k / 10 % 2 == 0) {
788         ASSERT_TRUE(s.IsNotFound());
789       } else {
790         ASSERT_OK(s);
791         ASSERT_EQ(expected, val);
792       }
793     }
794   }
795 }
796 
TEST_F(DBTestCompactionFilter,SkipUntilWithBloomFilter)797 TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) {
798   BlockBasedTableOptions table_options;
799   table_options.whole_key_filtering = false;
800   table_options.filter_policy.reset(NewBloomFilterPolicy(100, false));
801 
802   Options options = CurrentOptions();
803   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
804   options.prefix_extractor.reset(NewCappedPrefixTransform(9));
805   options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
806   options.disable_auto_compactions = true;
807   options.create_if_missing = true;
808   DestroyAndReopen(options);
809 
810   Put("0000000010", "v10");
811   Put("0000000020", "v20");  // skipped
812   Put("0000000050", "v50");
813   Flush();
814 
815   cfilter_skips = 0;
816   EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
817   EXPECT_EQ(1, cfilter_skips);
818 
819   Status s;
820   std::string val;
821 
822   s = db_->Get(ReadOptions(), "0000000010", &val);
823   ASSERT_OK(s);
824   EXPECT_EQ("v10", val);
825 
826   s = db_->Get(ReadOptions(), "0000000020", &val);
827   EXPECT_TRUE(s.IsNotFound());
828 
829   s = db_->Get(ReadOptions(), "0000000050", &val);
830   ASSERT_OK(s);
831   EXPECT_EQ("v50", val);
832 }
833 
834 class TestNotSupportedFilter : public CompactionFilter {
835  public:
Filter(int,const Slice &,const Slice &,std::string *,bool *) const836   bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
837               std::string* /*new_value*/,
838               bool* /*value_changed*/) const override {
839     return true;
840   }
841 
Name() const842   const char* Name() const override { return "NotSupported"; }
IgnoreSnapshots() const843   bool IgnoreSnapshots() const override { return false; }
844 };
845 
TEST_F(DBTestCompactionFilter,IgnoreSnapshotsFalse)846 TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalse) {
847   Options options = CurrentOptions();
848   options.compaction_filter = new TestNotSupportedFilter();
849   DestroyAndReopen(options);
850 
851   Put("a", "v10");
852   Put("z", "v20");
853   Flush();
854 
855   Put("a", "v10");
856   Put("z", "v20");
857   Flush();
858 
859   // Comapction should fail because IgnoreSnapshots() = false
860   EXPECT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
861                   .IsNotSupported());
862 
863   delete options.compaction_filter;
864 }
865 
866 }  // namespace ROCKSDB_NAMESPACE
867 
main(int argc,char ** argv)868 int main(int argc, char** argv) {
869   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
870   ::testing::InitGoogleTest(&argc, argv);
871   return RUN_ALL_TESTS();
872 }
873