1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #ifndef ROCKSDB_LITE
10
11 #include "db/db_test_util.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/io_status.h"
14 #include "rocksdb/perf_context.h"
15 #include "rocksdb/sst_file_manager.h"
16 #include "test_util/fault_injection_test_env.h"
17 #include "test_util/fault_injection_test_fs.h"
18 #if !defined(ROCKSDB_LITE)
19 #include "test_util/sync_point.h"
20 #endif
21
22 namespace ROCKSDB_NAMESPACE {
23
24 class DBErrorHandlingFSTest : public DBTestBase {
25 public:
DBErrorHandlingFSTest()26 DBErrorHandlingFSTest() : DBTestBase("/db_error_handling_fs_test") {}
27
GetManifestNameFromLiveFiles()28 std::string GetManifestNameFromLiveFiles() {
29 std::vector<std::string> live_files;
30 uint64_t manifest_size;
31
32 dbfull()->GetLiveFiles(live_files, &manifest_size, false);
33 for (auto& file : live_files) {
34 uint64_t num = 0;
35 FileType type;
36 if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
37 return file;
38 }
39 }
40 return "";
41 }
42 };
43
44 class DBErrorHandlingFS : public FileSystemWrapper {
45 public:
DBErrorHandlingFS()46 DBErrorHandlingFS()
47 : FileSystemWrapper(FileSystem::Default()),
48 trig_no_space(false),
49 trig_io_error(false) {}
50
SetTrigNoSpace()51 void SetTrigNoSpace() { trig_no_space = true; }
SetTrigIoError()52 void SetTrigIoError() { trig_io_error = true; }
53
54 private:
55 bool trig_no_space;
56 bool trig_io_error;
57 };
58
59 class ErrorHandlerFSListener : public EventListener {
60 public:
ErrorHandlerFSListener()61 ErrorHandlerFSListener()
62 : mutex_(),
63 cv_(&mutex_),
64 no_auto_recovery_(false),
65 recovery_complete_(false),
66 file_creation_started_(false),
67 override_bg_error_(false),
68 file_count_(0),
69 fault_fs_(nullptr) {}
70
OnTableFileCreationStarted(const TableFileCreationBriefInfo &)71 void OnTableFileCreationStarted(
72 const TableFileCreationBriefInfo& /*ti*/) override {
73 InstrumentedMutexLock l(&mutex_);
74 file_creation_started_ = true;
75 if (file_count_ > 0) {
76 if (--file_count_ == 0) {
77 fault_fs_->SetFilesystemActive(false, file_creation_error_);
78 file_creation_error_ = IOStatus::OK();
79 }
80 }
81 cv_.SignalAll();
82 }
83
OnErrorRecoveryBegin(BackgroundErrorReason,Status,bool * auto_recovery)84 void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
85 Status /*bg_error*/, bool* auto_recovery) override {
86 if (*auto_recovery && no_auto_recovery_) {
87 *auto_recovery = false;
88 }
89 }
90
OnErrorRecoveryCompleted(Status)91 void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
92 InstrumentedMutexLock l(&mutex_);
93 recovery_complete_ = true;
94 cv_.SignalAll();
95 }
96
WaitForRecovery(uint64_t)97 bool WaitForRecovery(uint64_t /*abs_time_us*/) {
98 InstrumentedMutexLock l(&mutex_);
99 while (!recovery_complete_) {
100 cv_.Wait(/*abs_time_us*/);
101 }
102 if (recovery_complete_) {
103 recovery_complete_ = false;
104 return true;
105 }
106 return false;
107 }
108
WaitForTableFileCreationStarted(uint64_t)109 void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
110 InstrumentedMutexLock l(&mutex_);
111 while (!file_creation_started_) {
112 cv_.Wait(/*abs_time_us*/);
113 }
114 file_creation_started_ = false;
115 }
116
OnBackgroundError(BackgroundErrorReason,Status * bg_error)117 void OnBackgroundError(BackgroundErrorReason /*reason*/,
118 Status* bg_error) override {
119 if (override_bg_error_) {
120 *bg_error = bg_error_;
121 override_bg_error_ = false;
122 }
123 }
124
EnableAutoRecovery(bool enable=true)125 void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
126
OverrideBGError(Status bg_err)127 void OverrideBGError(Status bg_err) {
128 bg_error_ = bg_err;
129 override_bg_error_ = true;
130 }
131
InjectFileCreationError(FaultInjectionTestFS * fs,int file_count,IOStatus io_s)132 void InjectFileCreationError(FaultInjectionTestFS* fs, int file_count,
133 IOStatus io_s) {
134 fault_fs_ = fs;
135 file_count_ = file_count;
136 file_creation_error_ = io_s;
137 }
138
139 private:
140 InstrumentedMutex mutex_;
141 InstrumentedCondVar cv_;
142 bool no_auto_recovery_;
143 bool recovery_complete_;
144 bool file_creation_started_;
145 bool override_bg_error_;
146 int file_count_;
147 IOStatus file_creation_error_;
148 Status bg_error_;
149 FaultInjectionTestFS* fault_fs_;
150 };
151
TEST_F(DBErrorHandlingFSTest,FLushWriteError)152 TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
153 std::shared_ptr<FaultInjectionTestFS> fault_fs(
154 new FaultInjectionTestFS(FileSystem::Default()));
155 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
156 std::shared_ptr<ErrorHandlerFSListener> listener(
157 new ErrorHandlerFSListener());
158 Options options = GetDefaultOptions();
159 options.env = fault_fs_env.get();
160 options.create_if_missing = true;
161 options.listeners.emplace_back(listener);
162 Status s;
163
164 listener->EnableAutoRecovery(false);
165 DestroyAndReopen(options);
166
167 Put(Key(0), "val");
168 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
169 fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
170 });
171 SyncPoint::GetInstance()->EnableProcessing();
172 s = Flush();
173 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
174 SyncPoint::GetInstance()->DisableProcessing();
175 fault_fs->SetFilesystemActive(true);
176 s = dbfull()->Resume();
177 ASSERT_EQ(s, Status::OK());
178
179 Reopen(options);
180 ASSERT_EQ("val", Get(Key(0)));
181 Destroy(options);
182 }
183
TEST_F(DBErrorHandlingFSTest,FLushWritRetryableeError)184 TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) {
185 std::shared_ptr<FaultInjectionTestFS> fault_fs(
186 new FaultInjectionTestFS(FileSystem::Default()));
187 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
188 std::shared_ptr<ErrorHandlerFSListener> listener(
189 new ErrorHandlerFSListener());
190 Options options = GetDefaultOptions();
191 options.env = fault_fs_env.get();
192 options.create_if_missing = true;
193 options.listeners.emplace_back(listener);
194 Status s;
195
196 listener->EnableAutoRecovery(false);
197 DestroyAndReopen(options);
198
199 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
200 error_msg.SetRetryable(true);
201
202 Put(Key(1), "val1");
203 SyncPoint::GetInstance()->SetCallBack(
204 "BuildTable:BeforeFinishBuildTable",
205 [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
206 SyncPoint::GetInstance()->EnableProcessing();
207 s = Flush();
208 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
209 SyncPoint::GetInstance()->DisableProcessing();
210 fault_fs->SetFilesystemActive(true);
211 s = dbfull()->Resume();
212 ASSERT_EQ(s, Status::OK());
213 Reopen(options);
214 ASSERT_EQ("val1", Get(Key(1)));
215
216 Put(Key(2), "val2");
217 SyncPoint::GetInstance()->SetCallBack(
218 "BuildTable:BeforeSyncTable",
219 [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
220 SyncPoint::GetInstance()->EnableProcessing();
221 s = Flush();
222 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
223 SyncPoint::GetInstance()->DisableProcessing();
224 fault_fs->SetFilesystemActive(true);
225 s = dbfull()->Resume();
226 ASSERT_EQ(s, Status::OK());
227 Reopen(options);
228 ASSERT_EQ("val2", Get(Key(2)));
229
230 Put(Key(3), "val3");
231 SyncPoint::GetInstance()->SetCallBack(
232 "BuildTable:BeforeCloseTableFile",
233 [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
234 SyncPoint::GetInstance()->EnableProcessing();
235 s = Flush();
236 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
237 SyncPoint::GetInstance()->DisableProcessing();
238 fault_fs->SetFilesystemActive(true);
239 s = dbfull()->Resume();
240 ASSERT_EQ(s, Status::OK());
241 Reopen(options);
242 ASSERT_EQ("val3", Get(Key(3)));
243
244 Destroy(options);
245 }
246
TEST_F(DBErrorHandlingFSTest,ManifestWriteError)247 TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
248 std::shared_ptr<FaultInjectionTestFS> fault_fs(
249 new FaultInjectionTestFS(FileSystem::Default()));
250 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
251 std::shared_ptr<ErrorHandlerFSListener> listener(
252 new ErrorHandlerFSListener());
253 Options options = GetDefaultOptions();
254 options.env = fault_fs_env.get();
255 options.create_if_missing = true;
256 options.listeners.emplace_back(listener);
257 Status s;
258 std::string old_manifest;
259 std::string new_manifest;
260
261 listener->EnableAutoRecovery(false);
262 DestroyAndReopen(options);
263 old_manifest = GetManifestNameFromLiveFiles();
264
265 Put(Key(0), "val");
266 Flush();
267 Put(Key(1), "val");
268 SyncPoint::GetInstance()->SetCallBack(
269 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
270 fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
271 });
272 SyncPoint::GetInstance()->EnableProcessing();
273 s = Flush();
274 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
275 SyncPoint::GetInstance()->ClearAllCallBacks();
276 SyncPoint::GetInstance()->DisableProcessing();
277 fault_fs->SetFilesystemActive(true);
278 s = dbfull()->Resume();
279 ASSERT_EQ(s, Status::OK());
280
281 new_manifest = GetManifestNameFromLiveFiles();
282 ASSERT_NE(new_manifest, old_manifest);
283
284 Reopen(options);
285 ASSERT_EQ("val", Get(Key(0)));
286 ASSERT_EQ("val", Get(Key(1)));
287 Close();
288 }
289
TEST_F(DBErrorHandlingFSTest,ManifestWriteRetryableError)290 TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
291 std::shared_ptr<FaultInjectionTestFS> fault_fs(
292 new FaultInjectionTestFS(FileSystem::Default()));
293 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
294 std::shared_ptr<ErrorHandlerFSListener> listener(
295 new ErrorHandlerFSListener());
296 Options options = GetDefaultOptions();
297 options.env = fault_fs_env.get();
298 options.create_if_missing = true;
299 options.listeners.emplace_back(listener);
300 Status s;
301 std::string old_manifest;
302 std::string new_manifest;
303
304 listener->EnableAutoRecovery(false);
305 DestroyAndReopen(options);
306 old_manifest = GetManifestNameFromLiveFiles();
307
308 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
309 error_msg.SetRetryable(true);
310
311 Put(Key(0), "val");
312 Flush();
313 Put(Key(1), "val");
314 SyncPoint::GetInstance()->SetCallBack(
315 "VersionSet::LogAndApply:WriteManifest",
316 [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
317 SyncPoint::GetInstance()->EnableProcessing();
318 s = Flush();
319 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
320 SyncPoint::GetInstance()->ClearAllCallBacks();
321 SyncPoint::GetInstance()->DisableProcessing();
322 fault_fs->SetFilesystemActive(true);
323 s = dbfull()->Resume();
324 ASSERT_EQ(s, Status::OK());
325
326 new_manifest = GetManifestNameFromLiveFiles();
327 ASSERT_NE(new_manifest, old_manifest);
328
329 Reopen(options);
330 ASSERT_EQ("val", Get(Key(0)));
331 ASSERT_EQ("val", Get(Key(1)));
332 Close();
333 }
334
TEST_F(DBErrorHandlingFSTest,DoubleManifestWriteError)335 TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
336 std::shared_ptr<FaultInjectionTestFS> fault_fs(
337 new FaultInjectionTestFS(FileSystem::Default()));
338 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
339 std::shared_ptr<ErrorHandlerFSListener> listener(
340 new ErrorHandlerFSListener());
341 Options options = GetDefaultOptions();
342 options.env = fault_fs_env.get();
343 options.create_if_missing = true;
344 options.listeners.emplace_back(listener);
345 Status s;
346 std::string old_manifest;
347 std::string new_manifest;
348
349 listener->EnableAutoRecovery(false);
350 DestroyAndReopen(options);
351 old_manifest = GetManifestNameFromLiveFiles();
352
353 Put(Key(0), "val");
354 Flush();
355 Put(Key(1), "val");
356 SyncPoint::GetInstance()->SetCallBack(
357 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
358 fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
359 });
360 SyncPoint::GetInstance()->EnableProcessing();
361 s = Flush();
362 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
363 fault_fs->SetFilesystemActive(true);
364
365 // This Resume() will attempt to create a new manifest file and fail again
366 s = dbfull()->Resume();
367 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
368 fault_fs->SetFilesystemActive(true);
369 SyncPoint::GetInstance()->ClearAllCallBacks();
370 SyncPoint::GetInstance()->DisableProcessing();
371
372 // A successful Resume() will create a new manifest file
373 s = dbfull()->Resume();
374 ASSERT_EQ(s, Status::OK());
375
376 new_manifest = GetManifestNameFromLiveFiles();
377 ASSERT_NE(new_manifest, old_manifest);
378
379 Reopen(options);
380 ASSERT_EQ("val", Get(Key(0)));
381 ASSERT_EQ("val", Get(Key(1)));
382 Close();
383 }
384
TEST_F(DBErrorHandlingFSTest,CompactionManifestWriteError)385 TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
386 std::shared_ptr<FaultInjectionTestFS> fault_fs(
387 new FaultInjectionTestFS(FileSystem::Default()));
388 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
389 std::shared_ptr<ErrorHandlerFSListener> listener(
390 new ErrorHandlerFSListener());
391 Options options = GetDefaultOptions();
392 options.env = fault_fs_env.get();
393 options.create_if_missing = true;
394 options.level0_file_num_compaction_trigger = 2;
395 options.listeners.emplace_back(listener);
396 Status s;
397 std::string old_manifest;
398 std::string new_manifest;
399 std::atomic<bool> fail_manifest(false);
400 DestroyAndReopen(options);
401 old_manifest = GetManifestNameFromLiveFiles();
402
403 Put(Key(0), "val");
404 Put(Key(2), "val");
405 s = Flush();
406 ASSERT_EQ(s, Status::OK());
407
408 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
409 // Wait for flush of 2nd L0 file before starting compaction
410 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
411 "BackgroundCallCompaction:0"},
412 // Wait for compaction to detect manifest write error
413 {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
414 // Make compaction thread wait for error to be cleared
415 {"CompactionManifestWriteError:1",
416 "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
417 // Wait for DB instance to clear bg_error before calling
418 // TEST_WaitForCompact
419 {"SstFileManagerImpl::ErrorCleared", "CompactionManifestWriteError:2"}});
420 // trigger manifest write failure in compaction thread
421 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
422 "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
423 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
424 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
425 if (fail_manifest.load()) {
426 fault_fs->SetFilesystemActive(false,
427 IOStatus::NoSpace("Out of space"));
428 }
429 });
430 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
431
432 Put(Key(1), "val");
433 // This Flush will trigger a compaction, which will fail when appending to
434 // the manifest
435 s = Flush();
436 ASSERT_EQ(s, Status::OK());
437
438 TEST_SYNC_POINT("CompactionManifestWriteError:0");
439 // Clear all errors so when the compaction is retried, it will succeed
440 fault_fs->SetFilesystemActive(true);
441 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
442 TEST_SYNC_POINT("CompactionManifestWriteError:1");
443 TEST_SYNC_POINT("CompactionManifestWriteError:2");
444
445 s = dbfull()->TEST_WaitForCompact();
446 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
447 ASSERT_EQ(s, Status::OK());
448
449 new_manifest = GetManifestNameFromLiveFiles();
450 ASSERT_NE(new_manifest, old_manifest);
451 Reopen(options);
452 ASSERT_EQ("val", Get(Key(0)));
453 ASSERT_EQ("val", Get(Key(1)));
454 ASSERT_EQ("val", Get(Key(2)));
455 Close();
456 }
457
TEST_F(DBErrorHandlingFSTest,CompactionManifestWriteRetryableError)458 TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
459 std::shared_ptr<FaultInjectionTestFS> fault_fs(
460 new FaultInjectionTestFS(FileSystem::Default()));
461 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
462 std::shared_ptr<ErrorHandlerFSListener> listener(
463 new ErrorHandlerFSListener());
464 Options options = GetDefaultOptions();
465 options.env = fault_fs_env.get();
466 options.create_if_missing = true;
467 options.level0_file_num_compaction_trigger = 2;
468 options.listeners.emplace_back(listener);
469 Status s;
470 std::string old_manifest;
471 std::string new_manifest;
472 DestroyAndReopen(options);
473 old_manifest = GetManifestNameFromLiveFiles();
474
475 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
476 error_msg.SetRetryable(true);
477
478 Put(Key(0), "val");
479 Put(Key(2), "val");
480 s = Flush();
481 ASSERT_EQ(s, Status::OK());
482
483 listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
484 listener->EnableAutoRecovery(false);
485 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
486 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
487 [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
488 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
489
490 Put(Key(1), "val");
491 s = Flush();
492 ASSERT_EQ(s, Status::OK());
493
494 s = dbfull()->TEST_WaitForCompact();
495 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
496
497 fault_fs->SetFilesystemActive(true);
498 SyncPoint::GetInstance()->ClearAllCallBacks();
499 SyncPoint::GetInstance()->DisableProcessing();
500 s = dbfull()->Resume();
501 ASSERT_EQ(s, Status::OK());
502
503 new_manifest = GetManifestNameFromLiveFiles();
504 ASSERT_NE(new_manifest, old_manifest);
505
506 Reopen(options);
507 ASSERT_EQ("val", Get(Key(0)));
508 ASSERT_EQ("val", Get(Key(1)));
509 ASSERT_EQ("val", Get(Key(2)));
510 Close();
511 }
512
TEST_F(DBErrorHandlingFSTest,CompactionWriteError)513 TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
514 std::shared_ptr<FaultInjectionTestFS> fault_fs(
515 new FaultInjectionTestFS(FileSystem::Default()));
516 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
517 std::shared_ptr<ErrorHandlerFSListener> listener(
518 new ErrorHandlerFSListener());
519 Options options = GetDefaultOptions();
520 options.env = fault_fs_env.get();
521 options.create_if_missing = true;
522 options.level0_file_num_compaction_trigger = 2;
523 options.listeners.emplace_back(listener);
524 Status s;
525 DestroyAndReopen(options);
526
527 Put(Key(0), "va;");
528 Put(Key(2), "va;");
529 s = Flush();
530 ASSERT_EQ(s, Status::OK());
531
532 listener->OverrideBGError(
533 Status(Status::NoSpace(), Status::Severity::kHardError));
534 listener->EnableAutoRecovery(false);
535 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
536 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
537 "BackgroundCallCompaction:0"}});
538 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
539 "BackgroundCallCompaction:0", [&](void*) {
540 fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
541 });
542 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
543
544 Put(Key(1), "val");
545 s = Flush();
546 ASSERT_EQ(s, Status::OK());
547
548 s = dbfull()->TEST_WaitForCompact();
549 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
550
551 fault_fs->SetFilesystemActive(true);
552 s = dbfull()->Resume();
553 ASSERT_EQ(s, Status::OK());
554 Destroy(options);
555 }
556
TEST_F(DBErrorHandlingFSTest,CompactionWriteRetryableError)557 TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
558 std::shared_ptr<FaultInjectionTestFS> fault_fs(
559 new FaultInjectionTestFS(FileSystem::Default()));
560 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
561 std::shared_ptr<ErrorHandlerFSListener> listener(
562 new ErrorHandlerFSListener());
563 Options options = GetDefaultOptions();
564 options.env = fault_fs_env.get();
565 options.create_if_missing = true;
566 options.level0_file_num_compaction_trigger = 2;
567 options.listeners.emplace_back(listener);
568 Status s;
569 DestroyAndReopen(options);
570
571 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
572 error_msg.SetRetryable(true);
573
574 Put(Key(0), "va;");
575 Put(Key(2), "va;");
576 s = Flush();
577 ASSERT_EQ(s, Status::OK());
578
579 listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
580 listener->EnableAutoRecovery(false);
581 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
582 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
583 "BackgroundCallCompaction:0"}});
584 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
585 "BackgroundCallCompaction:0",
586 [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
587 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
588
589 Put(Key(1), "val");
590 s = Flush();
591 ASSERT_EQ(s, Status::OK());
592
593 s = dbfull()->TEST_WaitForCompact();
594 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
595
596 fault_fs->SetFilesystemActive(true);
597 SyncPoint::GetInstance()->ClearAllCallBacks();
598 SyncPoint::GetInstance()->DisableProcessing();
599 s = dbfull()->Resume();
600 ASSERT_EQ(s, Status::OK());
601 Destroy(options);
602 }
603
TEST_F(DBErrorHandlingFSTest,CorruptionError)604 TEST_F(DBErrorHandlingFSTest, CorruptionError) {
605 std::shared_ptr<FaultInjectionTestFS> fault_fs(
606 new FaultInjectionTestFS(FileSystem::Default()));
607 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
608 Options options = GetDefaultOptions();
609 options.env = fault_fs_env.get();
610 options.create_if_missing = true;
611 options.level0_file_num_compaction_trigger = 2;
612 Status s;
613 DestroyAndReopen(options);
614
615 Put(Key(0), "va;");
616 Put(Key(2), "va;");
617 s = Flush();
618 ASSERT_EQ(s, Status::OK());
619
620 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
621 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
622 "BackgroundCallCompaction:0"}});
623 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
624 "BackgroundCallCompaction:0", [&](void*) {
625 fault_fs->SetFilesystemActive(false,
626 IOStatus::Corruption("Corruption"));
627 });
628 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
629
630 Put(Key(1), "val");
631 s = Flush();
632 ASSERT_EQ(s, Status::OK());
633
634 s = dbfull()->TEST_WaitForCompact();
635 ASSERT_EQ(s.severity(),
636 ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
637
638 fault_fs->SetFilesystemActive(true);
639 s = dbfull()->Resume();
640 ASSERT_NE(s, Status::OK());
641 Destroy(options);
642 }
643
TEST_F(DBErrorHandlingFSTest,AutoRecoverFlushError)644 TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
645 std::shared_ptr<FaultInjectionTestFS> fault_fs(
646 new FaultInjectionTestFS(FileSystem::Default()));
647 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
648 std::shared_ptr<ErrorHandlerFSListener> listener(
649 new ErrorHandlerFSListener());
650 Options options = GetDefaultOptions();
651 options.env = fault_fs_env.get();
652 options.create_if_missing = true;
653 options.listeners.emplace_back(listener);
654 Status s;
655
656 listener->EnableAutoRecovery();
657 DestroyAndReopen(options);
658
659 Put(Key(0), "val");
660 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
661 fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
662 });
663 SyncPoint::GetInstance()->EnableProcessing();
664 s = Flush();
665 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
666 SyncPoint::GetInstance()->DisableProcessing();
667 fault_fs->SetFilesystemActive(true);
668 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
669
670 s = Put(Key(1), "val");
671 ASSERT_EQ(s, Status::OK());
672
673 Reopen(options);
674 ASSERT_EQ("val", Get(Key(0)));
675 ASSERT_EQ("val", Get(Key(1)));
676 Destroy(options);
677 }
678
TEST_F(DBErrorHandlingFSTest,FailRecoverFlushError)679 TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
680 std::shared_ptr<FaultInjectionTestFS> fault_fs(
681 new FaultInjectionTestFS(FileSystem::Default()));
682 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
683 std::shared_ptr<ErrorHandlerFSListener> listener(
684 new ErrorHandlerFSListener());
685 Options options = GetDefaultOptions();
686 options.env = fault_fs_env.get();
687 options.create_if_missing = true;
688 options.listeners.emplace_back(listener);
689 Status s;
690
691 listener->EnableAutoRecovery();
692 DestroyAndReopen(options);
693
694 Put(Key(0), "val");
695 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
696 fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
697 });
698 SyncPoint::GetInstance()->EnableProcessing();
699 s = Flush();
700 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
701 // We should be able to shutdown the database while auto recovery is going
702 // on in the background
703 Close();
704 DestroyDB(dbname_, options);
705 }
706
TEST_F(DBErrorHandlingFSTest,WALWriteError)707 TEST_F(DBErrorHandlingFSTest, WALWriteError) {
708 std::shared_ptr<FaultInjectionTestFS> fault_fs(
709 new FaultInjectionTestFS(FileSystem::Default()));
710 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
711 std::shared_ptr<ErrorHandlerFSListener> listener(
712 new ErrorHandlerFSListener());
713 Options options = GetDefaultOptions();
714 options.env = fault_fs_env.get();
715 options.create_if_missing = true;
716 options.writable_file_max_buffer_size = 32768;
717 options.listeners.emplace_back(listener);
718 Status s;
719 Random rnd(301);
720
721 listener->EnableAutoRecovery();
722 DestroyAndReopen(options);
723
724 {
725 WriteBatch batch;
726
727 for (auto i = 0; i < 100; ++i) {
728 batch.Put(Key(i), RandomString(&rnd, 1024));
729 }
730
731 WriteOptions wopts;
732 wopts.sync = true;
733 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
734 };
735
736 {
737 WriteBatch batch;
738 int write_error = 0;
739
740 for (auto i = 100; i < 199; ++i) {
741 batch.Put(Key(i), RandomString(&rnd, 1024));
742 }
743
744 SyncPoint::GetInstance()->SetCallBack(
745 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
746 write_error++;
747 if (write_error > 2) {
748 fault_fs->SetFilesystemActive(false,
749 IOStatus::NoSpace("Out of space"));
750 }
751 });
752 SyncPoint::GetInstance()->EnableProcessing();
753 WriteOptions wopts;
754 wopts.sync = true;
755 s = dbfull()->Write(wopts, &batch);
756 ASSERT_EQ(s, s.NoSpace());
757 }
758 SyncPoint::GetInstance()->DisableProcessing();
759 fault_fs->SetFilesystemActive(true);
760 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
761 for (auto i = 0; i < 199; ++i) {
762 if (i < 100) {
763 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
764 } else {
765 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
766 }
767 }
768 Reopen(options);
769 for (auto i = 0; i < 199; ++i) {
770 if (i < 100) {
771 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
772 } else {
773 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
774 }
775 }
776 Close();
777 }
778
TEST_F(DBErrorHandlingFSTest,WALWriteRetryableError)779 TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
780 std::shared_ptr<FaultInjectionTestFS> fault_fs(
781 new FaultInjectionTestFS(FileSystem::Default()));
782 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
783 std::shared_ptr<ErrorHandlerFSListener> listener(
784 new ErrorHandlerFSListener());
785 Options options = GetDefaultOptions();
786 options.env = fault_fs_env.get();
787 options.create_if_missing = true;
788 options.writable_file_max_buffer_size = 32768;
789 options.listeners.emplace_back(listener);
790 options.paranoid_checks = true;
791 Status s;
792 Random rnd(301);
793
794 DestroyAndReopen(options);
795
796 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
797 error_msg.SetRetryable(true);
798
799 // For the first batch, write is successful, require sync
800 {
801 WriteBatch batch;
802
803 for (auto i = 0; i < 100; ++i) {
804 batch.Put(Key(i), RandomString(&rnd, 1024));
805 }
806
807 WriteOptions wopts;
808 wopts.sync = true;
809 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
810 };
811
812 // For the second batch, the first 2 file Append are successful, then the
813 // following Append fails due to file system retryable IOError.
814 {
815 WriteBatch batch;
816 int write_error = 0;
817
818 for (auto i = 100; i < 200; ++i) {
819 batch.Put(Key(i), RandomString(&rnd, 1024));
820 }
821
822 SyncPoint::GetInstance()->SetCallBack(
823 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
824 write_error++;
825 if (write_error > 2) {
826 fault_fs->SetFilesystemActive(false, error_msg);
827 }
828 });
829 SyncPoint::GetInstance()->EnableProcessing();
830 WriteOptions wopts;
831 wopts.sync = true;
832 s = dbfull()->Write(wopts, &batch);
833 ASSERT_EQ(true, s.IsIOError());
834 }
835 fault_fs->SetFilesystemActive(true);
836 SyncPoint::GetInstance()->ClearAllCallBacks();
837 SyncPoint::GetInstance()->DisableProcessing();
838
839 // Data in corrupted WAL are not stored
840 for (auto i = 0; i < 199; ++i) {
841 if (i < 100) {
842 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
843 } else {
844 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
845 }
846 }
847
848 // Resume and write a new batch, should be in the WAL
849 s = dbfull()->Resume();
850 ASSERT_EQ(s, Status::OK());
851 {
852 WriteBatch batch;
853
854 for (auto i = 200; i < 300; ++i) {
855 batch.Put(Key(i), RandomString(&rnd, 1024));
856 }
857
858 WriteOptions wopts;
859 wopts.sync = true;
860 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
861 };
862
863 Reopen(options);
864 for (auto i = 0; i < 300; ++i) {
865 if (i < 100 || i >= 200) {
866 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
867 } else {
868 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
869 }
870 }
871 Close();
872 }
873
TEST_F(DBErrorHandlingFSTest,MultiCFWALWriteError)874 TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
875 std::shared_ptr<FaultInjectionTestFS> fault_fs(
876 new FaultInjectionTestFS(FileSystem::Default()));
877 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
878 std::shared_ptr<ErrorHandlerFSListener> listener(
879 new ErrorHandlerFSListener());
880 Options options = GetDefaultOptions();
881 options.env = fault_fs_env.get();
882 options.create_if_missing = true;
883 options.writable_file_max_buffer_size = 32768;
884 options.listeners.emplace_back(listener);
885 Status s;
886 Random rnd(301);
887
888 listener->EnableAutoRecovery();
889 CreateAndReopenWithCF({"one", "two", "three"}, options);
890
891 {
892 WriteBatch batch;
893
894 for (auto i = 1; i < 4; ++i) {
895 for (auto j = 0; j < 100; ++j) {
896 batch.Put(handles_[i], Key(j), RandomString(&rnd, 1024));
897 }
898 }
899
900 WriteOptions wopts;
901 wopts.sync = true;
902 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
903 };
904
905 {
906 WriteBatch batch;
907 int write_error = 0;
908
909 // Write to one CF
910 for (auto i = 100; i < 199; ++i) {
911 batch.Put(handles_[2], Key(i), RandomString(&rnd, 1024));
912 }
913
914 SyncPoint::GetInstance()->SetCallBack(
915 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
916 write_error++;
917 if (write_error > 2) {
918 fault_fs->SetFilesystemActive(false,
919 IOStatus::NoSpace("Out of space"));
920 }
921 });
922 SyncPoint::GetInstance()->EnableProcessing();
923 WriteOptions wopts;
924 wopts.sync = true;
925 s = dbfull()->Write(wopts, &batch);
926 ASSERT_EQ(s, s.NoSpace());
927 }
928 SyncPoint::GetInstance()->DisableProcessing();
929 fault_fs->SetFilesystemActive(true);
930 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
931
932 for (auto i = 1; i < 4; ++i) {
933 // Every CF should have been flushed
934 ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
935 }
936
937 for (auto i = 1; i < 4; ++i) {
938 for (auto j = 0; j < 199; ++j) {
939 if (j < 100) {
940 ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
941 } else {
942 ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
943 }
944 }
945 }
946 ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
947 for (auto i = 1; i < 4; ++i) {
948 for (auto j = 0; j < 199; ++j) {
949 if (j < 100) {
950 ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
951 } else {
952 ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
953 }
954 }
955 }
956 Close();
957 }
958
TEST_F(DBErrorHandlingFSTest,MultiDBCompactionError)959 TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
960 FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
961 std::vector<std::unique_ptr<Env>> fault_envs;
962 std::vector<FaultInjectionTestFS*> fault_fs;
963 std::vector<Options> options;
964 std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
965 std::vector<DB*> db;
966 std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
967 int kNumDbInstances = 3;
968 Random rnd(301);
969
970 for (auto i = 0; i < kNumDbInstances; ++i) {
971 listener.emplace_back(new ErrorHandlerFSListener());
972 options.emplace_back(GetDefaultOptions());
973 fault_fs.emplace_back(new FaultInjectionTestFS(FileSystem::Default()));
974 std::shared_ptr<FileSystem> fs(fault_fs.back());
975 fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
976 options[i].env = fault_envs.back().get();
977 options[i].create_if_missing = true;
978 options[i].level0_file_num_compaction_trigger = 2;
979 options[i].writable_file_max_buffer_size = 32768;
980 options[i].listeners.emplace_back(listener[i]);
981 options[i].sst_file_manager = sfm;
982 DB* dbptr;
983 char buf[16];
984
985 listener[i]->EnableAutoRecovery();
986 // Setup for returning error for the 3rd SST, which would be level 1
987 listener[i]->InjectFileCreationError(fault_fs[i], 3,
988 IOStatus::NoSpace("Out of space"));
989 snprintf(buf, sizeof(buf), "_%d", i);
990 DestroyDB(dbname_ + std::string(buf), options[i]);
991 ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
992 Status::OK());
993 db.emplace_back(dbptr);
994 }
995
996 for (auto i = 0; i < kNumDbInstances; ++i) {
997 WriteBatch batch;
998
999 for (auto j = 0; j <= 100; ++j) {
1000 batch.Put(Key(j), RandomString(&rnd, 1024));
1001 }
1002
1003 WriteOptions wopts;
1004 wopts.sync = true;
1005 ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1006 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1007 }
1008
1009 def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
1010 for (auto i = 0; i < kNumDbInstances; ++i) {
1011 WriteBatch batch;
1012
1013 // Write to one CF
1014 for (auto j = 100; j < 199; ++j) {
1015 batch.Put(Key(j), RandomString(&rnd, 1024));
1016 }
1017
1018 WriteOptions wopts;
1019 wopts.sync = true;
1020 ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1021 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1022 }
1023
1024 for (auto i = 0; i < kNumDbInstances; ++i) {
1025 Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
1026 ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
1027 fault_fs[i]->SetFilesystemActive(true);
1028 }
1029
1030 def_env->SetFilesystemActive(true);
1031 for (auto i = 0; i < kNumDbInstances; ++i) {
1032 std::string prop;
1033 ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
1034 ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
1035 Status::OK());
1036 EXPECT_TRUE(db[i]->GetProperty(
1037 "rocksdb.num-files-at-level" + NumberToString(0), &prop));
1038 EXPECT_EQ(atoi(prop.c_str()), 0);
1039 EXPECT_TRUE(db[i]->GetProperty(
1040 "rocksdb.num-files-at-level" + NumberToString(1), &prop));
1041 EXPECT_EQ(atoi(prop.c_str()), 1);
1042 }
1043
1044 for (auto i = 0; i < kNumDbInstances; ++i) {
1045 char buf[16];
1046 snprintf(buf, sizeof(buf), "_%d", i);
1047 delete db[i];
1048 fault_fs[i]->SetFilesystemActive(true);
1049 if (getenv("KEEP_DB")) {
1050 printf("DB is still at %s%s\n", dbname_.c_str(), buf);
1051 } else {
1052 Status s = DestroyDB(dbname_ + std::string(buf), options[i]);
1053 }
1054 }
1055 options.clear();
1056 sfm.reset();
1057 delete def_env;
1058 }
1059
TEST_F(DBErrorHandlingFSTest,MultiDBVariousErrors)1060 TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
1061 FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
1062 std::vector<std::unique_ptr<Env>> fault_envs;
1063 std::vector<FaultInjectionTestFS*> fault_fs;
1064 std::vector<Options> options;
1065 std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
1066 std::vector<DB*> db;
1067 std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
1068 int kNumDbInstances = 3;
1069 Random rnd(301);
1070
1071 for (auto i = 0; i < kNumDbInstances; ++i) {
1072 listener.emplace_back(new ErrorHandlerFSListener());
1073 options.emplace_back(GetDefaultOptions());
1074 fault_fs.emplace_back(new FaultInjectionTestFS(FileSystem::Default()));
1075 std::shared_ptr<FileSystem> fs(fault_fs.back());
1076 fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
1077 options[i].env = fault_envs.back().get();
1078 options[i].create_if_missing = true;
1079 options[i].level0_file_num_compaction_trigger = 2;
1080 options[i].writable_file_max_buffer_size = 32768;
1081 options[i].listeners.emplace_back(listener[i]);
1082 options[i].sst_file_manager = sfm;
1083 DB* dbptr;
1084 char buf[16];
1085
1086 listener[i]->EnableAutoRecovery();
1087 switch (i) {
1088 case 0:
1089 // Setup for returning error for the 3rd SST, which would be level 1
1090 listener[i]->InjectFileCreationError(fault_fs[i], 3,
1091 IOStatus::NoSpace("Out of space"));
1092 break;
1093 case 1:
1094 // Setup for returning error after the 1st SST, which would result
1095 // in a hard error
1096 listener[i]->InjectFileCreationError(fault_fs[i], 2,
1097 IOStatus::NoSpace("Out of space"));
1098 break;
1099 default:
1100 break;
1101 }
1102 snprintf(buf, sizeof(buf), "_%d", i);
1103 DestroyDB(dbname_ + std::string(buf), options[i]);
1104 ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
1105 Status::OK());
1106 db.emplace_back(dbptr);
1107 }
1108
1109 for (auto i = 0; i < kNumDbInstances; ++i) {
1110 WriteBatch batch;
1111
1112 for (auto j = 0; j <= 100; ++j) {
1113 batch.Put(Key(j), RandomString(&rnd, 1024));
1114 }
1115
1116 WriteOptions wopts;
1117 wopts.sync = true;
1118 ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1119 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1120 }
1121
1122 def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
1123 for (auto i = 0; i < kNumDbInstances; ++i) {
1124 WriteBatch batch;
1125
1126 // Write to one CF
1127 for (auto j = 100; j < 199; ++j) {
1128 batch.Put(Key(j), RandomString(&rnd, 1024));
1129 }
1130
1131 WriteOptions wopts;
1132 wopts.sync = true;
1133 ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1134 if (i != 1) {
1135 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1136 } else {
1137 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace());
1138 }
1139 }
1140
1141 for (auto i = 0; i < kNumDbInstances; ++i) {
1142 Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
1143 switch (i) {
1144 case 0:
1145 ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
1146 break;
1147 case 1:
1148 ASSERT_EQ(s.severity(), Status::Severity::kHardError);
1149 break;
1150 case 2:
1151 ASSERT_EQ(s, Status::OK());
1152 break;
1153 }
1154 fault_fs[i]->SetFilesystemActive(true);
1155 }
1156
1157 def_env->SetFilesystemActive(true);
1158 for (auto i = 0; i < kNumDbInstances; ++i) {
1159 std::string prop;
1160 if (i < 2) {
1161 ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
1162 }
1163 if (i == 1) {
1164 ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
1165 Status::OK());
1166 }
1167 EXPECT_TRUE(db[i]->GetProperty(
1168 "rocksdb.num-files-at-level" + NumberToString(0), &prop));
1169 EXPECT_EQ(atoi(prop.c_str()), 0);
1170 EXPECT_TRUE(db[i]->GetProperty(
1171 "rocksdb.num-files-at-level" + NumberToString(1), &prop));
1172 EXPECT_EQ(atoi(prop.c_str()), 1);
1173 }
1174
1175 for (auto i = 0; i < kNumDbInstances; ++i) {
1176 char buf[16];
1177 snprintf(buf, sizeof(buf), "_%d", i);
1178 fault_fs[i]->SetFilesystemActive(true);
1179 delete db[i];
1180 if (getenv("KEEP_DB")) {
1181 printf("DB is still at %s%s\n", dbname_.c_str(), buf);
1182 } else {
1183 DestroyDB(dbname_ + std::string(buf), options[i]);
1184 }
1185 }
1186 options.clear();
1187 delete def_env;
1188 }
1189 } // namespace ROCKSDB_NAMESPACE
1190
main(int argc,char ** argv)1191 int main(int argc, char** argv) {
1192 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1193 ::testing::InitGoogleTest(&argc, argv);
1194 return RUN_ALL_TESTS();
1195 }
1196
1197 #else
1198 #include <stdio.h>
1199
main(int,char **)1200 int main(int /*argc*/, char** /*argv*/) {
1201 fprintf(stderr, "SKIPPED as Cuckoo table is not supported in ROCKSDB_LITE\n");
1202 return 0;
1203 }
1204
1205 #endif // ROCKSDB_LITE
1206