//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/memtable_list.h"
#include <algorithm>
#include <string>
#include <vector>
#include "db/merge_context.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/write_buffer_manager.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

class MemTableListTest : public testing::Test {
 public:
  std::string dbname;
  DB* db;
  Options options;
  std::vector<ColumnFamilyHandle*> handles;
  std::atomic<uint64_t> file_number;

  MemTableListTest() : db(nullptr), file_number(1) {
    dbname = test::PerThreadDBPath("memtable_list_test");
    options.create_if_missing = true;
    DestroyDB(dbname, options);
  }

  // Create a test db if not yet created
  void CreateDB() {
    if (db == nullptr) {
      options.create_if_missing = true;
      DestroyDB(dbname, options);
      // Open DB only with default column family
      ColumnFamilyOptions cf_options;
      std::vector<ColumnFamilyDescriptor> cf_descs;
      cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
      Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
      EXPECT_OK(s);

      ColumnFamilyOptions cf_opt1, cf_opt2;
      cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
                                    std::numeric_limits<uint64_t>::max());
      cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
                                    std::numeric_limits<uint64_t>::max());
      int sz = static_cast<int>(handles.size());
      handles.resize(sz + 2);
      s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
      EXPECT_OK(s);
      s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
      EXPECT_OK(s);

      cf_descs.emplace_back("one", cf_options);
      cf_descs.emplace_back("two", cf_options);
    }
  }

  ~MemTableListTest() override {
    if (db) {
      std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
      for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
        handles[i]->GetDescriptor(&cf_descs[i]);
      }
      for (auto h : handles) {
        if (h) {
          db->DestroyColumnFamilyHandle(h);
        }
      }
      handles.clear();
      delete db;
      db = nullptr;
      DestroyDB(dbname, options, cf_descs);
    }
  }

  // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableFlushResults(
      MemTableList* list, const MutableCFOptions& mutable_cf_options,
      const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());

    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    auto cfd = column_family_set->GetDefault();
    EXPECT_TRUE(nullptr != cfd);
    uint64_t file_num = file_number.fetch_add(1);
    IOStatus io_s;
    // Create dummy mutex.
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
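    // TryInstallMemtableFlushResults() expects the DB mutex to be held by the
    // caller (taken just above); file_num is the file number of the SST that
    // this mock "flush" pretends to have written.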
    Status s = list->TryInstallMemtableFlushResults(
        cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
        file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s);
    return s;
  }

  // Calls InstallMemtableAtomicFlushResults() and sets up all structures
  // needed to call this function.
  Status Mock_InstallMemtableAtomicFlushResults(
      autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;

    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData

    auto column_family_set = versions.GetColumnFamilySet();

    LogsWithPrepTracker dummy_prep_tracker;
    autovector<ColumnFamilyData*> cfds;
    for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
      cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
      EXPECT_NE(nullptr, cfds[i]);
    }
    std::vector<FileMetaData> file_metas;
    file_metas.reserve(cf_ids.size());
    for (size_t i = 0; i != cf_ids.size(); ++i) {
      FileMetaData meta;
      uint64_t file_num = file_number.fetch_add(1);
      meta.fd = FileDescriptor(file_num, 0, 0);
      file_metas.emplace_back(meta);
    }
    autovector<FileMetaData*> file_meta_ptrs;
    for (auto& meta : file_metas) {
      file_meta_ptrs.push_back(&meta);
    }
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    return InstallMemtableAtomicFlushResults(
        &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
        file_meta_ptrs, to_delete, nullptr, &log_buffer);
  }
};

TEST_F(MemTableListTest, Empty) {
  // Create an empty MemTableList and validate basic functions.
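  // The three constructor arguments are min_write_buffer_number_to_merge,
  // max_write_buffer_number_to_maintain and max_write_buffer_size_to_maintain
  // (the same parameters the tests below pass by name).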
  MemTableList list(1, 0, 0);

  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());

  autovector<MemTable*> mems;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
  ASSERT_EQ(0, mems.size());

  autovector<MemTable*> to_delete;
  list.current()->Unref(&to_delete);
  ASSERT_EQ(0, to_delete.size());
}

TEST_F(MemTableListTest, GetTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 0;
  int64_t max_write_buffer_size_to_maintain = 0;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(
      lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();
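  // The test takes an explicit reference here; the memtable is released via
  // list.current()->Unref() and deleted during cleanup at the end of the test.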

  // Write some keys to this memtable.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key1", "value1");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");

  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value,
                   /*timestamp*/nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value1");

  merge_context.Clear();
  found = mem->Get(LookupKey("key1", 2), &value,
                   /*timestamp*/nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // The memtable reports this key as deleted (i.e. not found) at this sequence#
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value,
                   /*timestamp*/nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  ASSERT_EQ(4, mem->num_entries());
  ASSERT_EQ(1, mem->num_deletes());

  // Add memtable to list
  list.Add(mem, &to_delete);

  SequenceNumber saved_seq = seq;

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key2", "value2.3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);

  // Fetch keys via MemTableList
  merge_context.Clear();
  found = list.current()->Get(
      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s,
      &merge_context, &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->Get(
      LookupKey("key1", saved_seq), &value, /*timestamp*/nullptr,
      &s, &merge_context, &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value1", value);

  merge_context.Clear();
  found = list.current()->Get(
      LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s,
      &merge_context, &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.3");

  merge_context.Clear();
  found = list.current()->Get(
      LookupKey("key2", 1), &value, /*timestamp*/nullptr, &s,
      &merge_context, &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  ASSERT_EQ(2, list.NumNotFlushed());

  list.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

TEST_F(MemTableListTest, GetFromHistoryTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 2;
  int64_t max_write_buffer_size_to_maintain = 2000;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(
      lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");

  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value,
                   /*timestamp*/nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // The memtable reports this key as deleted (i.e. not found) at this sequence#
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value,
                   /*timestamp*/nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  // Add memtable to list
  list.Add(mem, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  // Fetch keys via MemTableList
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", seq), &value,
                              /*timestamp*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", seq), &value,
                              /*timestamp*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value2.2", value);

  // Flush this memtable from the list.
  // (It will then be a part of the memtable history).
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  MutableCFOptions mutable_cf_options(options);
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", seq), &value,
                              /*timestamp*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", seq), &value,
                              /*timestamp*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify keys are present in history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value2.2", value);

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key3", "value3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  to_flush.clear();
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  // Flush second memtable
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(2, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Add a third memtable to push the first memtable out of the history
  WriteBufferManager wb3(options.db_write_buffer_size);
  MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem3->Ref();
  list.Add(mem3, &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(1, to_delete.size());
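  // The first flushed memtable no longer fits in the size-limited history
  // (max_write_buffer_size_to_maintain), so it has been trimmed and is now
  // ready to be deleted.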

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", seq), &value,
                              /*timestamp*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", seq), &value,
                              /*timestamp*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key3", seq), &value,
                              /*timestamp*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify that the second memtable's keys are in the history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key3", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value3", value);

  // Verify that key2 from the first memtable is no longer in the history
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", seq), &value,
                              /*timestamp*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Cleanup
  list.current()->Unref(&to_delete);
  ASSERT_EQ(3, to_delete.size());
  for (MemTable* m : to_delete) {
    delete m;
  }
}

TEST_F(MemTableListTest, FlushPendingTest) {
  const int num_tables = 6;
  SequenceNumber seq = 1;
  Status s;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);
  autovector<MemTable*> to_delete;

  // Create MemTableList
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int>(options.write_buffer_size);
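  // Size the history to roughly seven full write buffers so that, in this
  // test, flushed memtables are retained in history rather than being freed
  // immediately.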
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  // Create some MemTables
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    mem->Add(++seq, kTypeValue, "key1", ToString(i));
    mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
    mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
    mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
    mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

    tables.push_back(mem);
  }

  // Nothing to flush
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());

  // Request a flush even though there is nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Attempt to 'flush' to clear request for flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Request a flush again
  list.FlushRequested();
  // No flush pending since the list is empty.
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add 2 tables
  list.Add(tables[0], &to_delete);
  list.Add(tables[1], &to_delete);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Even though we have less than the minimum to flush, a flush is
  // pending since we had previously requested a flush and never called
  // PickMemtablesToFlush() to clear the flush.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(2, to_flush.size());
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Revert flush
  list.RollbackMemtableFlush(to_flush, 0);
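  // Rolling back returns the two picked memtables to the unflushed set and
  // re-arms imm_flush_needed; IsFlushPending() stays false because the earlier
  // flush request was already consumed and fewer than
  // min_write_buffer_number_to_merge memtables are present.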
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[2], &to_delete);
  // We now have the minimum to flush regardless of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<MemTable*> to_flush2;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(0, to_flush2.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add another table
  list.Add(tables[3], &to_delete);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Request a flush again
  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(1, to_flush2.size());
  ASSERT_EQ(4, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Rollback first pick of tables
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[4], &to_delete);
  ASSERT_EQ(5, list.NumNotFlushed());
  // We now have the minimum to flush regardless of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  // Should pick 4 of 5 since 1 table has been picked in to_flush2
  ASSERT_EQ(4, to_flush.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<MemTable*> to_flush3;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
  ASSERT_EQ(0, to_flush3.size());  // all remaining memtables are already being flushed
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 4 memtables that were picked in to_flush
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);

  // Note:  now to_flush contains tables[0,1,2,4].  to_flush2 contains
  // tables[3].
  // Current implementation will only commit memtables in the order they were
  // created. So TryInstallMemtableFlushResults will install the first 3 tables
  // in to_flush and stop when it encounters a table not yet flushed.
  ASSERT_EQ(2, list.NumNotFlushed());
  int num_in_history =
      std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
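  // Since max_write_buffer_size_to_maintain is seven write buffers,
  // num_in_history evaluates to min(3, 7) = 3 here, so to_delete is expected
  // to be empty (5 - 2 - 3 == 0).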

  // Request a flush again. Should be nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 1 memtable that was picked in to_flush2
  s = MemTableListTest::Mock_InstallMemtableFlushResults(
      &list, mutable_cf_options, to_flush2, &to_delete);
  ASSERT_OK(s);

  // This will actually install 2 tables.  The 1 we told it to flush, and also
  // tables[4] which has been waiting for tables[3] to commit.
  ASSERT_EQ(0, list.NumNotFlushed());
  num_in_history =
      std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
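  // Likewise, num_in_history is now min(5, 7) = 5, so all five flushed
  // memtables stay in the history and to_delete remains empty (5 - 0 - 5 == 0).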

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();

  // Add another table
  list.Add(tables[5], &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(5, list.GetLatestMemTableID());
  memtable_id = 4;
  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 4. Therefore, no table will be selected in this case.
  autovector<MemTable*> to_flush4;
  list.FlushRequested();
  ASSERT_TRUE(list.HasFlushRequested());
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_TRUE(to_flush4.empty());
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());

  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 5. Therefore, only tables[5] will be selected.
  memtable_id = 5;
  list.FlushRequested();
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  to_delete.clear();

  list.current()->Unref(&to_delete);
  int to_delete_size =
      std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
                               static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(to_delete_size, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
}

TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
  autovector<MemTableList*> lists;
  autovector<uint32_t> cf_ids;
  autovector<const MutableCFOptions*> options_list;
  autovector<const autovector<MemTable*>*> to_flush;
  autovector<MemTable*> to_delete;
  Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
                                                    to_flush, &to_delete);
  ASSERT_OK(s);
  ASSERT_TRUE(to_delete.empty());
}

TEST_F(MemTableListTest, AtomicFlusTest) {
  const int num_cfs = 3;
  const int num_tables_per_cf = 2;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);

  // Create MemTableLists
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int64_t>(options.write_buffer_size);
  autovector<MemTableList*> lists;
  for (int i = 0; i != num_cfs; ++i) {
    lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
                                        max_write_buffer_number_to_maintain,
                                        max_write_buffer_size_to_maintain));
  }

  autovector<uint32_t> cf_ids;
  std::vector<std::vector<MemTable*>> tables(num_cfs);
  autovector<const MutableCFOptions*> mutable_cf_options_list;
  uint32_t cf_id = 0;
  for (auto& elem : tables) {
    mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
    uint64_t memtable_id = 0;
    for (int i = 0; i != num_tables_per_cf; ++i) {
      MemTable* mem =
          new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
                       kMaxSequenceNumber, cf_id);
      mem->SetID(memtable_id++);
      mem->Ref();

      std::string value;

      mem->Add(++seq, kTypeValue, "key1", ToString(i));
      mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
      mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
      mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
      mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

      elem.push_back(mem);
    }
    cf_ids.push_back(cf_id++);
  }

  std::vector<autovector<MemTable*>> flush_candidates(num_cfs);

  // Nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
    list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
    ASSERT_EQ(0, flush_candidates[i].size());
  }
  // Request flush even though there is nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    list->FlushRequested();
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  }
  autovector<MemTable*> to_delete;
  // Add tables to the immutable memtable lists associated with column families
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      lists[i]->Add(tables[i][j], &to_delete);
    }
    ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
    ASSERT_TRUE(lists[i]->IsFlushPending());
    ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
  }
  std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
  //          +----+
  // list[0]: |0  1|
  // list[1]: |0  1|
  //          | +--+
  // list[2]: |0| 1
  //          +-+
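  // Each entry above is the largest memtable ID that may be picked from the
  // corresponding list: lists 0 and 1 flush both of their memtables, while
  // list 2 flushes only memtable 0.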
  // Pick memtables to flush
  for (auto i = 0; i != num_cfs; ++i) {
    flush_candidates[i].clear();
    lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
                                   &flush_candidates[i]);
    ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
              static_cast<uint64_t>(flush_candidates[i].size()));
  }
  autovector<MemTableList*> tmp_lists;
  autovector<uint32_t> tmp_cf_ids;
  autovector<const MutableCFOptions*> tmp_options_list;
  autovector<const autovector<MemTable*>*> to_flush;
  for (auto i = 0; i != num_cfs; ++i) {
    if (!flush_candidates[i].empty()) {
      to_flush.push_back(&flush_candidates[i]);
      tmp_lists.push_back(lists[i]);
      tmp_cf_ids.push_back(i);
      tmp_options_list.push_back(mutable_cf_options_list[i]);
    }
  }
  Status s = Mock_InstallMemtableAtomicFlushResults(
      tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
  ASSERT_OK(s);

  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
        ASSERT_LT(0, tables[i][j]->GetFileNumber());
      }
    }
    ASSERT_EQ(
        static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
        lists[i]->NumNotFlushed());
  }

  to_delete.clear();
  for (auto list : lists) {
    list->current()->Unref(&to_delete);
    delete list;
  }
  for (auto& mutable_cf_options : mutable_cf_options_list) {
    if (mutable_cf_options != nullptr) {
      delete mutable_cf_options;
      mutable_cf_options = nullptr;
    }
  }
  // All memtables in tables array must have been flushed, thus ready to be
  // deleted.
  ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling InstallMemtableFlushResults.
    // Verify this by Ref'ing and then Unref'ing.
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
930