1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include <rocksdb/compaction_filter.h>
7 #include <rocksdb/db.h>
8 #include <rocksdb/merge_operator.h>
9 #include <rocksdb/options.h>
10 
11 class MyMerge : public ROCKSDB_NAMESPACE::MergeOperator {
12  public:
FullMergeV2(const MergeOperationInput & merge_in,MergeOperationOutput * merge_out) const13   virtual bool FullMergeV2(const MergeOperationInput& merge_in,
14                            MergeOperationOutput* merge_out) const override {
15     merge_out->new_value.clear();
16     if (merge_in.existing_value != nullptr) {
17       merge_out->new_value.assign(merge_in.existing_value->data(),
18                                   merge_in.existing_value->size());
19     }
20     for (const ROCKSDB_NAMESPACE::Slice& m : merge_in.operand_list) {
21       fprintf(stderr, "Merge(%s)\n", m.ToString().c_str());
22       // the compaction filter filters out bad values
23       assert(m.ToString() != "bad");
24       merge_out->new_value.assign(m.data(), m.size());
25     }
26     return true;
27   }
28 
Name() const29   const char* Name() const override { return "MyMerge"; }
30 };
31 
32 class MyFilter : public ROCKSDB_NAMESPACE::CompactionFilter {
33  public:
Filter(int level,const ROCKSDB_NAMESPACE::Slice & key,const ROCKSDB_NAMESPACE::Slice & existing_value,std::string * new_value,bool * value_changed) const34   bool Filter(int level, const ROCKSDB_NAMESPACE::Slice& key,
35               const ROCKSDB_NAMESPACE::Slice& existing_value,
36               std::string* new_value, bool* value_changed) const override {
37     fprintf(stderr, "Filter(%s)\n", key.ToString().c_str());
38     ++count_;
39     assert(*value_changed == false);
40     return false;
41   }
42 
FilterMergeOperand(int level,const ROCKSDB_NAMESPACE::Slice & key,const ROCKSDB_NAMESPACE::Slice & existing_value) const43   bool FilterMergeOperand(
44       int level, const ROCKSDB_NAMESPACE::Slice& key,
45       const ROCKSDB_NAMESPACE::Slice& existing_value) const override {
46     fprintf(stderr, "FilterMerge(%s)\n", key.ToString().c_str());
47     ++merge_count_;
48     return existing_value == "bad";
49   }
50 
Name() const51   const char* Name() const override { return "MyFilter"; }
52 
53   mutable int count_ = 0;
54   mutable int merge_count_ = 0;
55 };
56 
main()57 int main() {
58   ROCKSDB_NAMESPACE::DB* raw_db;
59   ROCKSDB_NAMESPACE::Status status;
60 
61   MyFilter filter;
62 
63   int ret = system("rm -rf /tmp/rocksmergetest");
64   if (ret != 0) {
65     fprintf(stderr, "Error deleting /tmp/rocksmergetest, code: %d\n", ret);
66     return ret;
67   }
68   ROCKSDB_NAMESPACE::Options options;
69   options.create_if_missing = true;
70   options.merge_operator.reset(new MyMerge);
71   options.compaction_filter = &filter;
72   status = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/rocksmergetest", &raw_db);
73   assert(status.ok());
74   std::unique_ptr<ROCKSDB_NAMESPACE::DB> db(raw_db);
75 
76   ROCKSDB_NAMESPACE::WriteOptions wopts;
77   db->Merge(wopts, "0", "bad");  // This is filtered out
78   db->Merge(wopts, "1", "data1");
79   db->Merge(wopts, "1", "bad");
80   db->Merge(wopts, "1", "data2");
81   db->Merge(wopts, "1", "bad");
82   db->Merge(wopts, "3", "data3");
83   db->CompactRange(ROCKSDB_NAMESPACE::CompactRangeOptions(), nullptr, nullptr);
84   fprintf(stderr, "filter.count_ = %d\n", filter.count_);
85   assert(filter.count_ == 0);
86   fprintf(stderr, "filter.merge_count_ = %d\n", filter.merge_count_);
87   assert(filter.merge_count_ == 6);
88 }
89