1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include <atomic>
11 
12 #include "db/db_impl/db_impl.h"
13 #include "db/db_test_util.h"
14 #include "port/port.h"
15 #include "port/stack_trace.h"
16 #include "test_util/fault_injection_test_env.h"
17 #include "test_util/sync_point.h"
18 #include "util/cast_util.h"
19 #include "util/mutexlock.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
// Base fixture for the non-parameterized flush tests in this file. The
// constructor only forwards "/db_flush_test" to DBTestBase (presumably the
// per-test database path suffix — see DBTestBase).
class DBFlushTest : public DBTestBase {
 public:
  DBFlushTest() : DBTestBase("/db_flush_test") {}
};
27 
28 class DBFlushDirectIOTest : public DBFlushTest,
29                             public ::testing::WithParamInterface<bool> {
30  public:
DBFlushDirectIOTest()31   DBFlushDirectIOTest() : DBFlushTest() {}
32 };
33 
34 class DBAtomicFlushTest : public DBFlushTest,
35                           public ::testing::WithParamInterface<bool> {
36  public:
DBAtomicFlushTest()37   DBAtomicFlushTest() : DBFlushTest() {}
38 };
39 
// We had an issue where, when two background threads tried to flush at the
// same time, only one of the flush results got committed. This test verifies
// the issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
  Options options;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.env = env_;
  Reopen(options);
  FlushOptions no_wait;
  no_wait.wait = false;
  no_wait.allow_write_stall = true;

  // Each pair is {predecessor, successor}: the successor sync point blocks
  // until the predecessor has been reached.
  // 1. The test thread issues the second flush only once the first flush is
  //    writing the manifest.
  // 2. The manifest write cannot complete until
  //    MemTableList::TryInstallMemtableFlushResults reaches its InProgress
  //    point (presumably: the second flush sees an install already running).
  SyncPoint::GetInstance()->LoadDependency(
      {{"VersionSet::LogAndApply:WriteManifest",
        "DBFlushTest::FlushWhileWritingManifest:1"},
       {"MemTableList::TryInstallMemtableFlushResults:InProgress",
        "VersionSet::LogAndApply:WriteManifestDone"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("foo", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
  ASSERT_OK(Put("bar", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  // If the issue is hit we will wait here forever. The returned status is
  // checked so that a background flush error fails the test instead of being
  // silently dropped.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
  // Both flushes must have produced a committed table file.
  ASSERT_EQ(2, TotalTableFiles());
#endif  // ROCKSDB_LITE
}
70 
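// Note: this test was once slated to be disabled temporarily on Travis because
// it failed intermittently (GitHub issue #4151). No such guard is applied
// below, so the test runs on all platforms.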
// Verifies that a flush whose SyncClosedLogs step fails (the filesystem is
// deactivated) produces no table files and releases the reference it took on
// the current version.
TEST_F(DBFlushTest, SyncFail) {
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options;
  options.disable_auto_compactions = true;
  options.env = fault_injection_env.get();

  // {predecessor, successor} pairs:
  // - the background flush picks memtables only after the test recorded the
  //   version ref count, and the test re-reads it only after the pick;
  // - SyncClosedLogs starts only after the test deactivated the filesystem,
  //   and the test reactivates it only after the sync has failed.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncFail:GetVersionRefCount:1",
        "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
       {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
        "DBFlushTest::SyncFail:GetVersionRefCount:2"},
       {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
       {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_OK(Put("key", "value"));
  // ColumnFamilyHandleImpl derives from ColumnFamilyHandle, so static_cast is
  // the correct (checked at compile time) downcast here.
  auto* cfd =
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));
  // Flush installs a new super-version. Get the ref count after that.
  auto current_before = cfd->current();
  int refs_before = cfd->current()->TEST_refs();
  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
  // Picking memtables takes exactly one extra reference on the version.
  int refs_after_picking_memtables = cfd->current()->TEST_refs();
  ASSERT_EQ(refs_before + 1, refs_after_picking_memtables);
  fault_injection_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
  TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
  fault_injection_env->SetFilesystemActive(true);
  // Now the background job will do the flush; wait for it. The flush is
  // expected to fail, so the returned status is deliberately not asserted.
  dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
  ASSERT_EQ("", FilesPerLevel());  // flush failed.
#endif                             // ROCKSDB_LITE
  // Background flush job should release its ref count on the current version.
  ASSERT_EQ(current_before, cfd->current());
  ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
  Destroy(options);
}
118 
// Verifies that a flush takes the SyncClosedLogs "Skip" path when there are
// no closed logs to sync. If the skip path were not taken, the test thread
// would block forever at SyncSkip:2.
TEST_F(DBFlushTest, SyncSkip) {
  Options options = CurrentOptions();

  // {predecessor, successor}: the skip point waits for SyncSkip:1, and
  // SyncSkip:2 waits until the skip point has been reached.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
       {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);
  ASSERT_OK(Put("key", "value"));

  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));

  TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
  TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");

  // Now the background job will do the flush; wait for it and make sure it
  // succeeded.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  Destroy(options);
}
142 
TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
  // Verify setting an empty high-pri (flush) thread pool causes flushes to be
  // scheduled in the low-pri (compaction) thread pool.
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  // Memtables that hold a single entry (presumably each further Put switches
  // to a new memtable and schedules a flush).
  options.memtable_factory.reset(new SpecialSkipListFactory(1));
  Reopen(options);
  env_->SetBackgroundThreads(0, Env::HIGH);

  // The first flush records its thread id; every later flush and compaction
  // must run on that same thread.
  std::thread::id tid;
  int num_flushes = 0, num_compactions = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
        if (tid == std::thread::id()) {
          tid = std::this_thread::get_id();
        } else {
          ASSERT_EQ(tid, std::this_thread::get_id());
        }
        ++num_flushes;
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
        ASSERT_EQ(tid, std::this_thread::get_id());
        ++num_compactions;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key", "val"));
  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put("key", "val"));
    // Check the status so a failed background flush fails the test here.
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Four L0 files reach level0_file_num_compaction_trigger: one compaction.
  ASSERT_EQ(4, num_flushes);
  ASSERT_EQ(1, num_compactions);
}
179 
// Verifies that a waiting manual flush returns once its own memtable has been
// flushed, even when a second immutable memtable appears while it runs and
// min_write_buffer_number_to_merge keeps that memtable from being flushed.
TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  Reopen(options);

  // {predecessor, successor} pairs: the test proceeds once the background
  // flush has started, and the flush job writes its L0 table only after the
  // test has switched in a second memtable.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkFlush",
        "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
       {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
        "FlushJob::WriteLevel0Table"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key1", "value1"));

  port::Thread t([&]() {
    // The call waits for the flush to finish, i.e. flush_options.wait = true.
    ASSERT_OK(Flush());
  });

  // Wait for flush start.
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
  // Insert a second memtable before the manual flush finish.
  // At the end of the manual flush job, it will check if further flush
  // is needed, but it will not trigger flush of the second memtable because
  // min_write_buffer_number_to_merge is not reached.
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");

  // Manual flush should return, without waiting for flush indefinitely.
  t.join();
}
214 
// Verifies that a single manual flush leaves no flush unscheduled after
// MaybeScheduleFlushOrCompaction schedules work, and that the scheduling
// callback fires exactly once.
TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
  Options options = CurrentOptions();
  Reopen(options);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  int called = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        // arg points at an int (per the name, presumably the number of
        // flushes not yet scheduled). static_cast is the correct conversion
        // from void*.
        const int unscheduled_flushes = *static_cast<int*>(arg);
        ASSERT_EQ(0, unscheduled_flushes);
        ++called;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("a", "foo"));
  FlushOptions flush_opts;
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_EQ(1, called);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
238 
// Verifies that the table file created by a flush uses direct writes exactly
// when use_direct_io_for_flush_and_compaction (the test parameter) is set.
TEST_P(DBFlushDirectIOTest, DirectIO) {
  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.use_direct_io_for_flush_and_compaction = GetParam();
  // The env is owned by this test and deleted at the end.
  // NOTE(review): a failing ASSERT_* returns early and leaks it.
  options.env = new MockEnv(Env::Default());
  // BuildTable passes a bool* telling whether the new file uses direct
  // writes; it must match the configured option.
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:create_file", [&](void* arg) {
        bool* use_direct_writes = static_cast<bool*>(arg);
        ASSERT_EQ(*use_direct_writes,
                  options.use_direct_io_for_flush_and_compaction);
      });

  SyncPoint::GetInstance()->EnableProcessing();
  Reopen(options);
  ASSERT_OK(Put("foo", "v"));
  FlushOptions flush_options;
  flush_options.wait = true;
  ASSERT_OK(dbfull()->Flush(flush_options));
  Destroy(options);
  delete options.env;
}
262 
// Verifies that switching the memtable fails when the filesystem is inactive.
TEST_F(DBFlushTest, FlushError) {
  Options options;
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_injection_env.get();
  Reopen(options);

  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  fault_injection_env->SetFilesystemActive(false);
  Status s = dbfull()->TEST_SwitchMemtable();
  fault_injection_env->SetFilesystemActive(true);
  // Destroy before asserting: ASSERT_* returns from the test body on failure,
  // and the DB must not outlive fault_injection_env.
  Destroy(options);
  ASSERT_NOK(s);
}
282 
TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
  // Regression test for bug where manual flush hangs forever when the DB
  // is in read-only mode. Verify it now at least returns, despite failing.
  Options options;
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_injection_env.get();
  // Presumably sized so the paused first flush's memtable plus the memtable
  // receiving "key2" fit without a write stall.
  options.max_write_buffer_number = 2;
  Reopen(options);

  // Trigger a first flush but don't let it run
  ASSERT_OK(db_->PauseBackgroundWork());
  ASSERT_OK(Put("key1", "value1"));
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(db_->Flush(flush_opts));

  // Write a key to the second memtable so we have something to flush later
  // after the DB is in read-only mode.
  ASSERT_OK(Put("key2", "value2"));

  // Let the first flush continue, hit an error, and put the DB in read-only
  // mode.
  fault_injection_env->SetFilesystemActive(false);
  ASSERT_OK(db_->ContinueBackgroundWork());
  // Status intentionally not checked: the flush is expected to fail.
  dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
  // The failed flush must be recorded as a background error.
  uint64_t num_bg_errors;
  ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors,
                                  &num_bg_errors));
  ASSERT_GT(num_bg_errors, 0);
#endif  // ROCKSDB_LITE

  // In the bug scenario, triggering another flush would cause the second flush
  // to hang forever. After the fix we expect it to return an error.
  ASSERT_NOK(db_->Flush(FlushOptions()));

  // NOTE(review): the filesystem is still inactive when the DB is closed.
  Close();
}
322 
// Races dropping a column family (and destroying its handle) against a manual
// flush of that column family. The required order is:
//   flush scheduled -> CF dropped and handle freed -> background flush starts
//   -> manual flush begins waiting for it.
// The manual flush must return an error instead of hanging.
TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  SyncPoint::GetInstance()->DisableProcessing();
  // {predecessor, successor} pairs enforcing the order described above.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:AfterScheduleFlush",
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
       {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBImpl::BackgroundCallFlush:start",
        "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_EQ(2, handles_.size());
  ASSERT_OK(Put(1, "key", "value"));
  // Captured before the drop; the flush below is issued on this cfd.
  auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  port::Thread drop_cf_thr([&]() {
    TEST_SYNC_POINT(
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
    handles_.resize(1);
    TEST_SYNC_POINT(
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
  });
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
  drop_cf_thr.join();
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
}
355 
356 #ifndef ROCKSDB_LITE
TEST_F(DBFlushTest,FireOnFlushCompletedAfterCommittedResult)357 TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
358   class TestListener : public EventListener {
359    public:
360     void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
361       // There's only one key in each flush.
362       ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
363       ASSERT_NE(0, info.smallest_seqno);
364       if (info.smallest_seqno == seq1) {
365         // First flush completed
366         ASSERT_FALSE(completed1);
367         completed1 = true;
368         CheckFlushResultCommitted(db, seq1);
369       } else {
370         // Second flush completed
371         ASSERT_FALSE(completed2);
372         completed2 = true;
373         ASSERT_EQ(info.smallest_seqno, seq2);
374         CheckFlushResultCommitted(db, seq2);
375       }
376     }
377 
378     void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
379       DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
380       InstrumentedMutex* mutex = db_impl->mutex();
381       mutex->Lock();
382       auto* cfd =
383           reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
384               ->cfd();
385       ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
386       mutex->Unlock();
387     }
388 
389     std::atomic<SequenceNumber> seq1{0};
390     std::atomic<SequenceNumber> seq2{0};
391     std::atomic<bool> completed1{false};
392     std::atomic<bool> completed2{false};
393   };
394   std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
395 
396   SyncPoint::GetInstance()->LoadDependency(
397       {{"DBImpl::BackgroundCallFlush:start",
398         "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
399        {"DBImpl::FlushMemTableToOutputFile:Finish",
400         "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
401   SyncPoint::GetInstance()->SetCallBack(
402       "FlushJob::WriteLevel0Table", [&listener](void* arg) {
403         // Wait for the second flush finished, out of mutex.
404         auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
405         if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
406           TEST_SYNC_POINT(
407               "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
408               "WaitSecond");
409         }
410       });
411 
412   Options options = CurrentOptions();
413   options.create_if_missing = true;
414   options.listeners.push_back(listener);
415   // Setting max_flush_jobs = max_background_jobs / 4 = 2.
416   options.max_background_jobs = 8;
417   // Allow 2 immutable memtables.
418   options.max_write_buffer_number = 3;
419   Reopen(options);
420   SyncPoint::GetInstance()->EnableProcessing();
421   ASSERT_OK(Put("foo", "v"));
422   listener->seq1 = db_->GetLatestSequenceNumber();
423   // t1 will wait for the second flush complete before committing flush result.
424   auto t1 = port::Thread([&]() {
425     // flush_opts.wait = true
426     ASSERT_OK(db_->Flush(FlushOptions()));
427   });
428   // Wait for first flush started.
429   TEST_SYNC_POINT(
430       "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
431   // The second flush will exit early without commit its result. The work
432   // is delegated to the first flush.
433   ASSERT_OK(Put("bar", "v"));
434   listener->seq2 = db_->GetLatestSequenceNumber();
435   FlushOptions flush_opts;
436   flush_opts.wait = false;
437   ASSERT_OK(db_->Flush(flush_opts));
438   t1.join();
439   ASSERT_TRUE(listener->completed1);
440   ASSERT_TRUE(listener->completed2);
441   SyncPoint::GetInstance()->DisableProcessing();
442   SyncPoint::GetInstance()->ClearAllCallBacks();
443 }
444 #endif  // !ROCKSDB_LITE
445 
// Writes one key to each of three column families, flushes all of them
// manually, and checks that no column family keeps data in memory afterwards.
// Runs with atomic_flush both on and off (the test parameter).
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  // 64MB memtables, presumably large enough that only the manual flush below
  // empties them.
  options.write_buffer_size = (static_cast<size_t>(64) << 20);

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  const size_t cf_count = handles_.size();
  ASSERT_EQ(3, cf_count);

  WriteOptions write_opts;
  write_opts.disableWAL = true;
  std::vector<int> all_cf_ids;
  for (size_t idx = 0; idx < cf_count; ++idx) {
    const int cf = static_cast<int>(idx);
    ASSERT_OK(Put(cf, "key", "value", write_opts));
    all_cf_ids.push_back(cf);
  }
  ASSERT_OK(Flush(all_cf_ids));

  for (ColumnFamilyHandle* handle : handles_) {
    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
    ASSERT_EQ(0, cfd->imm()->NumNotFlushed());
    ASSERT_TRUE(cfd->mem()->IsEmpty());
  }
}
471 
// Fills the memtable of the last column family until an automatic flush
// triggers. Then it checks the other column families: with atomic flush they
// were flushed along with it (mutable memtable empty), and without it they
// still hold their key.
TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  // 4KB so that we can easily trigger auto flush.
  options.write_buffer_size = 4096;

  // The check below runs only after the first background flush finished.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
        "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  const size_t cf_count = handles_.size();
  ASSERT_EQ(3, cf_count);
  const int last_cf = static_cast<int>(cf_count) - 1;

  WriteOptions write_opts;
  write_opts.disableWAL = true;
  for (size_t idx = 0; idx < cf_count; ++idx) {
    ASSERT_OK(Put(static_cast<int>(idx) /*cf*/, "key", "value", write_opts));
  }
  // Keep writing to one of the column families to trigger auto flush.
  for (int n = 0; n < 4000; ++n) {
    const std::string suffix = std::to_string(n);
    ASSERT_OK(Put(last_cf, "key" + suffix, "value" + suffix, write_opts));
  }

  TEST_SYNC_POINT(
      "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
  // Every column family except the last one: nothing pending in imm, and the
  // mutable memtable is empty exactly when atomic flush is enabled.
  for (size_t idx = 0; idx + 1 < cf_count; ++idx) {
    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[idx])->cfd();
    ASSERT_EQ(0, cfd->imm()->NumNotFlushed());
    ASSERT_EQ(options.atomic_flush, cfd->mem()->IsEmpty());
  }
  SyncPoint::GetInstance()->DisableProcessing();
}
516 
// Atomic flush only: lets some of the flush jobs of an atomic flush complete,
// then deactivates the filesystem before the rest proceed. The atomic flush as
// a whole must not be committed. Every column family still has its one
// immutable memtable, meaning the completed jobs were rolled back.
TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.env = fault_injection_env.get();
  SyncPoint::GetInstance()->DisableProcessing();
  // {predecessor, successor} pairs: the test continues once some flush jobs
  // have completed, and the background flush resumes only after the test has
  // deactivated the filesystem.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
        "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
       {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
        "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
  }
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
  fault_injection_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
  // Statuses intentionally not checked: the flush is expected to fail.
  for (auto* cfh : handles_) {
    dbfull()->TEST_WaitForFlushMemTable(cfh);
  }
  // The switched-out memtable remains unflushed in every column family.
  for (size_t i = 0; i != num_cfs; ++i) {
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
    ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed());
    ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
  }
  fault_injection_env->SetFilesystemActive(true);
  Destroy(options);
}
562 
// Atomic flush only: drops one of three column families and then requests a
// flush naming all three. The request must report the dropped column family.
TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
  if (!GetParam()) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = true;  // GetParam() is known to be true here.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  const size_t cf_count = handles_.size();
  ASSERT_EQ(3, cf_count);

  WriteOptions write_opts;
  write_opts.disableWAL = true;
  std::vector<int> cfs_to_flush;
  for (size_t idx = 0; idx < cf_count; ++idx) {
    const int cf = static_cast<int>(idx);
    ASSERT_OK(Put(cf, "key", "value", write_opts));
    cfs_to_flush.push_back(cf);
  }

  // Drop the second column family before asking for the flush.
  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  const Status flush_status = Flush(cfs_to_flush);
  ASSERT_TRUE(flush_status.IsColumnFamilyDropped());
  Destroy(options);
}
590 
// Atomic flush only: drops a column family after the atomic flush has been
// scheduled but before the background flush job starts. The flush must still
// succeed. Because the WAL is disabled, the values surviving a reopen of the
// remaining column families show they were actually flushed.
TEST_P(DBAtomicFlushTest,
       FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // {predecessor, successor} pairs: the drop happens after the flush is
  // scheduled, and the background flush starts only after the drop.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
        "DBAtomicFlushTest::BeforeDropCF"},
       {"DBAtomicFlushTest::AfterDropCF",
        "DBImpl::BackgroundCallFlush:start"}});
  SyncPoint::GetInstance()->EnableProcessing();

  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
  }
  port::Thread user_thread([&]() {
    TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
  });
  FlushOptions flush_opts;
  flush_opts.wait = true;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  user_thread.join();
  // The dropped column family's handle is still alive, so it remains readable.
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_EQ("value", Get(cf_id, "key"));
  }

  // Reopen without the dropped column family.
  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
  num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_EQ("value", Get(cf_id, "key"));
  }
  Destroy(options);
}
643 
// Atomic flush only: triggers an automatic flush via a memtable that fills up
// after kNumKeysTriggerFlush entries, then closes the DB right away. After
// reopening, the last written key must be readable.
TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  const int kNumKeysTriggerFlush = 4;
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(kNumKeysTriggerFlush));
  CreateAndReopenWithCF({"pikachu"}, options);

  for (int i = 0; i != kNumKeysTriggerFlush; ++i) {
    ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
  }
  // Reset sync-point state; this test installs no callbacks or dependencies.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();
  // Presumably this write finds the memtable full and triggers the flush that
  // Close() below races with.
  ASSERT_OK(Put(0, "key", "value"));
  Close();

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ("value", Get(0, "key"));
}
669 
// With background work paused, queues a flush of both column families and a
// second flush of the default one, then drops "pikachu". Background work is
// then resumed, and the default column family's flush must complete
// successfully. The name suggests this exercises memtable picking racing with
// the background flush. Runs with atomic_flush both on and off.
TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
  bool atomic_flush = GetParam();
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.max_write_buffer_number = 4;
  // Set min_write_buffer_number_to_merge to be greater than 1, so that
  // a column family with one memtable in the imm will not cause IsFlushPending
  // to return true when flush_requested_ is false.
  options.min_write_buffer_number_to_merge = 2;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  ASSERT_OK(dbfull()->PauseBackgroundWork());
  ASSERT_OK(Put(0, "key00", "value00"));
  ASSERT_OK(Put(1, "key10", "value10"));
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  ASSERT_OK(Put(0, "key01", "value01"));
  // Since max_write_buffer_number is 4, the following flush won't cause write
  // stall.
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
  handles_[1] = nullptr;
  ASSERT_OK(dbfull()->ContinueBackgroundWork());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
  // Release the default column family handle here and leave handles_ empty.
  delete handles_[0];
  handles_.clear();
}
700 
// Atomic-flush counterpart of DBFlushTest.CFDropRaceWithWaitForFlushMemTables.
// "pikachu" is dropped and its handle destroyed after the atomic flush has
// been scheduled, but before the background flush starts; the manual flush
// begins waiting only after the background flush started. The atomic flush
// must still return OK.
TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  CreateAndReopenWithCF({"pikachu"}, options);
  SyncPoint::GetInstance()->DisableProcessing();
  // {predecessor, successor} pairs enforcing the order described above.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
       {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBImpl::BackgroundCallFlush:start",
        "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_EQ(2, handles_.size());
  ASSERT_OK(Put(0, "key", "value"));
  ASSERT_OK(Put(1, "key", "value"));
  auto* cfd_default =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
          ->cfd();
  auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  port::Thread drop_cf_thr([&]() {
    TEST_SYNC_POINT(
        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    // Release the handle through the DB API, as the non-atomic variant of this
    // test does, rather than with a raw delete.
    ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
    handles_.resize(1);
    TEST_SYNC_POINT(
        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
  });
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
                                                flush_opts));
  drop_cf_thr.join();
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
}
743 
// Atomic flush only: deactivates the filesystem just before the last version
// edit of the manifest write, so installing the atomic flush results fails.
// The test checks that the flush reports an error and that the DB can then be
// closed.
TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
  Options options = CurrentOptions();
  options.env = fault_injection_env.get();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  for (size_t cf = 0; cf < handles_.size(); ++cf) {
    ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // Fail the filesystem right before the final manifest version edit.
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
      [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
  SyncPoint::GetInstance()->EnableProcessing();
  FlushOptions flush_opts;
  Status s = db_->Flush(flush_opts, handles_);
  ASSERT_NOK(s);
  fault_injection_env->SetFilesystemActive(true);
  Close();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
772 
// Run every DBFlushDirectIOTest case twice, with the bool parameter false and
// true.
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
                        testing::Bool());

// Run every DBAtomicFlushTest case twice, with the bool parameter false and
// true.
INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());
777 
778 }  // namespace ROCKSDB_NAMESPACE
779 
main(int argc,char ** argv)780 int main(int argc, char** argv) {
781   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
782   ::testing::InitGoogleTest(&argc, argv);
783   return RUN_ALL_TESTS();
784 }
785