1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright 2014 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 // This test uses a custom Env to keep track of the state of a filesystem as of
11 // the last "sync". It then checks for data loss errors by purposely dropping
12 // file data (or entire files) not protected by a "sync".
13 
14 #include "db/db_impl/db_impl.h"
15 #include "db/log_format.h"
16 #include "db/version_set.h"
17 #include "env/mock_env.h"
18 #include "file/filename.h"
19 #include "logging/logging.h"
20 #include "rocksdb/cache.h"
21 #include "rocksdb/db.h"
22 #include "rocksdb/env.h"
23 #include "rocksdb/table.h"
24 #include "rocksdb/write_batch.h"
25 #include "test_util/fault_injection_test_env.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "test_util/testutil.h"
29 #include "util/mutexlock.h"
30 
31 namespace ROCKSDB_NAMESPACE {
32 
// Size in bytes of every value written by the tests.
static constexpr int kValueSize = 1000;
// Exclusive upper bound for the randomly chosen number of keys per phase.
static constexpr int kMaxNumValues = 2000;
// How many write/crash/reopen rounds FaultTest runs per option configuration.
static constexpr size_t kNumIterations = 3;
36 
// Option configurations a fault-injection test can run under. The values are
// consecutive because FaultInjectionTest::ChangeOptions() moves to the next
// configuration by incrementing an int and comparing it against an end value.
enum FaultInjectionOptionConfig {
  kDefault = 0,
  kDifferentDataDir = 1,
  kWalDir = 2,
  kSyncWal = 3,
  kWalDirSyncWal = 4,
  kMultiLevels = 5,
  kEnd = 6,  // one past the last real configuration
};
46 class FaultInjectionTest
47     : public testing::Test,
48       public testing::WithParamInterface<std::tuple<
49           bool, FaultInjectionOptionConfig, FaultInjectionOptionConfig>> {
50  protected:
51   int option_config_;
52   int non_inclusive_end_range_;  // kEnd or equivalent to that
53   // When need to make sure data is persistent, sync WAL
54   bool sync_use_wal_;
55   // When need to make sure data is persistent, call DB::CompactRange()
56   bool sync_use_compact_;
57 
58   bool sequential_order_;
59 
60  protected:
61  public:
62   enum ExpectedVerifResult { kValExpectFound, kValExpectNoError };
63   enum ResetMethod {
64     kResetDropUnsyncedData,
65     kResetDropRandomUnsyncedData,
66     kResetDeleteUnsyncedFiles,
67     kResetDropAndDeleteUnsynced
68   };
69 
70   std::unique_ptr<Env> base_env_;
71   FaultInjectionTestEnv* env_;
72   std::string dbname_;
73   std::shared_ptr<Cache> tiny_cache_;
74   Options options_;
75   DB* db_;
76 
FaultInjectionTest()77   FaultInjectionTest()
78       : option_config_(std::get<1>(GetParam())),
79         non_inclusive_end_range_(std::get<2>(GetParam())),
80         sync_use_wal_(false),
81         sync_use_compact_(true),
82         base_env_(nullptr),
83         env_(nullptr),
84         db_(nullptr) {}
85 
~FaultInjectionTest()86   ~FaultInjectionTest() override {
87     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
88     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
89   }
90 
ChangeOptions()91   bool ChangeOptions() {
92     option_config_++;
93     if (option_config_ >= non_inclusive_end_range_) {
94       return false;
95     } else {
96       if (option_config_ == kMultiLevels) {
97         base_env_.reset(new MockEnv(Env::Default()));
98       }
99       return true;
100     }
101   }
102 
103   // Return the current option configuration.
CurrentOptions()104   Options CurrentOptions() {
105     sync_use_wal_ = false;
106     sync_use_compact_ = true;
107     Options options;
108     switch (option_config_) {
109       case kWalDir:
110         options.wal_dir = test::PerThreadDBPath(env_, "fault_test_wal");
111         break;
112       case kDifferentDataDir:
113         options.db_paths.emplace_back(
114             test::PerThreadDBPath(env_, "fault_test_data"), 1000000U);
115         break;
116       case kSyncWal:
117         sync_use_wal_ = true;
118         sync_use_compact_ = false;
119         break;
120       case kWalDirSyncWal:
121         options.wal_dir = test::PerThreadDBPath(env_, "/fault_test_wal");
122         sync_use_wal_ = true;
123         sync_use_compact_ = false;
124         break;
125       case kMultiLevels:
126         options.write_buffer_size = 64 * 1024;
127         options.target_file_size_base = 64 * 1024;
128         options.level0_file_num_compaction_trigger = 2;
129         options.level0_slowdown_writes_trigger = 2;
130         options.level0_stop_writes_trigger = 4;
131         options.max_bytes_for_level_base = 128 * 1024;
132         options.max_write_buffer_number = 2;
133         options.max_background_compactions = 8;
134         options.max_background_flushes = 8;
135         sync_use_wal_ = true;
136         sync_use_compact_ = false;
137         break;
138       default:
139         break;
140     }
141     return options;
142   }
143 
NewDB()144   Status NewDB() {
145     assert(db_ == nullptr);
146     assert(tiny_cache_ == nullptr);
147     assert(env_ == nullptr);
148 
149     env_ =
150         new FaultInjectionTestEnv(base_env_ ? base_env_.get() : Env::Default());
151 
152     options_ = CurrentOptions();
153     options_.env = env_;
154     options_.paranoid_checks = true;
155 
156     BlockBasedTableOptions table_options;
157     tiny_cache_ = NewLRUCache(100);
158     table_options.block_cache = tiny_cache_;
159     options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
160 
161     dbname_ = test::PerThreadDBPath("fault_test");
162 
163     EXPECT_OK(DestroyDB(dbname_, options_));
164 
165     options_.create_if_missing = true;
166     Status s = OpenDB();
167     options_.create_if_missing = false;
168     return s;
169   }
170 
SetUp()171   void SetUp() override {
172     sequential_order_ = std::get<0>(GetParam());
173     ASSERT_OK(NewDB());
174   }
175 
TearDown()176   void TearDown() override {
177     CloseDB();
178 
179     Status s = DestroyDB(dbname_, options_);
180 
181     delete env_;
182     env_ = nullptr;
183 
184     tiny_cache_.reset();
185 
186     ASSERT_OK(s);
187   }
188 
Build(const WriteOptions & write_options,int start_idx,int num_vals)189   void Build(const WriteOptions& write_options, int start_idx, int num_vals) {
190     std::string key_space, value_space;
191     WriteBatch batch;
192     for (int i = start_idx; i < start_idx + num_vals; i++) {
193       Slice key = Key(i, &key_space);
194       batch.Clear();
195       batch.Put(key, Value(i, &value_space));
196       ASSERT_OK(db_->Write(write_options, &batch));
197     }
198   }
199 
ReadValue(int i,std::string * val) const200   Status ReadValue(int i, std::string* val) const {
201     std::string key_space, value_space;
202     Slice key = Key(i, &key_space);
203     Value(i, &value_space);
204     ReadOptions options;
205     return db_->Get(options, key, val);
206   }
207 
Verify(int start_idx,int num_vals,ExpectedVerifResult expected) const208   Status Verify(int start_idx, int num_vals,
209                 ExpectedVerifResult expected) const {
210     std::string val;
211     std::string value_space;
212     Status s;
213     for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) {
214       Value(i, &value_space);
215       s = ReadValue(i, &val);
216       if (s.ok()) {
217         EXPECT_EQ(value_space, val);
218       }
219       if (expected == kValExpectFound) {
220         if (!s.ok()) {
221           fprintf(stderr, "Error when read %dth record (expect found): %s\n", i,
222                   s.ToString().c_str());
223           return s;
224         }
225       } else if (!s.ok() && !s.IsNotFound()) {
226         fprintf(stderr, "Error when read %dth record: %s\n", i,
227                 s.ToString().c_str());
228         return s;
229       }
230     }
231     return Status::OK();
232   }
233 
234   // Return the ith key
Key(int i,std::string * storage) const235   Slice Key(int i, std::string* storage) const {
236     unsigned long long num = i;
237     if (!sequential_order_) {
238       // random transfer
239       const int m = 0x5bd1e995;
240       num *= m;
241       num ^= num << 24;
242     }
243     char buf[100];
244     snprintf(buf, sizeof(buf), "%016d", static_cast<int>(num));
245     storage->assign(buf, strlen(buf));
246     return Slice(*storage);
247   }
248 
249   // Return the value to associate with the specified key
Value(int k,std::string * storage) const250   Slice Value(int k, std::string* storage) const {
251     Random r(k);
252     return test::RandomString(&r, kValueSize, storage);
253   }
254 
CloseDB()255   void CloseDB() {
256     delete db_;
257     db_ = nullptr;
258   }
259 
OpenDB()260   Status OpenDB() {
261     CloseDB();
262     env_->ResetState();
263     Status s = DB::Open(options_, dbname_, &db_);
264     assert(db_ != nullptr);
265     return s;
266   }
267 
DeleteAllData()268   void DeleteAllData() {
269     Iterator* iter = db_->NewIterator(ReadOptions());
270     WriteOptions options;
271     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
272       ASSERT_OK(db_->Delete(WriteOptions(), iter->key()));
273     }
274 
275     delete iter;
276 
277     FlushOptions flush_options;
278     flush_options.wait = true;
279     db_->Flush(flush_options);
280   }
281 
282   // rnd cannot be null for kResetDropRandomUnsyncedData
ResetDBState(ResetMethod reset_method,Random * rnd=nullptr)283   void ResetDBState(ResetMethod reset_method, Random* rnd = nullptr) {
284     env_->AssertNoOpenFile();
285     switch (reset_method) {
286       case kResetDropUnsyncedData:
287         ASSERT_OK(env_->DropUnsyncedFileData());
288         break;
289       case kResetDropRandomUnsyncedData:
290         ASSERT_OK(env_->DropRandomUnsyncedFileData(rnd));
291         break;
292       case kResetDeleteUnsyncedFiles:
293         ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync());
294         break;
295       case kResetDropAndDeleteUnsynced:
296         ASSERT_OK(env_->DropUnsyncedFileData());
297         ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync());
298         break;
299       default:
300         assert(false);
301     }
302   }
303 
PartialCompactTestPreFault(int num_pre_sync,int num_post_sync)304   void PartialCompactTestPreFault(int num_pre_sync, int num_post_sync) {
305     DeleteAllData();
306 
307     WriteOptions write_options;
308     write_options.sync = sync_use_wal_;
309 
310     Build(write_options, 0, num_pre_sync);
311     if (sync_use_compact_) {
312       db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
313     }
314     write_options.sync = false;
315     Build(write_options, num_pre_sync, num_post_sync);
316   }
317 
PartialCompactTestReopenWithFault(ResetMethod reset_method,int num_pre_sync,int num_post_sync,Random * rnd=nullptr)318   void PartialCompactTestReopenWithFault(ResetMethod reset_method,
319                                          int num_pre_sync, int num_post_sync,
320                                          Random* rnd = nullptr) {
321     env_->SetFilesystemActive(false);
322     CloseDB();
323     ResetDBState(reset_method, rnd);
324     ASSERT_OK(OpenDB());
325     ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound));
326     ASSERT_OK(Verify(num_pre_sync, num_post_sync,
327                      FaultInjectionTest::kValExpectNoError));
328     WaitCompactionFinish();
329     ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound));
330     ASSERT_OK(Verify(num_pre_sync, num_post_sync,
331                      FaultInjectionTest::kValExpectNoError));
332   }
333 
NoWriteTestPreFault()334   void NoWriteTestPreFault() {
335   }
336 
NoWriteTestReopenWithFault(ResetMethod reset_method)337   void NoWriteTestReopenWithFault(ResetMethod reset_method) {
338     CloseDB();
339     ResetDBState(reset_method);
340     ASSERT_OK(OpenDB());
341   }
342 
WaitCompactionFinish()343   void WaitCompactionFinish() {
344     static_cast<DBImpl*>(db_->GetRootDB())->TEST_WaitForCompact();
345     ASSERT_OK(db_->Put(WriteOptions(), "", ""));
346   }
347 };
348 
// Same fixture under a separate name so that FaultTest can be instantiated
// with its own parameter list (the option-configuration range split into
// [kDefault, kSyncWal) and [kSyncWal, kEnd)).
class FaultInjectionTestSplitted : public FaultInjectionTest {};
350 
// For every option configuration in the parameter range, repeatedly write
// synced and unsynced data, inject each kind of fault, and verify the DB
// after reopening.
TEST_P(FaultInjectionTestSplitted, FaultTest) {
  do {
    Random rnd(301);

    for (size_t round = 0; round < kNumIterations; ++round) {
      const int num_pre_sync = rnd.Uniform(kMaxNumValues);
      const int num_post_sync = rnd.Uniform(kMaxNumValues);

      // One cycle: write data, reopen after `fault_method`, then do a
      // write-free reopen after `idle_method`.
      auto run_cycle = [&](ResetMethod fault_method, Random* fault_rnd,
                           ResetMethod idle_method) {
        PartialCompactTestPreFault(num_pre_sync, num_post_sync);
        PartialCompactTestReopenWithFault(fault_method, num_pre_sync,
                                          num_post_sync, fault_rnd);
        NoWriteTestPreFault();
        NoWriteTestReopenWithFault(idle_method);
      };

      run_cycle(kResetDropUnsyncedData, nullptr, kResetDropUnsyncedData);

      run_cycle(kResetDropRandomUnsyncedData, &rnd, kResetDropUnsyncedData);

      // Setting a separate data path won't pass the test as we don't sync
      // it after creating new files,
      run_cycle(kResetDropAndDeleteUnsynced, nullptr,
                kResetDropAndDeleteUnsynced);

      // No new files created so we expect all values since no files will be
      // dropped.
      run_cycle(kResetDeleteUnsyncedFiles, nullptr, kResetDeleteUnsyncedFiles);
    }
  } while (ChangeOptions());
}
389 
390 // Previous log file is not fsynced if sync is forced after log rolling.
TEST_P(FaultInjectionTest,WriteOptionSyncTest)391 TEST_P(FaultInjectionTest, WriteOptionSyncTest) {
392   test::SleepingBackgroundTask sleeping_task_low;
393   env_->SetBackgroundThreads(1, Env::HIGH);
394   // Block the job queue to prevent flush job from running.
395   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
396                  Env::Priority::HIGH);
397   sleeping_task_low.WaitUntilSleeping();
398 
399   WriteOptions write_options;
400   write_options.sync = false;
401 
402   std::string key_space, value_space;
403   ASSERT_OK(
404       db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
405   FlushOptions flush_options;
406   flush_options.wait = false;
407   ASSERT_OK(db_->Flush(flush_options));
408   write_options.sync = true;
409   ASSERT_OK(
410       db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
411   db_->FlushWAL(false);
412 
413   env_->SetFilesystemActive(false);
414   NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
415   sleeping_task_low.WakeUp();
416   sleeping_task_low.WaitUntilDone();
417 
418   ASSERT_OK(OpenDB());
419   std::string val;
420   Value(2, &value_space);
421   ASSERT_OK(ReadValue(2, &val));
422   ASSERT_EQ(value_space, val);
423 
424   Value(1, &value_space);
425   ASSERT_OK(ReadValue(1, &val));
426   ASSERT_EQ(value_space, val);
427 }
428 
// Deactivates the filesystem while a compaction is in flight, drops unsynced
// data, and checks that all keys are still readable after reopening. Also
// checks that no background compaction starts before DBImpl::Open has reached
// its "Opened" sync point.
TEST_P(FaultInjectionTest, UninstalledCompaction) {
  options_.target_file_size_base = 32 * 1024;
  options_.write_buffer_size = 100 << 10;  // 100KB
  options_.level0_file_num_compaction_trigger = 6;
  options_.level0_stop_writes_trigger = 1 << 10;
  options_.level0_slowdown_writes_trigger = 1 << 10;
  options_.max_background_compactions = 1;
  // Reopen so the options above take effect; a failed reopen must stop the
  // test instead of running it against a missing DB.
  ASSERT_OK(OpenDB());

  if (!sequential_order_) {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
        {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"},
        {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"},
        {"FaultInjectionTest::FaultTest:2",
         "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
    });
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  const int kNumKeys = 1000;
  Build(WriteOptions(), 0, kNumKeys);
  FlushOptions flush_options;
  flush_options.wait = true;
  ASSERT_OK(db_->Flush(flush_options));
  ASSERT_OK(db_->Put(WriteOptions(), "", ""));
  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0");
  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1");
  env_->SetFilesystemActive(false);
  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2");
  CloseDB();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ResetDBState(kResetDropUnsyncedData);

  // Set when the DBImpl::Open:Opened sync point fires during the reopen.
  std::atomic<bool> opened(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::Open:Opened", [&](void* /*arg*/) { opened.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkCompaction",
      [&](void* /*arg*/) { ASSERT_TRUE(opened.load()); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(OpenDB());
  ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound));
  WaitCompactionFinish();
  ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
476 
// Two unsynced writes separated by a flush request, followed by an explicit
// FlushWAL(true), must both be readable after unsynced data is dropped and
// unsynced files are deleted.
TEST_P(FaultInjectionTest, ManualLogSyncTest) {
  test::SleepingBackgroundTask flush_blocker;
  env_->SetBackgroundThreads(1, Env::HIGH);
  // Block the job queue to prevent flush job from running.
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &flush_blocker,
                 Env::Priority::HIGH);
  flush_blocker.WaitUntilSleeping();

  WriteOptions unsynced_write;
  unsynced_write.sync = false;
  FlushOptions no_wait_flush;
  no_wait_flush.wait = false;

  std::string key_buf;
  std::string expected_value;
  ASSERT_OK(
      db_->Put(unsynced_write, Key(1, &key_buf), Value(1, &expected_value)));
  ASSERT_OK(db_->Flush(no_wait_flush));
  ASSERT_OK(
      db_->Put(unsynced_write, Key(2, &key_buf), Value(2, &expected_value)));
  ASSERT_OK(db_->FlushWAL(true));

  env_->SetFilesystemActive(false);
  NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  flush_blocker.WakeUp();
  flush_blocker.WaitUntilDone();

  ASSERT_OK(OpenDB());
  std::string actual_value;
  // Same check order as before: key 2 first, then key 1.
  for (const int idx : {2, 1}) {
    Value(idx, &expected_value);
    ASSERT_OK(ReadValue(idx, &actual_value));
    ASSERT_EQ(expected_value, actual_value);
  }
}
513 
// A batch entry added after MarkWalTerminationPoint() must not be recovered
// once unsynced data has been dropped and the DB reopened: "cats" (added
// before the marker) is expected back, "boys" (added after it) is not.
TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) {
  ReadOptions ro;

  WriteOptions wo;
  wo.sync = true;
  wo.disableWAL = false;
  WriteBatch batch;
  ASSERT_OK(batch.Put("cats", "dogs"));
  batch.MarkWalTerminationPoint();
  ASSERT_OK(batch.Put("boys", "girls"));
  ASSERT_OK(db_->Write(wo, &batch));

  env_->SetFilesystemActive(false);
  NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  ASSERT_OK(OpenDB());

  std::string val;
  ASSERT_OK(db_->Get(ro, "cats", &val));
  ASSERT_EQ("dogs", val);
  ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound());
}
537 
// Run every FaultInjectionTest case twice: with scrambled keys (false) and
// with sequential keys (true). The second and third tuple elements give the
// option-configuration range [kDefault, kEnd).
INSTANTIATE_TEST_CASE_P(
    FaultTest, FaultInjectionTest,
    ::testing::Values(std::make_tuple(false, kDefault, kEnd),
                      std::make_tuple(true, kDefault, kEnd)));
542 
// FaultTest walks its option-configuration range through ChangeOptions(). The
// range is split into [kDefault, kSyncWal) and [kSyncWal, kEnd), and each half
// is run with scrambled (false) and sequential (true) keys.
INSTANTIATE_TEST_CASE_P(
    FaultTest, FaultInjectionTestSplitted,
    ::testing::Values(std::make_tuple(false, kDefault, kSyncWal),
                      std::make_tuple(true, kDefault, kSyncWal),
                      std::make_tuple(false, kSyncWal, kEnd),
                      std::make_tuple(true, kSyncWal, kEnd)));
549 
550 }  // namespace ROCKSDB_NAMESPACE
551 
main(int argc,char ** argv)552 int main(int argc, char** argv) {
553   ::testing::InitGoogleTest(&argc, argv);
554   return RUN_ALL_TESTS();
555 }
556