// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/memtable_list.h"
7 #include <algorithm>
8 #include <string>
9 #include <vector>
10 #include "db/merge_context.h"
11 #include "db/version_set.h"
12 #include "db/write_controller.h"
13 #include "rocksdb/db.h"
14 #include "rocksdb/status.h"
15 #include "rocksdb/write_buffer_manager.h"
16 #include "test_util/testharness.h"
17 #include "test_util/testutil.h"
18 #include "util/string_util.h"
19
20 namespace ROCKSDB_NAMESPACE {
21
// Test fixture providing a real on-disk DB (default + "one" + "two"
// column families) plus helpers that assemble the VersionSet / mutex /
// log-buffer plumbing needed to exercise the MemTableList flush
// installation APIs outside of a running DBImpl.
class MemTableListTest : public testing::Test {
 public:
  std::string dbname;
  DB* db;
  Options options;
  std::vector<ColumnFamilyHandle*> handles;
  // Source of fake SST file numbers handed to the mock installers;
  // atomic so fetch_add yields unique, increasing numbers.
  std::atomic<uint64_t> file_number;

  MemTableListTest() : db(nullptr), file_number(1) {
    dbname = test::PerThreadDBPath("memtable_list_test");
    options.create_if_missing = true;
    DestroyDB(dbname, options);
  }

  // Create a test db if not yet created
  void CreateDB() {
    if (db == nullptr) {
      options.create_if_missing = true;
      DestroyDB(dbname, options);
      // Open DB only with default column family
      ColumnFamilyOptions cf_options;
      std::vector<ColumnFamilyDescriptor> cf_descs;
      cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
      Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
      EXPECT_OK(s);

      // Add two more column families, each with a dedicated cf_path, so
      // tests have three CFs to work with. Note: assumes handles holds
      // exactly the default handle at this point (slots 1 and 2 are
      // filled below).
      ColumnFamilyOptions cf_opt1, cf_opt2;
      cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
                                    std::numeric_limits<uint64_t>::max());
      cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
                                    std::numeric_limits<uint64_t>::max());
      int sz = static_cast<int>(handles.size());
      handles.resize(sz + 2);
      s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
      EXPECT_OK(s);
      s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
      EXPECT_OK(s);

      cf_descs.emplace_back("one", cf_options);
      cf_descs.emplace_back("two", cf_options);
    }
  }

  ~MemTableListTest() override {
    if (db) {
      // Snapshot the descriptors before dropping the handles so that
      // DestroyDB below can also clean up the per-CF paths.
      std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
      for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
        handles[i]->GetDescriptor(&cf_descs[i]);
      }
      for (auto h : handles) {
        if (h) {
          db->DestroyColumnFamilyHandle(h);
        }
      }
      handles.clear();
      delete db;
      db = nullptr;
      DestroyDB(dbname, options, cf_descs);
    }
  }

  // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
  // structures needed to call this function. The memtables in `m` are
  // installed against a freshly recovered mock VersionSet; memtables
  // that become disposable are appended to `to_delete`.
  Status Mock_InstallMemtableFlushResults(
      MemTableList* list, const MutableCFOptions& mutable_cf_options,
      const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());

    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    auto cfd = column_family_set->GetDefault();
    EXPECT_TRUE(nullptr != cfd);
    uint64_t file_num = file_number.fetch_add(1);
    IOStatus io_s;
    // Create dummy mutex.
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
    Status s = list->TryInstallMemtableFlushResults(
        cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
        file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s);
    return s;
  }

  // Calls InstallMemtableAtomicFlushResults() and sets up all
  // structures needed to call this function. Each entry of `mems_list`
  // corresponds positionally to the MemTableList / cf id / options in
  // the other three containers.
  Status Mock_InstallMemtableAtomicFlushResults(
      autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;

    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Resolve the ColumnFamilyData for each requested cf id.
    auto column_family_set = versions.GetColumnFamilySet();

    LogsWithPrepTracker dummy_prep_tracker;
    autovector<ColumnFamilyData*> cfds;
    for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
      cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
      EXPECT_NE(nullptr, cfds[i]);
    }
    // One fake output file per column family being flushed.
    std::vector<FileMetaData> file_metas;
    file_metas.reserve(cf_ids.size());
    for (size_t i = 0; i != cf_ids.size(); ++i) {
      FileMetaData meta;
      uint64_t file_num = file_number.fetch_add(1);
      meta.fd = FileDescriptor(file_num, 0, 0);
      file_metas.emplace_back(meta);
    }
    autovector<FileMetaData*> file_meta_ptrs;
    for (auto& meta : file_metas) {
      file_meta_ptrs.push_back(&meta);
    }
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    return InstallMemtableAtomicFlushResults(
        &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
        file_meta_ptrs, to_delete, nullptr, &log_buffer);
  }
};
187
TEST_F(MemTableListTest, Empty) {
  // A freshly constructed MemTableList holds no memtables and must
  // report nothing to flush.
  MemTableList list(1, 0, 0);

  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());

  // Picking flush candidates from an empty list yields none.
  autovector<MemTable*> picked;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &picked);
  ASSERT_EQ(0, picked.size());

  // Releasing the last reference to the (empty) current version should
  // schedule nothing for deletion.
  autovector<MemTable*> scratch;
  list.current()->Unref(&scratch);
  ASSERT_EQ(0, scratch.size());
}
204
// Exercises point lookups through MemTableList::current()->Get():
// values, deletions, and sequence-number visibility across two
// stacked memtables.
TEST_F(MemTableListTest, GetTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 0;
  int64_t max_write_buffer_size_to_maintain = 0;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  // Lookup on an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(
      lkey, &value, /*timestamp*/ nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key1", "value1");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");

  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value1");

  merge_context.Clear();
  found = mem->Get(LookupKey("key1", 2), &value,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#):
  // seq 2 only sees the deletion written at that point.
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  ASSERT_EQ(4, mem->num_entries());
  ASSERT_EQ(1, mem->num_deletes());

  // Add memtable to list
  list.Add(mem, &to_delete);

  // Remember the last sequence written to the first memtable so we can
  // look "behind" the second memtable later.
  SequenceNumber saved_seq = seq;

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key2", "value2.3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);

  // Fetch keys via MemTableList: the newest memtable wins.
  merge_context.Clear();
  found = list.current()->Get(
      LookupKey("key1", seq), &value, /*timestamp*/ nullptr, &s,
      &merge_context, &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  // At the older snapshot, the first memtable's value is still visible.
  merge_context.Clear();
  found = list.current()->Get(
      LookupKey("key1", saved_seq), &value, /*timestamp*/ nullptr,
      &s, &merge_context, &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value1", value);

  merge_context.Clear();
  found = list.current()->Get(
      LookupKey("key2", seq), &value, /*timestamp*/ nullptr, &s,
      &merge_context, &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.3");

  // Sequence 1 predates every write, so nothing is visible.
  merge_context.Clear();
  found = list.current()->Get(
      LookupKey("key2", 1), &value, /*timestamp*/ nullptr, &s,
      &merge_context, &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  ASSERT_EQ(2, list.NumNotFlushed());

  // Cleanup: drop the list's version; both memtables become deletable.
  list.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}
321
// Verifies that flushed memtables remain queryable via GetFromHistory()
// until they are evicted from the bounded history, and that eviction
// actually removes their keys from the history.
TEST_F(MemTableListTest, GetFromHistoryTest) {
  // Create MemTableList that retains up to 2 flushed memtables
  // (additionally bounded by 2000 bytes) in its history.
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 2;
  int64_t max_write_buffer_size_to_maintain = 2000;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  // Lookup on an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(
      lkey, &value, /*timestamp*/ nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");

  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  // Add memtable to list
  list.Add(mem, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  // Fetch keys via MemTableList
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", seq), &value,
                              /*timestamp*/ nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", seq), &value,
                              /*timestamp*/ nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value2.2", value);

  // Flush this memtable from the list.
  // (It will then be a part of the memtable history).
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  MutableCFOptions mutable_cf_options(options);
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", seq), &value,
                              /*timestamp*/ nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", seq), &value,
                              /*timestamp*/ nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify keys are present in history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, /*timestamp*/ nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, /*timestamp*/ nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value2.2", value);

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key3", "value3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  to_flush.clear();
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  // Flush second memtable
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(2, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Add a third memtable to push the first memtable out of the history
  // (history holds at most 2 flushed memtables here).
  WriteBufferManager wb3(options.db_write_buffer_size);
  MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem3->Ref();
  list.Add(mem3, &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(1, to_delete.size());

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", seq), &value,
                              /*timestamp*/ nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", seq), &value,
                              /*timestamp*/ nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key3", seq), &value,
                              /*timestamp*/ nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify that the second memtable's keys are in the history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, /*timestamp*/ nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key3", seq), &value, /*timestamp*/ nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value3", value);

  // Verify that key2 from the first memtable is no longer in the history.
  // Fix: this must query GetFromHistory(); the previous Get() only
  // consulted the unflushed memtables, so the assertion passed vacuously
  // and never actually checked history eviction.
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, /*timestamp*/ nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Cleanup: mem2 (history), mem3 (unflushed) and the previously evicted
  // mem become deletable.
  list.current()->Unref(&to_delete);
  ASSERT_EQ(3, to_delete.size());
  for (MemTable* m : to_delete) {
    delete m;
  }
}
517
// Exercises the flush-pending state machine: FlushRequested(),
// IsFlushPending(), imm_flush_needed, picking/rolling back flush
// candidates, and installing flush results in and out of order.
TEST_F(MemTableListTest, FlushPendingTest) {
  const int num_tables = 6;
  SequenceNumber seq = 1;
  Status s;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);
  autovector<MemTable*> to_delete;

  // Create MemTableList
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  // Fix: perform the multiplication in int64_t (as AtomicFlusTest does)
  // rather than narrowing write_buffer_size to int first, which could
  // overflow for large write buffer configurations.
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int64_t>(options.write_buffer_size);
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  // Create some MemTables with ids 0..5, each holding a few keys.
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    mem->Add(++seq, kTypeValue, "key1", ToString(i));
    mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
    mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
    mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
    mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

    tables.push_back(mem);
  }

  // Nothing to flush
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());

  // Request a flush even though there is nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Attempt to 'flush' to clear request for flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Request a flush again
  list.FlushRequested();
  // No flush pending since the list is empty.
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add 2 tables
  list.Add(tables[0], &to_delete);
  list.Add(tables[1], &to_delete);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Even though we have less than the minimum to flush, a flush is
  // pending since we had previously requested a flush and never called
  // PickMemtablesToFlush() to clear the flush.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(2, to_flush.size());
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Revert flush: the memtables go back to being flush-needed.
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[2], &to_delete);
  // We now have the minimum to flush regardless of whether
  // FlushRequested() was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again: everything is already picked.
  autovector<MemTable*> to_flush2;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(0, to_flush2.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add another table
  list.Add(tables[3], &to_delete);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Request a flush again
  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again: only tables[3] is still unpicked.
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(1, to_flush2.size());
  ASSERT_EQ(4, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Rollback first pick of tables
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[4], &to_delete);
  ASSERT_EQ(5, list.NumNotFlushed());
  // We now have the minimum to flush regardless of whether
  // FlushRequested() was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  // Should pick 4 of 5 since 1 table has been picked in to_flush2
  ASSERT_EQ(4, to_flush.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<MemTable*> to_flush3;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
  ASSERT_EQ(0, to_flush3.size());  // nothing not in progress of being flushed
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 4 memtables that were picked in to_flush
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);

  // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
  // tables[3].
  // Current implementation will only commit memtables in the order they were
  // created. So TryInstallMemtableFlushResults will install the first 3 tables
  // in to_flush and stop when it encounters a table not yet flushed.
  ASSERT_EQ(2, list.NumNotFlushed());
  int num_in_history =
      std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  // Request a flush again. Should be nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 1 memtable that was picked in to_flush2
  s = MemTableListTest::Mock_InstallMemtableFlushResults(
      &list, mutable_cf_options, to_flush2, &to_delete);
  ASSERT_OK(s);

  // This will actually install 2 tables. The 1 we told it to flush, and also
  // tables[4] which has been waiting for tables[3] to commit.
  ASSERT_EQ(0, list.NumNotFlushed());
  num_in_history =
      std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();

  // Add another table
  list.Add(tables[5], &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(5, list.GetLatestMemTableID());
  memtable_id = 4;
  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 4. Therefore, no table will be selected in this case.
  autovector<MemTable*> to_flush4;
  list.FlushRequested();
  ASSERT_TRUE(list.HasFlushRequested());
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_TRUE(to_flush4.empty());
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());

  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 5. Therefore, only tables[5] will be selected.
  memtable_id = 5;
  list.FlushRequested();
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  to_delete.clear();

  // Cleanup: everything still referenced by the list becomes deletable,
  // capped by what the history could retain.
  list.current()->Unref(&to_delete);
  int to_delete_size =
      std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
                               static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(to_delete_size, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
}
769
TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
  // Installing atomic flush results for zero column families must
  // succeed trivially and leave nothing to delete.
  autovector<MemTableList*> empty_lists;
  autovector<uint32_t> empty_cf_ids;
  autovector<const MutableCFOptions*> empty_options;
  autovector<const autovector<MemTable*>*> nothing_to_flush;
  autovector<MemTable*> garbage;
  Status s = Mock_InstallMemtableAtomicFlushResults(
      empty_lists, empty_cf_ids, empty_options, nothing_to_flush, &garbage);
  ASSERT_OK(s);
  ASSERT_TRUE(garbage.empty());
}
781
// Verifies atomically flushing memtables across multiple column
// families: different per-CF cutoff memtable ids, joint installation of
// results, and cleanup afterwards.
TEST_F(MemTableListTest, AtomicFlusTest) {
  const int num_cfs = 3;
  const int num_tables_per_cf = 2;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);

  // Create MemTableLists, one per column family.
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int64_t>(options.write_buffer_size);
  autovector<MemTableList*> lists;
  for (int i = 0; i != num_cfs; ++i) {
    lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
                                        max_write_buffer_number_to_maintain,
                                        max_write_buffer_size_to_maintain));
  }

  // Build num_tables_per_cf memtables (ids 0..) for each CF.
  autovector<uint32_t> cf_ids;
  std::vector<std::vector<MemTable*>> tables(num_cfs);
  autovector<const MutableCFOptions*> mutable_cf_options_list;
  uint32_t cf_id = 0;
  for (auto& elem : tables) {
    mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
    uint64_t memtable_id = 0;
    for (int i = 0; i != num_tables_per_cf; ++i) {
      MemTable* mem =
          new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
                       kMaxSequenceNumber, cf_id);
      mem->SetID(memtable_id++);
      mem->Ref();

      std::string value;

      mem->Add(++seq, kTypeValue, "key1", ToString(i));
      mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
      mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
      mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
      mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

      elem.push_back(mem);
    }
    cf_ids.push_back(cf_id++);
  }

  std::vector<autovector<MemTable*>> flush_candidates(num_cfs);

  // Nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
    list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
    ASSERT_EQ(0, flush_candidates[i].size());
  }
  // Request flush even though there is nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    list->FlushRequested();
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  }
  autovector<MemTable*> to_delete;
  // Add tables to the immutable memtable lists associated with column families
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      lists[i]->Add(tables[i][j], &to_delete);
    }
    ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
    ASSERT_TRUE(lists[i]->IsFlushPending());
    ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
  }
  // Per-CF cutoff memtable ids: list[2] only flushes its first memtable.
  std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
  //          +----+
  // list[0]: |0 1|
  // list[1]: |0 1|
  //          |  +--+
  // list[2]: |0|  1
  //          +-+
  // Pick memtables to flush
  for (auto i = 0; i != num_cfs; ++i) {
    flush_candidates[i].clear();
    lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
                                   &flush_candidates[i]);
    ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
              static_cast<uint64_t>(flush_candidates[i].size()));
  }
  // Collect only the CFs that actually picked something, then install
  // all of their flush results atomically.
  autovector<MemTableList*> tmp_lists;
  autovector<uint32_t> tmp_cf_ids;
  autovector<const MutableCFOptions*> tmp_options_list;
  autovector<const autovector<MemTable*>*> to_flush;
  for (auto i = 0; i != num_cfs; ++i) {
    if (!flush_candidates[i].empty()) {
      to_flush.push_back(&flush_candidates[i]);
      tmp_lists.push_back(lists[i]);
      tmp_cf_ids.push_back(i);
      tmp_options_list.push_back(mutable_cf_options_list[i]);
    }
  }
  Status s = Mock_InstallMemtableAtomicFlushResults(
      tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
  ASSERT_OK(s);

  // Every installed memtable must have a real file number; the rest
  // must still be counted as not-flushed.
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
        ASSERT_LT(0, tables[i][j]->GetFileNumber());
      }
    }
    ASSERT_EQ(
        static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
        lists[i]->NumNotFlushed());
  }

  to_delete.clear();
  for (auto list : lists) {
    list->current()->Unref(&to_delete);
    delete list;
  }
  for (auto& mutable_cf_options : mutable_cf_options_list) {
    if (mutable_cf_options != nullptr) {
      delete mutable_cf_options;
      mutable_cf_options = nullptr;
    }
  }
  // All memtables in tables array must have been flushed, thus ready to be
  // deleted.
  ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling InstallMemtableFlushResults.
    // Verify this by Ref'ing and then Unref'ing.
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
}
923
924 } // namespace ROCKSDB_NAMESPACE
925
int main(int argc, char** argv) {
  // Standard gtest entry point: consume gtest flags, then run all tests.
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
930