1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include <memory>
7 #include <string>
8 
9 #include "db/db_test_util.h"
10 #include "db/memtable.h"
11 #include "db/range_del_aggregator.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/memtablerep.h"
14 #include "rocksdb/slice_transform.h"
15 
16 namespace ROCKSDB_NAMESPACE {
17 
18 class DBMemTableTest : public DBTestBase {
19  public:
DBMemTableTest()20   DBMemTableTest() : DBTestBase("/db_memtable_test") {}
21 };
22 
23 class MockMemTableRep : public MemTableRep {
24  public:
MockMemTableRep(Allocator * allocator,MemTableRep * rep)25   explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep)
26       : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}
27 
Allocate(const size_t len,char ** buf)28   KeyHandle Allocate(const size_t len, char** buf) override {
29     return rep_->Allocate(len, buf);
30   }
31 
Insert(KeyHandle handle)32   void Insert(KeyHandle handle) override { rep_->Insert(handle); }
33 
InsertWithHint(KeyHandle handle,void ** hint)34   void InsertWithHint(KeyHandle handle, void** hint) override {
35     num_insert_with_hint_++;
36     EXPECT_NE(nullptr, hint);
37     last_hint_in_ = *hint;
38     rep_->InsertWithHint(handle, hint);
39     last_hint_out_ = *hint;
40   }
41 
Contains(const char * key) const42   bool Contains(const char* key) const override { return rep_->Contains(key); }
43 
Get(const LookupKey & k,void * callback_args,bool (* callback_func)(void * arg,const char * entry))44   void Get(const LookupKey& k, void* callback_args,
45            bool (*callback_func)(void* arg, const char* entry)) override {
46     rep_->Get(k, callback_args, callback_func);
47   }
48 
ApproximateMemoryUsage()49   size_t ApproximateMemoryUsage() override {
50     return rep_->ApproximateMemoryUsage();
51   }
52 
GetIterator(Arena * arena)53   Iterator* GetIterator(Arena* arena) override {
54     return rep_->GetIterator(arena);
55   }
56 
last_hint_in()57   void* last_hint_in() { return last_hint_in_; }
last_hint_out()58   void* last_hint_out() { return last_hint_out_; }
num_insert_with_hint()59   int num_insert_with_hint() { return num_insert_with_hint_; }
60 
61  private:
62   std::unique_ptr<MemTableRep> rep_;
63   void* last_hint_in_;
64   void* last_hint_out_;
65   int num_insert_with_hint_;
66 };
67 
68 class MockMemTableRepFactory : public MemTableRepFactory {
69  public:
CreateMemTableRep(const MemTableRep::KeyComparator & cmp,Allocator * allocator,const SliceTransform * transform,Logger * logger)70   MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
71                                  Allocator* allocator,
72                                  const SliceTransform* transform,
73                                  Logger* logger) override {
74     SkipListFactory factory;
75     MemTableRep* skiplist_rep =
76         factory.CreateMemTableRep(cmp, allocator, transform, logger);
77     mock_rep_ = new MockMemTableRep(allocator, skiplist_rep);
78     return mock_rep_;
79   }
80 
CreateMemTableRep(const MemTableRep::KeyComparator & cmp,Allocator * allocator,const SliceTransform * transform,Logger * logger,uint32_t column_family_id)81   MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
82                                  Allocator* allocator,
83                                  const SliceTransform* transform,
84                                  Logger* logger,
85                                  uint32_t column_family_id) override {
86     last_column_family_id_ = column_family_id;
87     return CreateMemTableRep(cmp, allocator, transform, logger);
88   }
89 
Name() const90   const char* Name() const override { return "MockMemTableRepFactory"; }
91 
rep()92   MockMemTableRep* rep() { return mock_rep_; }
93 
IsInsertConcurrentlySupported() const94   bool IsInsertConcurrentlySupported() const override { return false; }
95 
GetLastColumnFamilyId()96   uint32_t GetLastColumnFamilyId() { return last_column_family_id_; }
97 
98  private:
99   MockMemTableRep* mock_rep_;
100   // workaround since there's no port::kMaxUint32 yet.
101   uint32_t last_column_family_id_ = static_cast<uint32_t>(-1);
102 };
103 
104 class TestPrefixExtractor : public SliceTransform {
105  public:
Name() const106   const char* Name() const override { return "TestPrefixExtractor"; }
107 
Transform(const Slice & key) const108   Slice Transform(const Slice& key) const override {
109     const char* p = separator(key);
110     if (p == nullptr) {
111       return Slice();
112     }
113     return Slice(key.data(), p - key.data() + 1);
114   }
115 
InDomain(const Slice & key) const116   bool InDomain(const Slice& key) const override {
117     return separator(key) != nullptr;
118   }
119 
InRange(const Slice &) const120   bool InRange(const Slice& /*key*/) const override { return false; }
121 
122  private:
separator(const Slice & key) const123   const char* separator(const Slice& key) const {
124     return reinterpret_cast<const char*>(memchr(key.data(), '_', key.size()));
125   }
126 };
127 
128 // Test that ::Add properly returns false when inserting duplicate keys
TEST_F(DBMemTableTest,DuplicateSeq)129 TEST_F(DBMemTableTest, DuplicateSeq) {
130   SequenceNumber seq = 123;
131   std::string value;
132   Status s;
133   MergeContext merge_context;
134   Options options;
135   InternalKeyComparator ikey_cmp(options.comparator);
136   ReadRangeDelAggregator range_del_agg(&ikey_cmp,
137                                        kMaxSequenceNumber /* upper_bound */);
138 
139   // Create a MemTable
140   InternalKeyComparator cmp(BytewiseComparator());
141   auto factory = std::make_shared<SkipListFactory>();
142   options.memtable_factory = factory;
143   ImmutableCFOptions ioptions(options);
144   WriteBufferManager wb(options.db_write_buffer_size);
145   MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
146                                kMaxSequenceNumber, 0 /* column_family_id */);
147 
148   // Write some keys and make sure it returns false on duplicates
149   bool res;
150   res = mem->Add(seq, kTypeValue, "key", "value2");
151   ASSERT_TRUE(res);
152   res = mem->Add(seq, kTypeValue, "key", "value2");
153   ASSERT_FALSE(res);
154   // Changing the type should still cause the duplicatae key
155   res = mem->Add(seq, kTypeMerge, "key", "value2");
156   ASSERT_FALSE(res);
157   // Changing the seq number will make the key fresh
158   res = mem->Add(seq + 1, kTypeMerge, "key", "value2");
159   ASSERT_TRUE(res);
160   // Test with different types for duplicate keys
161   res = mem->Add(seq, kTypeDeletion, "key", "");
162   ASSERT_FALSE(res);
163   res = mem->Add(seq, kTypeSingleDeletion, "key", "");
164   ASSERT_FALSE(res);
165 
166   // Test the duplicate keys under stress
167   for (int i = 0; i < 10000; i++) {
168     bool insert_dup = i % 10 == 1;
169     if (!insert_dup) {
170       seq++;
171     }
172     res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq));
173     if (insert_dup) {
174       ASSERT_FALSE(res);
175     } else {
176       ASSERT_TRUE(res);
177     }
178   }
179   delete mem;
180 
181   // Test with InsertWithHint
182   options.memtable_insert_with_hint_prefix_extractor.reset(
183       new TestPrefixExtractor());  // which uses _ to extract the prefix
184   ioptions = ImmutableCFOptions(options);
185   mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
186                      kMaxSequenceNumber, 0 /* column_family_id */);
187   // Insert a duplicate key with _ in it
188   res = mem->Add(seq, kTypeValue, "key_1", "value");
189   ASSERT_TRUE(res);
190   res = mem->Add(seq, kTypeValue, "key_1", "value");
191   ASSERT_FALSE(res);
192   delete mem;
193 
194   // Test when InsertConcurrently will be invoked
195   options.allow_concurrent_memtable_write = true;
196   ioptions = ImmutableCFOptions(options);
197   mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
198                      kMaxSequenceNumber, 0 /* column_family_id */);
199   MemTablePostProcessInfo post_process_info;
200   res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
201   ASSERT_TRUE(res);
202   res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
203   ASSERT_FALSE(res);
204   delete mem;
205 }
206 
207 // A simple test to verify that the concurrent merge writes is functional
TEST_F(DBMemTableTest,ConcurrentMergeWrite)208 TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
209   int num_ops = 1000;
210   std::string value;
211   Status s;
212   MergeContext merge_context;
213   Options options;
214   // A merge operator that is not sensitive to concurrent writes since in this
215   // test we don't order the writes.
216   options.merge_operator = MergeOperators::CreateUInt64AddOperator();
217 
218   // Create a MemTable
219   InternalKeyComparator cmp(BytewiseComparator());
220   auto factory = std::make_shared<SkipListFactory>();
221   options.memtable_factory = factory;
222   options.allow_concurrent_memtable_write = true;
223   ImmutableCFOptions ioptions(options);
224   WriteBufferManager wb(options.db_write_buffer_size);
225   MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
226                                kMaxSequenceNumber, 0 /* column_family_id */);
227 
228   // Put 0 as the base
229   PutFixed64(&value, static_cast<uint64_t>(0));
230   bool res = mem->Add(0, kTypeValue, "key", value);
231   ASSERT_TRUE(res);
232   value.clear();
233 
234   // Write Merge concurrently
235   ROCKSDB_NAMESPACE::port::Thread write_thread1([&]() {
236     MemTablePostProcessInfo post_process_info1;
237     std::string v1;
238     for (int seq = 1; seq < num_ops / 2; seq++) {
239       PutFixed64(&v1, seq);
240       bool res1 =
241           mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info1);
242       ASSERT_TRUE(res1);
243       v1.clear();
244     }
245   });
246   ROCKSDB_NAMESPACE::port::Thread write_thread2([&]() {
247     MemTablePostProcessInfo post_process_info2;
248     std::string v2;
249     for (int seq = num_ops / 2; seq < num_ops; seq++) {
250       PutFixed64(&v2, seq);
251       bool res2 =
252           mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info2);
253       ASSERT_TRUE(res2);
254       v2.clear();
255     }
256   });
257   write_thread1.join();
258   write_thread2.join();
259 
260   Status status;
261   ReadOptions roptions;
262   SequenceNumber max_covering_tombstone_seq = 0;
263   LookupKey lkey("key", kMaxSequenceNumber);
264   res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status, &merge_context,
265                  &max_covering_tombstone_seq, roptions);
266   ASSERT_TRUE(res);
267   uint64_t ivalue = DecodeFixed64(Slice(value).data());
268   uint64_t sum = 0;
269   for (int seq = 0; seq < num_ops; seq++) {
270     sum += seq;
271   }
272   ASSERT_EQ(ivalue, sum);
273 
274   delete mem;
275 }
276 
// Verifies how hints flow through MemTableRep::InsertWithHint() when a
// memtable_insert_with_hint_prefix_extractor is configured.
TEST_F(DBMemTableTest, InsertWithHint) {
  Options options;
  options.allow_concurrent_memtable_write = false;
  options.create_if_missing = true;
  options.memtable_factory.reset(new MockMemTableRepFactory());
  options.memtable_insert_with_hint_prefix_extractor.reset(
      new TestPrefixExtractor());
  options.env = env_;
  Reopen(options);
  // The factory was installed as a MockMemTableRepFactory just above, so a
  // static downcast is the correct cast here.
  MockMemTableRep* rep =
      static_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
          ->rep();
  // First key of the "foo_" prefix: the incoming hint is null.
  ASSERT_OK(Put("foo_k1", "foo_v1"));
  ASSERT_EQ(nullptr, rep->last_hint_in());
  void* hint_foo = rep->last_hint_out();
  // Further "foo_" keys receive the same hint and leave it unchanged.
  ASSERT_OK(Put("foo_k2", "foo_v2"));
  ASSERT_EQ(hint_foo, rep->last_hint_in());
  ASSERT_EQ(hint_foo, rep->last_hint_out());
  ASSERT_OK(Put("foo_k3", "foo_v3"));
  ASSERT_EQ(hint_foo, rep->last_hint_in());
  ASSERT_EQ(hint_foo, rep->last_hint_out());
  // A new prefix starts again from a null hint and ends with a distinct one.
  ASSERT_OK(Put("bar_k1", "bar_v1"));
  ASSERT_EQ(nullptr, rep->last_hint_in());
  void* hint_bar = rep->last_hint_out();
  ASSERT_NE(hint_foo, hint_bar);
  ASSERT_OK(Put("bar_k2", "bar_v2"));
  ASSERT_EQ(hint_bar, rep->last_hint_in());
  ASSERT_EQ(hint_bar, rep->last_hint_out());
  ASSERT_EQ(5, rep->num_insert_with_hint());
  // This key has no '_' and is therefore outside the extractor's domain; the
  // InsertWithHint() call count must stay at 5.
  ASSERT_OK(Put("whitelisted", "vvv"));
  ASSERT_EQ(5, rep->num_insert_with_hint());
  // All keys remain readable regardless of which insert path was taken.
  ASSERT_EQ("foo_v1", Get("foo_k1"));
  ASSERT_EQ("foo_v2", Get("foo_k2"));
  ASSERT_EQ("foo_v3", Get("foo_k3"));
  ASSERT_EQ("bar_v1", Get("bar_k1"));
  ASSERT_EQ("bar_v2", Get("bar_k2"));
  ASSERT_EQ("vvv", Get("whitelisted"));
}
315 
TEST_F(DBMemTableTest, ColumnFamilyId) {
  // Checks that the column family id reported to MemTableRepFactory matches
  // the column family that was just written to and flushed.
  Options options;
  options.memtable_factory.reset(new MockMemTableRepFactory());
  options.create_if_missing = true;
  options.allow_concurrent_memtable_write = false;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // The factory installed above is a MockMemTableRepFactory; resolve it once.
  MockMemTableRepFactory* const mock_factory =
      static_cast<MockMemTableRepFactory*>(options.memtable_factory.get());
  for (uint32_t cf_id = 0; cf_id < 2; ++cf_id) {
    ASSERT_OK(Put(cf_id, "key", "val"));
    ASSERT_OK(Flush(cf_id));
    ASSERT_EQ(cf_id, mock_factory->GetLastColumnFamilyId());
  }
}
333 
334 }  // namespace ROCKSDB_NAMESPACE
335 
main(int argc,char ** argv)336 int main(int argc, char** argv) {
337   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
338   ::testing::InitGoogleTest(&argc, argv);
339   return RUN_ALL_TESTS();
340 }
341