1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright 2014 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 // This test uses a custom Env to keep track of the state of a filesystem as of
11 // the last "sync". It then checks for data loss errors by purposely dropping
12 // file data (or entire files) not protected by a "sync".
13
14 #include "db/db_impl/db_impl.h"
15 #include "db/log_format.h"
16 #include "db/version_set.h"
17 #include "env/mock_env.h"
18 #include "file/filename.h"
19 #include "logging/logging.h"
20 #include "rocksdb/cache.h"
21 #include "rocksdb/db.h"
22 #include "rocksdb/env.h"
23 #include "rocksdb/table.h"
24 #include "rocksdb/write_batch.h"
25 #include "test_util/fault_injection_test_env.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "test_util/testutil.h"
29 #include "util/mutexlock.h"
30
31 namespace ROCKSDB_NAMESPACE {
32
// Length in bytes of every generated value.
constexpr int kValueSize = 1000;
// Upper bound (exclusive) for the random number of records written before and
// after the sync point in each FaultTest round.
constexpr int kMaxNumValues = 2000;
// Number of randomized rounds run per option configuration in FaultTest.
constexpr size_t kNumIterations = 3;
36
// Option configurations exercised by the fault-injection tests. The numeric
// order matters: configs are iterated with ++ up to an exclusive end value.
enum FaultInjectionOptionConfig {
  kDefault = 0,           // default Options
  kDifferentDataDir = 1,  // SST files in a separate db_paths entry
  kWalDir = 2,            // WAL in a separate wal_dir
  kSyncWal = 3,           // persistence via synced WAL writes
  kWalDirSyncWal = 4,     // separate wal_dir + synced WAL writes
  kMultiLevels = 5,       // small buffers/targets to build multiple levels
  kEnd = 6,               // sentinel: one past the last real config
};
46 class FaultInjectionTest
47 : public testing::Test,
48 public testing::WithParamInterface<std::tuple<
49 bool, FaultInjectionOptionConfig, FaultInjectionOptionConfig>> {
50 protected:
51 int option_config_;
52 int non_inclusive_end_range_; // kEnd or equivalent to that
53 // When need to make sure data is persistent, sync WAL
54 bool sync_use_wal_;
55 // When need to make sure data is persistent, call DB::CompactRange()
56 bool sync_use_compact_;
57
58 bool sequential_order_;
59
60 protected:
61 public:
62 enum ExpectedVerifResult { kValExpectFound, kValExpectNoError };
63 enum ResetMethod {
64 kResetDropUnsyncedData,
65 kResetDropRandomUnsyncedData,
66 kResetDeleteUnsyncedFiles,
67 kResetDropAndDeleteUnsynced
68 };
69
70 std::unique_ptr<Env> base_env_;
71 FaultInjectionTestEnv* env_;
72 std::string dbname_;
73 std::shared_ptr<Cache> tiny_cache_;
74 Options options_;
75 DB* db_;
76
FaultInjectionTest()77 FaultInjectionTest()
78 : option_config_(std::get<1>(GetParam())),
79 non_inclusive_end_range_(std::get<2>(GetParam())),
80 sync_use_wal_(false),
81 sync_use_compact_(true),
82 base_env_(nullptr),
83 env_(nullptr),
84 db_(nullptr) {}
85
~FaultInjectionTest()86 ~FaultInjectionTest() override {
87 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
88 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
89 }
90
ChangeOptions()91 bool ChangeOptions() {
92 option_config_++;
93 if (option_config_ >= non_inclusive_end_range_) {
94 return false;
95 } else {
96 if (option_config_ == kMultiLevels) {
97 base_env_.reset(new MockEnv(Env::Default()));
98 }
99 return true;
100 }
101 }
102
103 // Return the current option configuration.
CurrentOptions()104 Options CurrentOptions() {
105 sync_use_wal_ = false;
106 sync_use_compact_ = true;
107 Options options;
108 switch (option_config_) {
109 case kWalDir:
110 options.wal_dir = test::PerThreadDBPath(env_, "fault_test_wal");
111 break;
112 case kDifferentDataDir:
113 options.db_paths.emplace_back(
114 test::PerThreadDBPath(env_, "fault_test_data"), 1000000U);
115 break;
116 case kSyncWal:
117 sync_use_wal_ = true;
118 sync_use_compact_ = false;
119 break;
120 case kWalDirSyncWal:
121 options.wal_dir = test::PerThreadDBPath(env_, "/fault_test_wal");
122 sync_use_wal_ = true;
123 sync_use_compact_ = false;
124 break;
125 case kMultiLevels:
126 options.write_buffer_size = 64 * 1024;
127 options.target_file_size_base = 64 * 1024;
128 options.level0_file_num_compaction_trigger = 2;
129 options.level0_slowdown_writes_trigger = 2;
130 options.level0_stop_writes_trigger = 4;
131 options.max_bytes_for_level_base = 128 * 1024;
132 options.max_write_buffer_number = 2;
133 options.max_background_compactions = 8;
134 options.max_background_flushes = 8;
135 sync_use_wal_ = true;
136 sync_use_compact_ = false;
137 break;
138 default:
139 break;
140 }
141 return options;
142 }
143
NewDB()144 Status NewDB() {
145 assert(db_ == nullptr);
146 assert(tiny_cache_ == nullptr);
147 assert(env_ == nullptr);
148
149 env_ =
150 new FaultInjectionTestEnv(base_env_ ? base_env_.get() : Env::Default());
151
152 options_ = CurrentOptions();
153 options_.env = env_;
154 options_.paranoid_checks = true;
155
156 BlockBasedTableOptions table_options;
157 tiny_cache_ = NewLRUCache(100);
158 table_options.block_cache = tiny_cache_;
159 options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
160
161 dbname_ = test::PerThreadDBPath("fault_test");
162
163 EXPECT_OK(DestroyDB(dbname_, options_));
164
165 options_.create_if_missing = true;
166 Status s = OpenDB();
167 options_.create_if_missing = false;
168 return s;
169 }
170
SetUp()171 void SetUp() override {
172 sequential_order_ = std::get<0>(GetParam());
173 ASSERT_OK(NewDB());
174 }
175
TearDown()176 void TearDown() override {
177 CloseDB();
178
179 Status s = DestroyDB(dbname_, options_);
180
181 delete env_;
182 env_ = nullptr;
183
184 tiny_cache_.reset();
185
186 ASSERT_OK(s);
187 }
188
Build(const WriteOptions & write_options,int start_idx,int num_vals)189 void Build(const WriteOptions& write_options, int start_idx, int num_vals) {
190 std::string key_space, value_space;
191 WriteBatch batch;
192 for (int i = start_idx; i < start_idx + num_vals; i++) {
193 Slice key = Key(i, &key_space);
194 batch.Clear();
195 batch.Put(key, Value(i, &value_space));
196 ASSERT_OK(db_->Write(write_options, &batch));
197 }
198 }
199
ReadValue(int i,std::string * val) const200 Status ReadValue(int i, std::string* val) const {
201 std::string key_space, value_space;
202 Slice key = Key(i, &key_space);
203 Value(i, &value_space);
204 ReadOptions options;
205 return db_->Get(options, key, val);
206 }
207
Verify(int start_idx,int num_vals,ExpectedVerifResult expected) const208 Status Verify(int start_idx, int num_vals,
209 ExpectedVerifResult expected) const {
210 std::string val;
211 std::string value_space;
212 Status s;
213 for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) {
214 Value(i, &value_space);
215 s = ReadValue(i, &val);
216 if (s.ok()) {
217 EXPECT_EQ(value_space, val);
218 }
219 if (expected == kValExpectFound) {
220 if (!s.ok()) {
221 fprintf(stderr, "Error when read %dth record (expect found): %s\n", i,
222 s.ToString().c_str());
223 return s;
224 }
225 } else if (!s.ok() && !s.IsNotFound()) {
226 fprintf(stderr, "Error when read %dth record: %s\n", i,
227 s.ToString().c_str());
228 return s;
229 }
230 }
231 return Status::OK();
232 }
233
234 // Return the ith key
Key(int i,std::string * storage) const235 Slice Key(int i, std::string* storage) const {
236 unsigned long long num = i;
237 if (!sequential_order_) {
238 // random transfer
239 const int m = 0x5bd1e995;
240 num *= m;
241 num ^= num << 24;
242 }
243 char buf[100];
244 snprintf(buf, sizeof(buf), "%016d", static_cast<int>(num));
245 storage->assign(buf, strlen(buf));
246 return Slice(*storage);
247 }
248
249 // Return the value to associate with the specified key
Value(int k,std::string * storage) const250 Slice Value(int k, std::string* storage) const {
251 Random r(k);
252 return test::RandomString(&r, kValueSize, storage);
253 }
254
CloseDB()255 void CloseDB() {
256 delete db_;
257 db_ = nullptr;
258 }
259
OpenDB()260 Status OpenDB() {
261 CloseDB();
262 env_->ResetState();
263 Status s = DB::Open(options_, dbname_, &db_);
264 assert(db_ != nullptr);
265 return s;
266 }
267
DeleteAllData()268 void DeleteAllData() {
269 Iterator* iter = db_->NewIterator(ReadOptions());
270 WriteOptions options;
271 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
272 ASSERT_OK(db_->Delete(WriteOptions(), iter->key()));
273 }
274
275 delete iter;
276
277 FlushOptions flush_options;
278 flush_options.wait = true;
279 db_->Flush(flush_options);
280 }
281
282 // rnd cannot be null for kResetDropRandomUnsyncedData
ResetDBState(ResetMethod reset_method,Random * rnd=nullptr)283 void ResetDBState(ResetMethod reset_method, Random* rnd = nullptr) {
284 env_->AssertNoOpenFile();
285 switch (reset_method) {
286 case kResetDropUnsyncedData:
287 ASSERT_OK(env_->DropUnsyncedFileData());
288 break;
289 case kResetDropRandomUnsyncedData:
290 ASSERT_OK(env_->DropRandomUnsyncedFileData(rnd));
291 break;
292 case kResetDeleteUnsyncedFiles:
293 ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync());
294 break;
295 case kResetDropAndDeleteUnsynced:
296 ASSERT_OK(env_->DropUnsyncedFileData());
297 ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync());
298 break;
299 default:
300 assert(false);
301 }
302 }
303
PartialCompactTestPreFault(int num_pre_sync,int num_post_sync)304 void PartialCompactTestPreFault(int num_pre_sync, int num_post_sync) {
305 DeleteAllData();
306
307 WriteOptions write_options;
308 write_options.sync = sync_use_wal_;
309
310 Build(write_options, 0, num_pre_sync);
311 if (sync_use_compact_) {
312 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
313 }
314 write_options.sync = false;
315 Build(write_options, num_pre_sync, num_post_sync);
316 }
317
PartialCompactTestReopenWithFault(ResetMethod reset_method,int num_pre_sync,int num_post_sync,Random * rnd=nullptr)318 void PartialCompactTestReopenWithFault(ResetMethod reset_method,
319 int num_pre_sync, int num_post_sync,
320 Random* rnd = nullptr) {
321 env_->SetFilesystemActive(false);
322 CloseDB();
323 ResetDBState(reset_method, rnd);
324 ASSERT_OK(OpenDB());
325 ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound));
326 ASSERT_OK(Verify(num_pre_sync, num_post_sync,
327 FaultInjectionTest::kValExpectNoError));
328 WaitCompactionFinish();
329 ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound));
330 ASSERT_OK(Verify(num_pre_sync, num_post_sync,
331 FaultInjectionTest::kValExpectNoError));
332 }
333
NoWriteTestPreFault()334 void NoWriteTestPreFault() {
335 }
336
NoWriteTestReopenWithFault(ResetMethod reset_method)337 void NoWriteTestReopenWithFault(ResetMethod reset_method) {
338 CloseDB();
339 ResetDBState(reset_method);
340 ASSERT_OK(OpenDB());
341 }
342
WaitCompactionFinish()343 void WaitCompactionFinish() {
344 static_cast<DBImpl*>(db_->GetRootDB())->TEST_WaitForCompact();
345 ASSERT_OK(db_->Put(WriteOptions(), "", ""));
346 }
347 };
348
349 class FaultInjectionTestSplitted : public FaultInjectionTest {};
350
// For each option config in the parameter range, runs kNumIterations rounds of
// randomized "write, inject fault, reopen, verify" cycles with each reset
// method. Helper calls are wrapped in ASSERT_NO_FATAL_FAILURE so a fatal
// failure inside a helper stops the test instead of continuing against a
// possibly closed (null) DB.
TEST_P(FaultInjectionTestSplitted, FaultTest) {
  // One fault round: the reset applied after the partial-compaction workload,
  // the reset applied on the following no-write reopen, and whether the
  // random-drop RNG is handed to the first reset.
  struct FaultRound {
    ResetMethod partial_compact_reset;
    ResetMethod no_write_reset;
    bool use_rnd;
  };
  const FaultRound kRounds[] = {
      {kResetDropUnsyncedData, kResetDropUnsyncedData, false},
      {kResetDropRandomUnsyncedData, kResetDropUnsyncedData, true},
      // An earlier note says this round does not pass with a separate data
      // path, because that directory is not synced after new files are
      // created in it.
      {kResetDropAndDeleteUnsynced, kResetDropAndDeleteUnsynced, false},
      // No new files created so we expect all values since no files will be
      // dropped.
      {kResetDeleteUnsyncedFiles, kResetDeleteUnsyncedFiles, false},
  };

  do {
    Random rnd(301);

    for (size_t idx = 0; idx < kNumIterations; idx++) {
      const int num_pre_sync = rnd.Uniform(kMaxNumValues);
      const int num_post_sync = rnd.Uniform(kMaxNumValues);

      for (const FaultRound& round : kRounds) {
        ASSERT_NO_FATAL_FAILURE(
            PartialCompactTestPreFault(num_pre_sync, num_post_sync));
        ASSERT_NO_FATAL_FAILURE(PartialCompactTestReopenWithFault(
            round.partial_compact_reset, num_pre_sync, num_post_sync,
            round.use_rnd ? &rnd : nullptr));
        NoWriteTestPreFault();
        ASSERT_NO_FATAL_FAILURE(
            NoWriteTestReopenWithFault(round.no_write_reset));
      }
    }
  } while (ChangeOptions());
}
389
390 // Previous log file is not fsynced if sync is forced after log rolling.
TEST_P(FaultInjectionTest,WriteOptionSyncTest)391 TEST_P(FaultInjectionTest, WriteOptionSyncTest) {
392 test::SleepingBackgroundTask sleeping_task_low;
393 env_->SetBackgroundThreads(1, Env::HIGH);
394 // Block the job queue to prevent flush job from running.
395 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
396 Env::Priority::HIGH);
397 sleeping_task_low.WaitUntilSleeping();
398
399 WriteOptions write_options;
400 write_options.sync = false;
401
402 std::string key_space, value_space;
403 ASSERT_OK(
404 db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
405 FlushOptions flush_options;
406 flush_options.wait = false;
407 ASSERT_OK(db_->Flush(flush_options));
408 write_options.sync = true;
409 ASSERT_OK(
410 db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
411 db_->FlushWAL(false);
412
413 env_->SetFilesystemActive(false);
414 NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
415 sleeping_task_low.WakeUp();
416 sleeping_task_low.WaitUntilDone();
417
418 ASSERT_OK(OpenDB());
419 std::string val;
420 Value(2, &value_space);
421 ASSERT_OK(ReadValue(2, &val));
422 ASSERT_EQ(value_space, val);
423
424 Value(1, &value_space);
425 ASSERT_OK(ReadValue(1, &val));
426 ASSERT_EQ(value_space, val);
427 }
428
// Deactivates the filesystem while a compaction is in flight, drops unsynced
// data, and checks that after reopening all records are still readable and no
// background compaction is scheduled before DB::Open has finished.
TEST_P(FaultInjectionTest, UninstalledCompaction) {
  options_.target_file_size_base = 32 * 1024;
  options_.write_buffer_size = 100 << 10;  // 100KB
  options_.level0_file_num_compaction_trigger = 6;
  options_.level0_stop_writes_trigger = 1 << 10;
  options_.level0_slowdown_writes_trigger = 1 << 10;
  options_.max_background_compactions = 1;
  // Sync points are not enabled yet, so returning early here is safe.
  ASSERT_OK(OpenDB());

  if (!sequential_order_) {
    // Each pair is {predecessor, successor}: the successor waits for the
    // predecessor. The intent appears to be deactivating the filesystem after
    // CompactionJob::Run() ends but before DBImpl's post-run step, presumably
    // before the compaction result is installed.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
        {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"},
        {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"},
        {"FaultInjectionTest::FaultTest:2",
         "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
    });
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  const int kNumKeys = 1000;
  Build(WriteOptions(), 0, kNumKeys);
  FlushOptions flush_options;
  flush_options.wait = true;
  // EXPECT (not ASSERT): returning before the sync points below are reached
  // could leave a background compaction waiting on the dependencies above.
  EXPECT_OK(db_->Flush(flush_options));
  ASSERT_OK(db_->Put(WriteOptions(), "", ""));
  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0");
  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1");
  env_->SetFilesystemActive(false);
  TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2");
  CloseDB();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ResetDBState(kResetDropUnsyncedData);

  // Fails the test if a background compaction starts before Open completes.
  std::atomic<bool> opened(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::Open:Opened", [&](void* /*arg*/) { opened.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkCompaction",
      [&](void* /*arg*/) { ASSERT_TRUE(opened.load()); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(OpenDB());
  ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound));
  WaitCompactionFinish();
  ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
476
// Like WriteOptionSyncTest, but durability of the writes comes from an
// explicit FlushWAL(true) rather than WriteOptions::sync. Both keys must be
// readable after unsynced data is dropped and the DB is reopened.
TEST_P(FaultInjectionTest, ManualLogSyncTest) {
  test::SleepingBackgroundTask flush_blocker;
  env_->SetBackgroundThreads(1, Env::HIGH);
  // Occupy the single HIGH-priority thread so the requested flush cannot run.
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &flush_blocker,
                 Env::Priority::HIGH);
  flush_blocker.WaitUntilSleeping();

  std::string key_buf;
  std::string value_buf;

  WriteOptions unsynced_write;
  unsynced_write.sync = false;
  ASSERT_OK(db_->Put(unsynced_write, Key(1, &key_buf), Value(1, &value_buf)));

  FlushOptions no_wait_flush;
  no_wait_flush.wait = false;
  ASSERT_OK(db_->Flush(no_wait_flush));

  ASSERT_OK(db_->Put(unsynced_write, Key(2, &key_buf), Value(2, &value_buf)));
  ASSERT_OK(db_->FlushWAL(true));

  env_->SetFilesystemActive(false);
  NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  flush_blocker.WakeUp();
  flush_blocker.WaitUntilDone();

  ASSERT_OK(OpenDB());
  const int kKeysToCheck[] = {2, 1};
  std::string actual;
  for (const int k : kKeysToCheck) {
    Value(k, &value_buf);
    ASSERT_OK(ReadValue(k, &actual));
    ASSERT_EQ(value_buf, actual);
  }
}
513
// A synced batch with a WAL termination point between its two Puts: after
// dropping unsynced data and reopening, only "cats" (before the termination
// point) is expected to be found; "boys" (after it) must be NotFound.
TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) {
  ReadOptions ro;

  WriteOptions wo;
  wo.sync = true;
  wo.disableWAL = false;
  WriteBatch batch;
  batch.Put("cats", "dogs");
  batch.MarkWalTerminationPoint();
  batch.Put("boys", "girls");
  ASSERT_OK(db_->Write(wo, &batch));

  env_->SetFilesystemActive(false);
  NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  ASSERT_OK(OpenDB());

  std::string val;
  ASSERT_OK(db_->Get(ro, "cats", &val));
  ASSERT_EQ("dogs", val);
  ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound());
}
537
// Parameter tuples are <sequential_order, first option config, exclusive end
// of the config range>; the fixture reads them via GetParam().
// FaultInjectionTest: sequential and scrambled keys, starting at kDefault with
// range end kEnd.
INSTANTIATE_TEST_CASE_P(
    FaultTest, FaultInjectionTest,
    ::testing::Values(std::make_tuple(false, kDefault, kEnd),
                      std::make_tuple(true, kDefault, kEnd)));

// FaultInjectionTestSplitted: the config range is split at kSyncWal into
// [kDefault, kSyncWal) and [kSyncWal, kEnd), presumably to keep each test
// instance shorter.
INSTANTIATE_TEST_CASE_P(
    FaultTest, FaultInjectionTestSplitted,
    ::testing::Values(std::make_tuple(false, kDefault, kSyncWal),
                      std::make_tuple(true, kDefault, kSyncWal),
                      std::make_tuple(false, kSyncWal, kEnd),
                      std::make_tuple(true, kSyncWal, kEnd)));
549
550 } // namespace ROCKSDB_NAMESPACE
551
main(int argc,char ** argv)552 int main(int argc, char** argv) {
553 ::testing::InitGoogleTest(&argc, argv);
554 return RUN_ALL_TESTS();
555 }
556