1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include <memory>
7 #include <string>
8
9 #include "db/db_test_util.h"
10 #include "db/memtable.h"
11 #include "db/range_del_aggregator.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/memtablerep.h"
14 #include "rocksdb/slice_transform.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
18 class DBMemTableTest : public DBTestBase {
19 public:
DBMemTableTest()20 DBMemTableTest() : DBTestBase("/db_memtable_test") {}
21 };
22
23 class MockMemTableRep : public MemTableRep {
24 public:
MockMemTableRep(Allocator * allocator,MemTableRep * rep)25 explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep)
26 : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}
27
Allocate(const size_t len,char ** buf)28 KeyHandle Allocate(const size_t len, char** buf) override {
29 return rep_->Allocate(len, buf);
30 }
31
Insert(KeyHandle handle)32 void Insert(KeyHandle handle) override { rep_->Insert(handle); }
33
InsertWithHint(KeyHandle handle,void ** hint)34 void InsertWithHint(KeyHandle handle, void** hint) override {
35 num_insert_with_hint_++;
36 EXPECT_NE(nullptr, hint);
37 last_hint_in_ = *hint;
38 rep_->InsertWithHint(handle, hint);
39 last_hint_out_ = *hint;
40 }
41
Contains(const char * key) const42 bool Contains(const char* key) const override { return rep_->Contains(key); }
43
Get(const LookupKey & k,void * callback_args,bool (* callback_func)(void * arg,const char * entry))44 void Get(const LookupKey& k, void* callback_args,
45 bool (*callback_func)(void* arg, const char* entry)) override {
46 rep_->Get(k, callback_args, callback_func);
47 }
48
ApproximateMemoryUsage()49 size_t ApproximateMemoryUsage() override {
50 return rep_->ApproximateMemoryUsage();
51 }
52
GetIterator(Arena * arena)53 Iterator* GetIterator(Arena* arena) override {
54 return rep_->GetIterator(arena);
55 }
56
last_hint_in()57 void* last_hint_in() { return last_hint_in_; }
last_hint_out()58 void* last_hint_out() { return last_hint_out_; }
num_insert_with_hint()59 int num_insert_with_hint() { return num_insert_with_hint_; }
60
61 private:
62 std::unique_ptr<MemTableRep> rep_;
63 void* last_hint_in_;
64 void* last_hint_out_;
65 int num_insert_with_hint_;
66 };
67
68 class MockMemTableRepFactory : public MemTableRepFactory {
69 public:
CreateMemTableRep(const MemTableRep::KeyComparator & cmp,Allocator * allocator,const SliceTransform * transform,Logger * logger)70 MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
71 Allocator* allocator,
72 const SliceTransform* transform,
73 Logger* logger) override {
74 SkipListFactory factory;
75 MemTableRep* skiplist_rep =
76 factory.CreateMemTableRep(cmp, allocator, transform, logger);
77 mock_rep_ = new MockMemTableRep(allocator, skiplist_rep);
78 return mock_rep_;
79 }
80
CreateMemTableRep(const MemTableRep::KeyComparator & cmp,Allocator * allocator,const SliceTransform * transform,Logger * logger,uint32_t column_family_id)81 MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
82 Allocator* allocator,
83 const SliceTransform* transform,
84 Logger* logger,
85 uint32_t column_family_id) override {
86 last_column_family_id_ = column_family_id;
87 return CreateMemTableRep(cmp, allocator, transform, logger);
88 }
89
Name() const90 const char* Name() const override { return "MockMemTableRepFactory"; }
91
rep()92 MockMemTableRep* rep() { return mock_rep_; }
93
IsInsertConcurrentlySupported() const94 bool IsInsertConcurrentlySupported() const override { return false; }
95
GetLastColumnFamilyId()96 uint32_t GetLastColumnFamilyId() { return last_column_family_id_; }
97
98 private:
99 MockMemTableRep* mock_rep_;
100 // workaround since there's no port::kMaxUint32 yet.
101 uint32_t last_column_family_id_ = static_cast<uint32_t>(-1);
102 };
103
104 class TestPrefixExtractor : public SliceTransform {
105 public:
Name() const106 const char* Name() const override { return "TestPrefixExtractor"; }
107
Transform(const Slice & key) const108 Slice Transform(const Slice& key) const override {
109 const char* p = separator(key);
110 if (p == nullptr) {
111 return Slice();
112 }
113 return Slice(key.data(), p - key.data() + 1);
114 }
115
InDomain(const Slice & key) const116 bool InDomain(const Slice& key) const override {
117 return separator(key) != nullptr;
118 }
119
InRange(const Slice &) const120 bool InRange(const Slice& /*key*/) const override { return false; }
121
122 private:
separator(const Slice & key) const123 const char* separator(const Slice& key) const {
124 return reinterpret_cast<const char*>(memchr(key.data(), '_', key.size()));
125 }
126 };
127
128 // Test that ::Add properly returns false when inserting duplicate keys
TEST_F(DBMemTableTest,DuplicateSeq)129 TEST_F(DBMemTableTest, DuplicateSeq) {
130 SequenceNumber seq = 123;
131 std::string value;
132 Status s;
133 MergeContext merge_context;
134 Options options;
135 InternalKeyComparator ikey_cmp(options.comparator);
136 ReadRangeDelAggregator range_del_agg(&ikey_cmp,
137 kMaxSequenceNumber /* upper_bound */);
138
139 // Create a MemTable
140 InternalKeyComparator cmp(BytewiseComparator());
141 auto factory = std::make_shared<SkipListFactory>();
142 options.memtable_factory = factory;
143 ImmutableCFOptions ioptions(options);
144 WriteBufferManager wb(options.db_write_buffer_size);
145 MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
146 kMaxSequenceNumber, 0 /* column_family_id */);
147
148 // Write some keys and make sure it returns false on duplicates
149 bool res;
150 res = mem->Add(seq, kTypeValue, "key", "value2");
151 ASSERT_TRUE(res);
152 res = mem->Add(seq, kTypeValue, "key", "value2");
153 ASSERT_FALSE(res);
154 // Changing the type should still cause the duplicatae key
155 res = mem->Add(seq, kTypeMerge, "key", "value2");
156 ASSERT_FALSE(res);
157 // Changing the seq number will make the key fresh
158 res = mem->Add(seq + 1, kTypeMerge, "key", "value2");
159 ASSERT_TRUE(res);
160 // Test with different types for duplicate keys
161 res = mem->Add(seq, kTypeDeletion, "key", "");
162 ASSERT_FALSE(res);
163 res = mem->Add(seq, kTypeSingleDeletion, "key", "");
164 ASSERT_FALSE(res);
165
166 // Test the duplicate keys under stress
167 for (int i = 0; i < 10000; i++) {
168 bool insert_dup = i % 10 == 1;
169 if (!insert_dup) {
170 seq++;
171 }
172 res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq));
173 if (insert_dup) {
174 ASSERT_FALSE(res);
175 } else {
176 ASSERT_TRUE(res);
177 }
178 }
179 delete mem;
180
181 // Test with InsertWithHint
182 options.memtable_insert_with_hint_prefix_extractor.reset(
183 new TestPrefixExtractor()); // which uses _ to extract the prefix
184 ioptions = ImmutableCFOptions(options);
185 mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
186 kMaxSequenceNumber, 0 /* column_family_id */);
187 // Insert a duplicate key with _ in it
188 res = mem->Add(seq, kTypeValue, "key_1", "value");
189 ASSERT_TRUE(res);
190 res = mem->Add(seq, kTypeValue, "key_1", "value");
191 ASSERT_FALSE(res);
192 delete mem;
193
194 // Test when InsertConcurrently will be invoked
195 options.allow_concurrent_memtable_write = true;
196 ioptions = ImmutableCFOptions(options);
197 mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
198 kMaxSequenceNumber, 0 /* column_family_id */);
199 MemTablePostProcessInfo post_process_info;
200 res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
201 ASSERT_TRUE(res);
202 res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
203 ASSERT_FALSE(res);
204 delete mem;
205 }
206
207 // A simple test to verify that the concurrent merge writes is functional
TEST_F(DBMemTableTest,ConcurrentMergeWrite)208 TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
209 int num_ops = 1000;
210 std::string value;
211 Status s;
212 MergeContext merge_context;
213 Options options;
214 // A merge operator that is not sensitive to concurrent writes since in this
215 // test we don't order the writes.
216 options.merge_operator = MergeOperators::CreateUInt64AddOperator();
217
218 // Create a MemTable
219 InternalKeyComparator cmp(BytewiseComparator());
220 auto factory = std::make_shared<SkipListFactory>();
221 options.memtable_factory = factory;
222 options.allow_concurrent_memtable_write = true;
223 ImmutableCFOptions ioptions(options);
224 WriteBufferManager wb(options.db_write_buffer_size);
225 MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
226 kMaxSequenceNumber, 0 /* column_family_id */);
227
228 // Put 0 as the base
229 PutFixed64(&value, static_cast<uint64_t>(0));
230 bool res = mem->Add(0, kTypeValue, "key", value);
231 ASSERT_TRUE(res);
232 value.clear();
233
234 // Write Merge concurrently
235 ROCKSDB_NAMESPACE::port::Thread write_thread1([&]() {
236 MemTablePostProcessInfo post_process_info1;
237 std::string v1;
238 for (int seq = 1; seq < num_ops / 2; seq++) {
239 PutFixed64(&v1, seq);
240 bool res1 =
241 mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info1);
242 ASSERT_TRUE(res1);
243 v1.clear();
244 }
245 });
246 ROCKSDB_NAMESPACE::port::Thread write_thread2([&]() {
247 MemTablePostProcessInfo post_process_info2;
248 std::string v2;
249 for (int seq = num_ops / 2; seq < num_ops; seq++) {
250 PutFixed64(&v2, seq);
251 bool res2 =
252 mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info2);
253 ASSERT_TRUE(res2);
254 v2.clear();
255 }
256 });
257 write_thread1.join();
258 write_thread2.join();
259
260 Status status;
261 ReadOptions roptions;
262 SequenceNumber max_covering_tombstone_seq = 0;
263 LookupKey lkey("key", kMaxSequenceNumber);
264 res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status, &merge_context,
265 &max_covering_tombstone_seq, roptions);
266 ASSERT_TRUE(res);
267 uint64_t ivalue = DecodeFixed64(Slice(value).data());
268 uint64_t sum = 0;
269 for (int seq = 0; seq < num_ops; seq++) {
270 sum += seq;
271 }
272 ASSERT_EQ(ivalue, sum);
273
274 delete mem;
275 }
276
// Verifies that inserts whose keys share a prefix (per TestPrefixExtractor)
// reuse the hint left by the previous insert with that prefix. A new prefix
// starts from a null hint, and keys outside the extractor's domain bypass
// InsertWithHint entirely.
TEST_F(DBMemTableTest, InsertWithHint) {
  Options options;
  options.allow_concurrent_memtable_write = false;
  options.create_if_missing = true;
  options.memtable_factory.reset(new MockMemTableRepFactory());
  options.memtable_insert_with_hint_prefix_extractor.reset(
      new TestPrefixExtractor());
  options.env = env_;
  Reopen(options);
  // static_cast is the correct base-to-derived downcast here; it matches the
  // ColumnFamilyId test. The factory is expected to have created a rep during
  // Reopen(), so fail cleanly instead of dereferencing null if it did not.
  MockMemTableRep* rep =
      static_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
          ->rep();
  ASSERT_NE(nullptr, rep);
  ASSERT_OK(Put("foo_k1", "foo_v1"));
  ASSERT_EQ(nullptr, rep->last_hint_in());
  void* hint_foo = rep->last_hint_out();
  ASSERT_OK(Put("foo_k2", "foo_v2"));
  ASSERT_EQ(hint_foo, rep->last_hint_in());
  ASSERT_EQ(hint_foo, rep->last_hint_out());
  ASSERT_OK(Put("foo_k3", "foo_v3"));
  ASSERT_EQ(hint_foo, rep->last_hint_in());
  ASSERT_EQ(hint_foo, rep->last_hint_out());
  ASSERT_OK(Put("bar_k1", "bar_v1"));
  ASSERT_EQ(nullptr, rep->last_hint_in());
  void* hint_bar = rep->last_hint_out();
  ASSERT_NE(hint_foo, hint_bar);
  ASSERT_OK(Put("bar_k2", "bar_v2"));
  ASSERT_EQ(hint_bar, rep->last_hint_in());
  ASSERT_EQ(hint_bar, rep->last_hint_out());
  ASSERT_EQ(5, rep->num_insert_with_hint());
  // A key without '_' is outside the extractor's domain: no hinted insert.
  ASSERT_OK(Put("whitelisted", "vvv"));
  ASSERT_EQ(5, rep->num_insert_with_hint());
  ASSERT_EQ("foo_v1", Get("foo_k1"));
  ASSERT_EQ("foo_v2", Get("foo_k2"));
  ASSERT_EQ("foo_v3", Get("foo_k3"));
  ASSERT_EQ("bar_v1", Get("bar_k1"));
  ASSERT_EQ("bar_v2", Get("bar_k2"));
  ASSERT_EQ("vvv", Get("whitelisted"));
}
315
TEST_F(DBMemTableTest, ColumnFamilyId) {
  // Verifies MemTableRepFactory is told the right column family id.
  Options options;
  options.allow_concurrent_memtable_write = false;
  options.create_if_missing = true;
  options.memtable_factory.reset(new MockMemTableRepFactory());
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // The factory object never changes, so resolve the mock once up front.
  auto* const mock_factory =
      static_cast<MockMemTableRepFactory*>(options.memtable_factory.get());

  // Handle indices 0 and 1 (presumably the default family and "pikachu").
  // Writing and flushing each one should make the factory see that id last.
  for (uint32_t cf : {0u, 1u}) {
    ASSERT_OK(Put(cf, "key", "val"));
    ASSERT_OK(Flush(cf));
    ASSERT_EQ(cf, mock_factory->GetLastColumnFamilyId());
  }
}
333
334 } // namespace ROCKSDB_NAMESPACE
335
main(int argc,char ** argv)336 int main(int argc, char** argv) {
337 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
338 ::testing::InitGoogleTest(&argc, argv);
339 return RUN_ALL_TESTS();
340 }
341