1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include <memory>
7
8 #include "rocksdb/merge_operator.h"
9 #include "rocksdb/slice.h"
10 #include "utilities/merge_operators.h"
11
12 using ROCKSDB_NAMESPACE::Logger;
13 using ROCKSDB_NAMESPACE::MergeOperator;
14 using ROCKSDB_NAMESPACE::Slice;
15
16 namespace { // anonymous namespace
17
18 // Merge operator that picks the maximum operand, Comparison is based on
19 // Slice::compare
20 class MaxOperator : public MergeOperator {
21 public:
FullMergeV2(const MergeOperationInput & merge_in,MergeOperationOutput * merge_out) const22 bool FullMergeV2(const MergeOperationInput& merge_in,
23 MergeOperationOutput* merge_out) const override {
24 Slice& max = merge_out->existing_operand;
25 if (merge_in.existing_value) {
26 max = Slice(merge_in.existing_value->data(),
27 merge_in.existing_value->size());
28 } else if (max.data() == nullptr) {
29 max = Slice();
30 }
31
32 for (const auto& op : merge_in.operand_list) {
33 if (max.compare(op) < 0) {
34 max = op;
35 }
36 }
37
38 return true;
39 }
40
PartialMerge(const Slice &,const Slice & left_operand,const Slice & right_operand,std::string * new_value,Logger *) const41 bool PartialMerge(const Slice& /*key*/, const Slice& left_operand,
42 const Slice& right_operand, std::string* new_value,
43 Logger* /*logger*/) const override {
44 if (left_operand.compare(right_operand) >= 0) {
45 new_value->assign(left_operand.data(), left_operand.size());
46 } else {
47 new_value->assign(right_operand.data(), right_operand.size());
48 }
49 return true;
50 }
51
PartialMergeMulti(const Slice &,const std::deque<Slice> & operand_list,std::string * new_value,Logger *) const52 bool PartialMergeMulti(const Slice& /*key*/,
53 const std::deque<Slice>& operand_list,
54 std::string* new_value,
55 Logger* /*logger*/) const override {
56 Slice max;
57 for (const auto& operand : operand_list) {
58 if (max.compare(operand) < 0) {
59 max = operand;
60 }
61 }
62
63 new_value->assign(max.data(), max.size());
64 return true;
65 }
66
Name() const67 const char* Name() const override { return "MaxOperator"; }
68 };
69
70 } // end of anonymous namespace
71
72 namespace ROCKSDB_NAMESPACE {
73
CreateMaxOperator()74 std::shared_ptr<MergeOperator> MergeOperators::CreateMaxOperator() {
75 return std::make_shared<MaxOperator>();
76 }
77 } // namespace ROCKSDB_NAMESPACE
78