1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #ifndef ROCKSDB_LITE
10 
11 #include "db/db_test_util.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/io_status.h"
14 #include "rocksdb/perf_context.h"
15 #include "rocksdb/sst_file_manager.h"
16 #include "test_util/fault_injection_test_env.h"
17 #include "test_util/fault_injection_test_fs.h"
18 #if !defined(ROCKSDB_LITE)
19 #include "test_util/sync_point.h"
20 #endif
21 
22 namespace ROCKSDB_NAMESPACE {
23 
24 class DBErrorHandlingFSTest : public DBTestBase {
25  public:
DBErrorHandlingFSTest()26   DBErrorHandlingFSTest() : DBTestBase("/db_error_handling_fs_test") {}
27 
GetManifestNameFromLiveFiles()28   std::string GetManifestNameFromLiveFiles() {
29     std::vector<std::string> live_files;
30     uint64_t manifest_size;
31 
32     dbfull()->GetLiveFiles(live_files, &manifest_size, false);
33     for (auto& file : live_files) {
34       uint64_t num = 0;
35       FileType type;
36       if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
37         return file;
38       }
39     }
40     return "";
41   }
42 };
43 
44 class DBErrorHandlingFS : public FileSystemWrapper {
45  public:
DBErrorHandlingFS()46   DBErrorHandlingFS()
47       : FileSystemWrapper(FileSystem::Default()),
48         trig_no_space(false),
49         trig_io_error(false) {}
50 
SetTrigNoSpace()51   void SetTrigNoSpace() { trig_no_space = true; }
SetTrigIoError()52   void SetTrigIoError() { trig_io_error = true; }
53 
54  private:
55   bool trig_no_space;
56   bool trig_io_error;
57 };
58 
59 class ErrorHandlerFSListener : public EventListener {
60  public:
ErrorHandlerFSListener()61   ErrorHandlerFSListener()
62       : mutex_(),
63         cv_(&mutex_),
64         no_auto_recovery_(false),
65         recovery_complete_(false),
66         file_creation_started_(false),
67         override_bg_error_(false),
68         file_count_(0),
69         fault_fs_(nullptr) {}
70 
OnTableFileCreationStarted(const TableFileCreationBriefInfo &)71   void OnTableFileCreationStarted(
72       const TableFileCreationBriefInfo& /*ti*/) override {
73     InstrumentedMutexLock l(&mutex_);
74     file_creation_started_ = true;
75     if (file_count_ > 0) {
76       if (--file_count_ == 0) {
77         fault_fs_->SetFilesystemActive(false, file_creation_error_);
78         file_creation_error_ = IOStatus::OK();
79       }
80     }
81     cv_.SignalAll();
82   }
83 
OnErrorRecoveryBegin(BackgroundErrorReason,Status,bool * auto_recovery)84   void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
85                             Status /*bg_error*/, bool* auto_recovery) override {
86     if (*auto_recovery && no_auto_recovery_) {
87       *auto_recovery = false;
88     }
89   }
90 
OnErrorRecoveryCompleted(Status)91   void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
92     InstrumentedMutexLock l(&mutex_);
93     recovery_complete_ = true;
94     cv_.SignalAll();
95   }
96 
WaitForRecovery(uint64_t)97   bool WaitForRecovery(uint64_t /*abs_time_us*/) {
98     InstrumentedMutexLock l(&mutex_);
99     while (!recovery_complete_) {
100       cv_.Wait(/*abs_time_us*/);
101     }
102     if (recovery_complete_) {
103       recovery_complete_ = false;
104       return true;
105     }
106     return false;
107   }
108 
WaitForTableFileCreationStarted(uint64_t)109   void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
110     InstrumentedMutexLock l(&mutex_);
111     while (!file_creation_started_) {
112       cv_.Wait(/*abs_time_us*/);
113     }
114     file_creation_started_ = false;
115   }
116 
OnBackgroundError(BackgroundErrorReason,Status * bg_error)117   void OnBackgroundError(BackgroundErrorReason /*reason*/,
118                          Status* bg_error) override {
119     if (override_bg_error_) {
120       *bg_error = bg_error_;
121       override_bg_error_ = false;
122     }
123   }
124 
EnableAutoRecovery(bool enable=true)125   void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
126 
OverrideBGError(Status bg_err)127   void OverrideBGError(Status bg_err) {
128     bg_error_ = bg_err;
129     override_bg_error_ = true;
130   }
131 
InjectFileCreationError(FaultInjectionTestFS * fs,int file_count,IOStatus io_s)132   void InjectFileCreationError(FaultInjectionTestFS* fs, int file_count,
133                                IOStatus io_s) {
134     fault_fs_ = fs;
135     file_count_ = file_count;
136     file_creation_error_ = io_s;
137   }
138 
139  private:
140   InstrumentedMutex mutex_;
141   InstrumentedCondVar cv_;
142   bool no_auto_recovery_;
143   bool recovery_complete_;
144   bool file_creation_started_;
145   bool override_bg_error_;
146   int file_count_;
147   IOStatus file_creation_error_;
148   Status bg_error_;
149   FaultInjectionTestFS* fault_fs_;
150 };
151 
TEST_F(DBErrorHandlingFSTest,FLushWriteError)152 TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
153   std::shared_ptr<FaultInjectionTestFS> fault_fs(
154       new FaultInjectionTestFS(FileSystem::Default()));
155   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
156   std::shared_ptr<ErrorHandlerFSListener> listener(
157       new ErrorHandlerFSListener());
158   Options options = GetDefaultOptions();
159   options.env = fault_fs_env.get();
160   options.create_if_missing = true;
161   options.listeners.emplace_back(listener);
162   Status s;
163 
164   listener->EnableAutoRecovery(false);
165   DestroyAndReopen(options);
166 
167   Put(Key(0), "val");
168   SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
169     fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
170   });
171   SyncPoint::GetInstance()->EnableProcessing();
172   s = Flush();
173   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
174   SyncPoint::GetInstance()->DisableProcessing();
175   fault_fs->SetFilesystemActive(true);
176   s = dbfull()->Resume();
177   ASSERT_EQ(s, Status::OK());
178 
179   Reopen(options);
180   ASSERT_EQ("val", Get(Key(0)));
181   Destroy(options);
182 }
183 
TEST_F(DBErrorHandlingFSTest,FLushWritRetryableeError)184 TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) {
185   std::shared_ptr<FaultInjectionTestFS> fault_fs(
186       new FaultInjectionTestFS(FileSystem::Default()));
187   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
188   std::shared_ptr<ErrorHandlerFSListener> listener(
189       new ErrorHandlerFSListener());
190   Options options = GetDefaultOptions();
191   options.env = fault_fs_env.get();
192   options.create_if_missing = true;
193   options.listeners.emplace_back(listener);
194   Status s;
195 
196   listener->EnableAutoRecovery(false);
197   DestroyAndReopen(options);
198 
199   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
200   error_msg.SetRetryable(true);
201 
202   Put(Key(1), "val1");
203   SyncPoint::GetInstance()->SetCallBack(
204       "BuildTable:BeforeFinishBuildTable",
205       [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
206   SyncPoint::GetInstance()->EnableProcessing();
207   s = Flush();
208   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
209   SyncPoint::GetInstance()->DisableProcessing();
210   fault_fs->SetFilesystemActive(true);
211   s = dbfull()->Resume();
212   ASSERT_EQ(s, Status::OK());
213   Reopen(options);
214   ASSERT_EQ("val1", Get(Key(1)));
215 
216   Put(Key(2), "val2");
217   SyncPoint::GetInstance()->SetCallBack(
218       "BuildTable:BeforeSyncTable",
219       [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
220   SyncPoint::GetInstance()->EnableProcessing();
221   s = Flush();
222   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
223   SyncPoint::GetInstance()->DisableProcessing();
224   fault_fs->SetFilesystemActive(true);
225   s = dbfull()->Resume();
226   ASSERT_EQ(s, Status::OK());
227   Reopen(options);
228   ASSERT_EQ("val2", Get(Key(2)));
229 
230   Put(Key(3), "val3");
231   SyncPoint::GetInstance()->SetCallBack(
232       "BuildTable:BeforeCloseTableFile",
233       [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
234   SyncPoint::GetInstance()->EnableProcessing();
235   s = Flush();
236   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
237   SyncPoint::GetInstance()->DisableProcessing();
238   fault_fs->SetFilesystemActive(true);
239   s = dbfull()->Resume();
240   ASSERT_EQ(s, Status::OK());
241   Reopen(options);
242   ASSERT_EQ("val3", Get(Key(3)));
243 
244   Destroy(options);
245 }
246 
TEST_F(DBErrorHandlingFSTest,ManifestWriteError)247 TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
248   std::shared_ptr<FaultInjectionTestFS> fault_fs(
249       new FaultInjectionTestFS(FileSystem::Default()));
250   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
251   std::shared_ptr<ErrorHandlerFSListener> listener(
252       new ErrorHandlerFSListener());
253   Options options = GetDefaultOptions();
254   options.env = fault_fs_env.get();
255   options.create_if_missing = true;
256   options.listeners.emplace_back(listener);
257   Status s;
258   std::string old_manifest;
259   std::string new_manifest;
260 
261   listener->EnableAutoRecovery(false);
262   DestroyAndReopen(options);
263   old_manifest = GetManifestNameFromLiveFiles();
264 
265   Put(Key(0), "val");
266   Flush();
267   Put(Key(1), "val");
268   SyncPoint::GetInstance()->SetCallBack(
269       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
270         fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
271       });
272   SyncPoint::GetInstance()->EnableProcessing();
273   s = Flush();
274   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
275   SyncPoint::GetInstance()->ClearAllCallBacks();
276   SyncPoint::GetInstance()->DisableProcessing();
277   fault_fs->SetFilesystemActive(true);
278   s = dbfull()->Resume();
279   ASSERT_EQ(s, Status::OK());
280 
281   new_manifest = GetManifestNameFromLiveFiles();
282   ASSERT_NE(new_manifest, old_manifest);
283 
284   Reopen(options);
285   ASSERT_EQ("val", Get(Key(0)));
286   ASSERT_EQ("val", Get(Key(1)));
287   Close();
288 }
289 
TEST_F(DBErrorHandlingFSTest,ManifestWriteRetryableError)290 TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
291   std::shared_ptr<FaultInjectionTestFS> fault_fs(
292       new FaultInjectionTestFS(FileSystem::Default()));
293   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
294   std::shared_ptr<ErrorHandlerFSListener> listener(
295       new ErrorHandlerFSListener());
296   Options options = GetDefaultOptions();
297   options.env = fault_fs_env.get();
298   options.create_if_missing = true;
299   options.listeners.emplace_back(listener);
300   Status s;
301   std::string old_manifest;
302   std::string new_manifest;
303 
304   listener->EnableAutoRecovery(false);
305   DestroyAndReopen(options);
306   old_manifest = GetManifestNameFromLiveFiles();
307 
308   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
309   error_msg.SetRetryable(true);
310 
311   Put(Key(0), "val");
312   Flush();
313   Put(Key(1), "val");
314   SyncPoint::GetInstance()->SetCallBack(
315       "VersionSet::LogAndApply:WriteManifest",
316       [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
317   SyncPoint::GetInstance()->EnableProcessing();
318   s = Flush();
319   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
320   SyncPoint::GetInstance()->ClearAllCallBacks();
321   SyncPoint::GetInstance()->DisableProcessing();
322   fault_fs->SetFilesystemActive(true);
323   s = dbfull()->Resume();
324   ASSERT_EQ(s, Status::OK());
325 
326   new_manifest = GetManifestNameFromLiveFiles();
327   ASSERT_NE(new_manifest, old_manifest);
328 
329   Reopen(options);
330   ASSERT_EQ("val", Get(Key(0)));
331   ASSERT_EQ("val", Get(Key(1)));
332   Close();
333 }
334 
TEST_F(DBErrorHandlingFSTest,DoubleManifestWriteError)335 TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
336   std::shared_ptr<FaultInjectionTestFS> fault_fs(
337       new FaultInjectionTestFS(FileSystem::Default()));
338   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
339   std::shared_ptr<ErrorHandlerFSListener> listener(
340       new ErrorHandlerFSListener());
341   Options options = GetDefaultOptions();
342   options.env = fault_fs_env.get();
343   options.create_if_missing = true;
344   options.listeners.emplace_back(listener);
345   Status s;
346   std::string old_manifest;
347   std::string new_manifest;
348 
349   listener->EnableAutoRecovery(false);
350   DestroyAndReopen(options);
351   old_manifest = GetManifestNameFromLiveFiles();
352 
353   Put(Key(0), "val");
354   Flush();
355   Put(Key(1), "val");
356   SyncPoint::GetInstance()->SetCallBack(
357       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
358         fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
359       });
360   SyncPoint::GetInstance()->EnableProcessing();
361   s = Flush();
362   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
363   fault_fs->SetFilesystemActive(true);
364 
365   // This Resume() will attempt to create a new manifest file and fail again
366   s = dbfull()->Resume();
367   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
368   fault_fs->SetFilesystemActive(true);
369   SyncPoint::GetInstance()->ClearAllCallBacks();
370   SyncPoint::GetInstance()->DisableProcessing();
371 
372   // A successful Resume() will create a new manifest file
373   s = dbfull()->Resume();
374   ASSERT_EQ(s, Status::OK());
375 
376   new_manifest = GetManifestNameFromLiveFiles();
377   ASSERT_NE(new_manifest, old_manifest);
378 
379   Reopen(options);
380   ASSERT_EQ("val", Get(Key(0)));
381   ASSERT_EQ("val", Get(Key(1)));
382   Close();
383 }
384 
TEST_F(DBErrorHandlingFSTest,CompactionManifestWriteError)385 TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
386   std::shared_ptr<FaultInjectionTestFS> fault_fs(
387       new FaultInjectionTestFS(FileSystem::Default()));
388   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
389   std::shared_ptr<ErrorHandlerFSListener> listener(
390       new ErrorHandlerFSListener());
391   Options options = GetDefaultOptions();
392   options.env = fault_fs_env.get();
393   options.create_if_missing = true;
394   options.level0_file_num_compaction_trigger = 2;
395   options.listeners.emplace_back(listener);
396   Status s;
397   std::string old_manifest;
398   std::string new_manifest;
399   std::atomic<bool> fail_manifest(false);
400   DestroyAndReopen(options);
401   old_manifest = GetManifestNameFromLiveFiles();
402 
403   Put(Key(0), "val");
404   Put(Key(2), "val");
405   s = Flush();
406   ASSERT_EQ(s, Status::OK());
407 
408   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
409       // Wait for flush of 2nd L0 file before starting compaction
410       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
411         "BackgroundCallCompaction:0"},
412        // Wait for compaction to detect manifest write error
413        {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
414        // Make compaction thread wait for error to be cleared
415        {"CompactionManifestWriteError:1",
416         "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
417        // Wait for DB instance to clear bg_error before calling
418        // TEST_WaitForCompact
419        {"SstFileManagerImpl::ErrorCleared", "CompactionManifestWriteError:2"}});
420   // trigger manifest write failure in compaction thread
421   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
422       "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
423   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
424       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
425         if (fail_manifest.load()) {
426           fault_fs->SetFilesystemActive(false,
427                                         IOStatus::NoSpace("Out of space"));
428         }
429       });
430   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
431 
432   Put(Key(1), "val");
433   // This Flush will trigger a compaction, which will fail when appending to
434   // the manifest
435   s = Flush();
436   ASSERT_EQ(s, Status::OK());
437 
438   TEST_SYNC_POINT("CompactionManifestWriteError:0");
439   // Clear all errors so when the compaction is retried, it will succeed
440   fault_fs->SetFilesystemActive(true);
441   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
442   TEST_SYNC_POINT("CompactionManifestWriteError:1");
443   TEST_SYNC_POINT("CompactionManifestWriteError:2");
444 
445   s = dbfull()->TEST_WaitForCompact();
446   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
447   ASSERT_EQ(s, Status::OK());
448 
449   new_manifest = GetManifestNameFromLiveFiles();
450   ASSERT_NE(new_manifest, old_manifest);
451   Reopen(options);
452   ASSERT_EQ("val", Get(Key(0)));
453   ASSERT_EQ("val", Get(Key(1)));
454   ASSERT_EQ("val", Get(Key(2)));
455   Close();
456 }
457 
TEST_F(DBErrorHandlingFSTest,CompactionManifestWriteRetryableError)458 TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
459   std::shared_ptr<FaultInjectionTestFS> fault_fs(
460       new FaultInjectionTestFS(FileSystem::Default()));
461   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
462   std::shared_ptr<ErrorHandlerFSListener> listener(
463       new ErrorHandlerFSListener());
464   Options options = GetDefaultOptions();
465   options.env = fault_fs_env.get();
466   options.create_if_missing = true;
467   options.level0_file_num_compaction_trigger = 2;
468   options.listeners.emplace_back(listener);
469   Status s;
470   std::string old_manifest;
471   std::string new_manifest;
472   DestroyAndReopen(options);
473   old_manifest = GetManifestNameFromLiveFiles();
474 
475   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
476   error_msg.SetRetryable(true);
477 
478   Put(Key(0), "val");
479   Put(Key(2), "val");
480   s = Flush();
481   ASSERT_EQ(s, Status::OK());
482 
483   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
484   listener->EnableAutoRecovery(false);
485   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
486       "DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
487       [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
488   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
489 
490   Put(Key(1), "val");
491   s = Flush();
492   ASSERT_EQ(s, Status::OK());
493 
494   s = dbfull()->TEST_WaitForCompact();
495   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
496 
497   fault_fs->SetFilesystemActive(true);
498   SyncPoint::GetInstance()->ClearAllCallBacks();
499   SyncPoint::GetInstance()->DisableProcessing();
500   s = dbfull()->Resume();
501   ASSERT_EQ(s, Status::OK());
502 
503   new_manifest = GetManifestNameFromLiveFiles();
504   ASSERT_NE(new_manifest, old_manifest);
505 
506   Reopen(options);
507   ASSERT_EQ("val", Get(Key(0)));
508   ASSERT_EQ("val", Get(Key(1)));
509   ASSERT_EQ("val", Get(Key(2)));
510   Close();
511 }
512 
TEST_F(DBErrorHandlingFSTest,CompactionWriteError)513 TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
514   std::shared_ptr<FaultInjectionTestFS> fault_fs(
515       new FaultInjectionTestFS(FileSystem::Default()));
516   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
517   std::shared_ptr<ErrorHandlerFSListener> listener(
518       new ErrorHandlerFSListener());
519   Options options = GetDefaultOptions();
520   options.env = fault_fs_env.get();
521   options.create_if_missing = true;
522   options.level0_file_num_compaction_trigger = 2;
523   options.listeners.emplace_back(listener);
524   Status s;
525   DestroyAndReopen(options);
526 
527   Put(Key(0), "va;");
528   Put(Key(2), "va;");
529   s = Flush();
530   ASSERT_EQ(s, Status::OK());
531 
532   listener->OverrideBGError(
533       Status(Status::NoSpace(), Status::Severity::kHardError));
534   listener->EnableAutoRecovery(false);
535   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
536       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
537         "BackgroundCallCompaction:0"}});
538   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
539       "BackgroundCallCompaction:0", [&](void*) {
540         fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
541       });
542   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
543 
544   Put(Key(1), "val");
545   s = Flush();
546   ASSERT_EQ(s, Status::OK());
547 
548   s = dbfull()->TEST_WaitForCompact();
549   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
550 
551   fault_fs->SetFilesystemActive(true);
552   s = dbfull()->Resume();
553   ASSERT_EQ(s, Status::OK());
554   Destroy(options);
555 }
556 
TEST_F(DBErrorHandlingFSTest,CompactionWriteRetryableError)557 TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
558   std::shared_ptr<FaultInjectionTestFS> fault_fs(
559       new FaultInjectionTestFS(FileSystem::Default()));
560   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
561   std::shared_ptr<ErrorHandlerFSListener> listener(
562       new ErrorHandlerFSListener());
563   Options options = GetDefaultOptions();
564   options.env = fault_fs_env.get();
565   options.create_if_missing = true;
566   options.level0_file_num_compaction_trigger = 2;
567   options.listeners.emplace_back(listener);
568   Status s;
569   DestroyAndReopen(options);
570 
571   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
572   error_msg.SetRetryable(true);
573 
574   Put(Key(0), "va;");
575   Put(Key(2), "va;");
576   s = Flush();
577   ASSERT_EQ(s, Status::OK());
578 
579   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
580   listener->EnableAutoRecovery(false);
581   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
582       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
583         "BackgroundCallCompaction:0"}});
584   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
585       "BackgroundCallCompaction:0",
586       [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
587   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
588 
589   Put(Key(1), "val");
590   s = Flush();
591   ASSERT_EQ(s, Status::OK());
592 
593   s = dbfull()->TEST_WaitForCompact();
594   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
595 
596   fault_fs->SetFilesystemActive(true);
597   SyncPoint::GetInstance()->ClearAllCallBacks();
598   SyncPoint::GetInstance()->DisableProcessing();
599   s = dbfull()->Resume();
600   ASSERT_EQ(s, Status::OK());
601   Destroy(options);
602 }
603 
TEST_F(DBErrorHandlingFSTest,CorruptionError)604 TEST_F(DBErrorHandlingFSTest, CorruptionError) {
605   std::shared_ptr<FaultInjectionTestFS> fault_fs(
606       new FaultInjectionTestFS(FileSystem::Default()));
607   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
608   Options options = GetDefaultOptions();
609   options.env = fault_fs_env.get();
610   options.create_if_missing = true;
611   options.level0_file_num_compaction_trigger = 2;
612   Status s;
613   DestroyAndReopen(options);
614 
615   Put(Key(0), "va;");
616   Put(Key(2), "va;");
617   s = Flush();
618   ASSERT_EQ(s, Status::OK());
619 
620   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
621       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
622         "BackgroundCallCompaction:0"}});
623   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
624       "BackgroundCallCompaction:0", [&](void*) {
625         fault_fs->SetFilesystemActive(false,
626                                       IOStatus::Corruption("Corruption"));
627       });
628   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
629 
630   Put(Key(1), "val");
631   s = Flush();
632   ASSERT_EQ(s, Status::OK());
633 
634   s = dbfull()->TEST_WaitForCompact();
635   ASSERT_EQ(s.severity(),
636             ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
637 
638   fault_fs->SetFilesystemActive(true);
639   s = dbfull()->Resume();
640   ASSERT_NE(s, Status::OK());
641   Destroy(options);
642 }
643 
TEST_F(DBErrorHandlingFSTest,AutoRecoverFlushError)644 TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
645   std::shared_ptr<FaultInjectionTestFS> fault_fs(
646       new FaultInjectionTestFS(FileSystem::Default()));
647   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
648   std::shared_ptr<ErrorHandlerFSListener> listener(
649       new ErrorHandlerFSListener());
650   Options options = GetDefaultOptions();
651   options.env = fault_fs_env.get();
652   options.create_if_missing = true;
653   options.listeners.emplace_back(listener);
654   Status s;
655 
656   listener->EnableAutoRecovery();
657   DestroyAndReopen(options);
658 
659   Put(Key(0), "val");
660   SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
661     fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
662   });
663   SyncPoint::GetInstance()->EnableProcessing();
664   s = Flush();
665   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
666   SyncPoint::GetInstance()->DisableProcessing();
667   fault_fs->SetFilesystemActive(true);
668   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
669 
670   s = Put(Key(1), "val");
671   ASSERT_EQ(s, Status::OK());
672 
673   Reopen(options);
674   ASSERT_EQ("val", Get(Key(0)));
675   ASSERT_EQ("val", Get(Key(1)));
676   Destroy(options);
677 }
678 
TEST_F(DBErrorHandlingFSTest,FailRecoverFlushError)679 TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
680   std::shared_ptr<FaultInjectionTestFS> fault_fs(
681       new FaultInjectionTestFS(FileSystem::Default()));
682   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
683   std::shared_ptr<ErrorHandlerFSListener> listener(
684       new ErrorHandlerFSListener());
685   Options options = GetDefaultOptions();
686   options.env = fault_fs_env.get();
687   options.create_if_missing = true;
688   options.listeners.emplace_back(listener);
689   Status s;
690 
691   listener->EnableAutoRecovery();
692   DestroyAndReopen(options);
693 
694   Put(Key(0), "val");
695   SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
696     fault_fs->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
697   });
698   SyncPoint::GetInstance()->EnableProcessing();
699   s = Flush();
700   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
701   // We should be able to shutdown the database while auto recovery is going
702   // on in the background
703   Close();
704   DestroyDB(dbname_, options);
705 }
706 
TEST_F(DBErrorHandlingFSTest,WALWriteError)707 TEST_F(DBErrorHandlingFSTest, WALWriteError) {
708   std::shared_ptr<FaultInjectionTestFS> fault_fs(
709       new FaultInjectionTestFS(FileSystem::Default()));
710   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
711   std::shared_ptr<ErrorHandlerFSListener> listener(
712       new ErrorHandlerFSListener());
713   Options options = GetDefaultOptions();
714   options.env = fault_fs_env.get();
715   options.create_if_missing = true;
716   options.writable_file_max_buffer_size = 32768;
717   options.listeners.emplace_back(listener);
718   Status s;
719   Random rnd(301);
720 
721   listener->EnableAutoRecovery();
722   DestroyAndReopen(options);
723 
724   {
725     WriteBatch batch;
726 
727     for (auto i = 0; i < 100; ++i) {
728       batch.Put(Key(i), RandomString(&rnd, 1024));
729     }
730 
731     WriteOptions wopts;
732     wopts.sync = true;
733     ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
734   };
735 
736   {
737     WriteBatch batch;
738     int write_error = 0;
739 
740     for (auto i = 100; i < 199; ++i) {
741       batch.Put(Key(i), RandomString(&rnd, 1024));
742     }
743 
744     SyncPoint::GetInstance()->SetCallBack(
745         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
746           write_error++;
747           if (write_error > 2) {
748             fault_fs->SetFilesystemActive(false,
749                                           IOStatus::NoSpace("Out of space"));
750           }
751         });
752     SyncPoint::GetInstance()->EnableProcessing();
753     WriteOptions wopts;
754     wopts.sync = true;
755     s = dbfull()->Write(wopts, &batch);
756     ASSERT_EQ(s, s.NoSpace());
757   }
758   SyncPoint::GetInstance()->DisableProcessing();
759   fault_fs->SetFilesystemActive(true);
760   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
761   for (auto i = 0; i < 199; ++i) {
762     if (i < 100) {
763       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
764     } else {
765       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
766     }
767   }
768   Reopen(options);
769   for (auto i = 0; i < 199; ++i) {
770     if (i < 100) {
771       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
772     } else {
773       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
774     }
775   }
776   Close();
777 }
778 
TEST_F(DBErrorHandlingFSTest,WALWriteRetryableError)779 TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
780   std::shared_ptr<FaultInjectionTestFS> fault_fs(
781       new FaultInjectionTestFS(FileSystem::Default()));
782   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
783   std::shared_ptr<ErrorHandlerFSListener> listener(
784       new ErrorHandlerFSListener());
785   Options options = GetDefaultOptions();
786   options.env = fault_fs_env.get();
787   options.create_if_missing = true;
788   options.writable_file_max_buffer_size = 32768;
789   options.listeners.emplace_back(listener);
790   options.paranoid_checks = true;
791   Status s;
792   Random rnd(301);
793 
794   DestroyAndReopen(options);
795 
796   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
797   error_msg.SetRetryable(true);
798 
799   // For the first batch, write is successful, require sync
800   {
801     WriteBatch batch;
802 
803     for (auto i = 0; i < 100; ++i) {
804       batch.Put(Key(i), RandomString(&rnd, 1024));
805     }
806 
807     WriteOptions wopts;
808     wopts.sync = true;
809     ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
810   };
811 
812   // For the second batch, the first 2 file Append are successful, then the
813   // following Append fails due to file system retryable IOError.
814   {
815     WriteBatch batch;
816     int write_error = 0;
817 
818     for (auto i = 100; i < 200; ++i) {
819       batch.Put(Key(i), RandomString(&rnd, 1024));
820     }
821 
822     SyncPoint::GetInstance()->SetCallBack(
823         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
824           write_error++;
825           if (write_error > 2) {
826             fault_fs->SetFilesystemActive(false, error_msg);
827           }
828         });
829     SyncPoint::GetInstance()->EnableProcessing();
830     WriteOptions wopts;
831     wopts.sync = true;
832     s = dbfull()->Write(wopts, &batch);
833     ASSERT_EQ(true, s.IsIOError());
834   }
835   fault_fs->SetFilesystemActive(true);
836   SyncPoint::GetInstance()->ClearAllCallBacks();
837   SyncPoint::GetInstance()->DisableProcessing();
838 
839   // Data in corrupted WAL are not stored
840   for (auto i = 0; i < 199; ++i) {
841     if (i < 100) {
842       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
843     } else {
844       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
845     }
846   }
847 
848   // Resume and write a new batch, should be in the WAL
849   s = dbfull()->Resume();
850   ASSERT_EQ(s, Status::OK());
851   {
852     WriteBatch batch;
853 
854     for (auto i = 200; i < 300; ++i) {
855       batch.Put(Key(i), RandomString(&rnd, 1024));
856     }
857 
858     WriteOptions wopts;
859     wopts.sync = true;
860     ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
861   };
862 
863   Reopen(options);
864   for (auto i = 0; i < 300; ++i) {
865     if (i < 100 || i >= 200) {
866       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
867     } else {
868       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
869     }
870   }
871   Close();
872 }
873 
TEST_F(DBErrorHandlingFSTest,MultiCFWALWriteError)874 TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
875   std::shared_ptr<FaultInjectionTestFS> fault_fs(
876       new FaultInjectionTestFS(FileSystem::Default()));
877   std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
878   std::shared_ptr<ErrorHandlerFSListener> listener(
879       new ErrorHandlerFSListener());
880   Options options = GetDefaultOptions();
881   options.env = fault_fs_env.get();
882   options.create_if_missing = true;
883   options.writable_file_max_buffer_size = 32768;
884   options.listeners.emplace_back(listener);
885   Status s;
886   Random rnd(301);
887 
888   listener->EnableAutoRecovery();
889   CreateAndReopenWithCF({"one", "two", "three"}, options);
890 
891   {
892     WriteBatch batch;
893 
894     for (auto i = 1; i < 4; ++i) {
895       for (auto j = 0; j < 100; ++j) {
896         batch.Put(handles_[i], Key(j), RandomString(&rnd, 1024));
897       }
898     }
899 
900     WriteOptions wopts;
901     wopts.sync = true;
902     ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
903   };
904 
905   {
906     WriteBatch batch;
907     int write_error = 0;
908 
909     // Write to one CF
910     for (auto i = 100; i < 199; ++i) {
911       batch.Put(handles_[2], Key(i), RandomString(&rnd, 1024));
912     }
913 
914     SyncPoint::GetInstance()->SetCallBack(
915         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
916           write_error++;
917           if (write_error > 2) {
918             fault_fs->SetFilesystemActive(false,
919                                           IOStatus::NoSpace("Out of space"));
920           }
921         });
922     SyncPoint::GetInstance()->EnableProcessing();
923     WriteOptions wopts;
924     wopts.sync = true;
925     s = dbfull()->Write(wopts, &batch);
926     ASSERT_EQ(s, s.NoSpace());
927   }
928   SyncPoint::GetInstance()->DisableProcessing();
929   fault_fs->SetFilesystemActive(true);
930   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
931 
932   for (auto i = 1; i < 4; ++i) {
933     // Every CF should have been flushed
934     ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
935   }
936 
937   for (auto i = 1; i < 4; ++i) {
938     for (auto j = 0; j < 199; ++j) {
939       if (j < 100) {
940         ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
941       } else {
942         ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
943       }
944     }
945   }
946   ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
947   for (auto i = 1; i < 4; ++i) {
948     for (auto j = 0; j < 199; ++j) {
949       if (j < 100) {
950         ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
951       } else {
952         ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
953       }
954     }
955   }
956   Close();
957 }
958 
TEST_F(DBErrorHandlingFSTest,MultiDBCompactionError)959 TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
960   FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
961   std::vector<std::unique_ptr<Env>> fault_envs;
962   std::vector<FaultInjectionTestFS*> fault_fs;
963   std::vector<Options> options;
964   std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
965   std::vector<DB*> db;
966   std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
967   int kNumDbInstances = 3;
968   Random rnd(301);
969 
970   for (auto i = 0; i < kNumDbInstances; ++i) {
971     listener.emplace_back(new ErrorHandlerFSListener());
972     options.emplace_back(GetDefaultOptions());
973     fault_fs.emplace_back(new FaultInjectionTestFS(FileSystem::Default()));
974     std::shared_ptr<FileSystem> fs(fault_fs.back());
975     fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
976     options[i].env = fault_envs.back().get();
977     options[i].create_if_missing = true;
978     options[i].level0_file_num_compaction_trigger = 2;
979     options[i].writable_file_max_buffer_size = 32768;
980     options[i].listeners.emplace_back(listener[i]);
981     options[i].sst_file_manager = sfm;
982     DB* dbptr;
983     char buf[16];
984 
985     listener[i]->EnableAutoRecovery();
986     // Setup for returning error for the 3rd SST, which would be level 1
987     listener[i]->InjectFileCreationError(fault_fs[i], 3,
988                                          IOStatus::NoSpace("Out of space"));
989     snprintf(buf, sizeof(buf), "_%d", i);
990     DestroyDB(dbname_ + std::string(buf), options[i]);
991     ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
992               Status::OK());
993     db.emplace_back(dbptr);
994   }
995 
996   for (auto i = 0; i < kNumDbInstances; ++i) {
997     WriteBatch batch;
998 
999     for (auto j = 0; j <= 100; ++j) {
1000       batch.Put(Key(j), RandomString(&rnd, 1024));
1001     }
1002 
1003     WriteOptions wopts;
1004     wopts.sync = true;
1005     ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1006     ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1007   }
1008 
1009   def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
1010   for (auto i = 0; i < kNumDbInstances; ++i) {
1011     WriteBatch batch;
1012 
1013     // Write to one CF
1014     for (auto j = 100; j < 199; ++j) {
1015       batch.Put(Key(j), RandomString(&rnd, 1024));
1016     }
1017 
1018     WriteOptions wopts;
1019     wopts.sync = true;
1020     ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1021     ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1022   }
1023 
1024   for (auto i = 0; i < kNumDbInstances; ++i) {
1025     Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
1026     ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
1027     fault_fs[i]->SetFilesystemActive(true);
1028   }
1029 
1030   def_env->SetFilesystemActive(true);
1031   for (auto i = 0; i < kNumDbInstances; ++i) {
1032     std::string prop;
1033     ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
1034     ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
1035               Status::OK());
1036     EXPECT_TRUE(db[i]->GetProperty(
1037         "rocksdb.num-files-at-level" + NumberToString(0), &prop));
1038     EXPECT_EQ(atoi(prop.c_str()), 0);
1039     EXPECT_TRUE(db[i]->GetProperty(
1040         "rocksdb.num-files-at-level" + NumberToString(1), &prop));
1041     EXPECT_EQ(atoi(prop.c_str()), 1);
1042   }
1043 
1044   for (auto i = 0; i < kNumDbInstances; ++i) {
1045     char buf[16];
1046     snprintf(buf, sizeof(buf), "_%d", i);
1047     delete db[i];
1048     fault_fs[i]->SetFilesystemActive(true);
1049     if (getenv("KEEP_DB")) {
1050       printf("DB is still at %s%s\n", dbname_.c_str(), buf);
1051     } else {
1052       Status s = DestroyDB(dbname_ + std::string(buf), options[i]);
1053     }
1054   }
1055   options.clear();
1056   sfm.reset();
1057   delete def_env;
1058 }
1059 
TEST_F(DBErrorHandlingFSTest,MultiDBVariousErrors)1060 TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
1061   FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
1062   std::vector<std::unique_ptr<Env>> fault_envs;
1063   std::vector<FaultInjectionTestFS*> fault_fs;
1064   std::vector<Options> options;
1065   std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
1066   std::vector<DB*> db;
1067   std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
1068   int kNumDbInstances = 3;
1069   Random rnd(301);
1070 
1071   for (auto i = 0; i < kNumDbInstances; ++i) {
1072     listener.emplace_back(new ErrorHandlerFSListener());
1073     options.emplace_back(GetDefaultOptions());
1074     fault_fs.emplace_back(new FaultInjectionTestFS(FileSystem::Default()));
1075     std::shared_ptr<FileSystem> fs(fault_fs.back());
1076     fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
1077     options[i].env = fault_envs.back().get();
1078     options[i].create_if_missing = true;
1079     options[i].level0_file_num_compaction_trigger = 2;
1080     options[i].writable_file_max_buffer_size = 32768;
1081     options[i].listeners.emplace_back(listener[i]);
1082     options[i].sst_file_manager = sfm;
1083     DB* dbptr;
1084     char buf[16];
1085 
1086     listener[i]->EnableAutoRecovery();
1087     switch (i) {
1088       case 0:
1089         // Setup for returning error for the 3rd SST, which would be level 1
1090         listener[i]->InjectFileCreationError(fault_fs[i], 3,
1091                                              IOStatus::NoSpace("Out of space"));
1092         break;
1093       case 1:
1094         // Setup for returning error after the 1st SST, which would result
1095         // in a hard error
1096         listener[i]->InjectFileCreationError(fault_fs[i], 2,
1097                                              IOStatus::NoSpace("Out of space"));
1098         break;
1099       default:
1100         break;
1101     }
1102     snprintf(buf, sizeof(buf), "_%d", i);
1103     DestroyDB(dbname_ + std::string(buf), options[i]);
1104     ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
1105               Status::OK());
1106     db.emplace_back(dbptr);
1107   }
1108 
1109   for (auto i = 0; i < kNumDbInstances; ++i) {
1110     WriteBatch batch;
1111 
1112     for (auto j = 0; j <= 100; ++j) {
1113       batch.Put(Key(j), RandomString(&rnd, 1024));
1114     }
1115 
1116     WriteOptions wopts;
1117     wopts.sync = true;
1118     ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1119     ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1120   }
1121 
1122   def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
1123   for (auto i = 0; i < kNumDbInstances; ++i) {
1124     WriteBatch batch;
1125 
1126     // Write to one CF
1127     for (auto j = 100; j < 199; ++j) {
1128       batch.Put(Key(j), RandomString(&rnd, 1024));
1129     }
1130 
1131     WriteOptions wopts;
1132     wopts.sync = true;
1133     ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1134     if (i != 1) {
1135       ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1136     } else {
1137       ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace());
1138     }
1139   }
1140 
1141   for (auto i = 0; i < kNumDbInstances; ++i) {
1142     Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
1143     switch (i) {
1144       case 0:
1145         ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
1146         break;
1147       case 1:
1148         ASSERT_EQ(s.severity(), Status::Severity::kHardError);
1149         break;
1150       case 2:
1151         ASSERT_EQ(s, Status::OK());
1152         break;
1153     }
1154     fault_fs[i]->SetFilesystemActive(true);
1155   }
1156 
1157   def_env->SetFilesystemActive(true);
1158   for (auto i = 0; i < kNumDbInstances; ++i) {
1159     std::string prop;
1160     if (i < 2) {
1161       ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
1162     }
1163     if (i == 1) {
1164       ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
1165                 Status::OK());
1166     }
1167     EXPECT_TRUE(db[i]->GetProperty(
1168         "rocksdb.num-files-at-level" + NumberToString(0), &prop));
1169     EXPECT_EQ(atoi(prop.c_str()), 0);
1170     EXPECT_TRUE(db[i]->GetProperty(
1171         "rocksdb.num-files-at-level" + NumberToString(1), &prop));
1172     EXPECT_EQ(atoi(prop.c_str()), 1);
1173   }
1174 
1175   for (auto i = 0; i < kNumDbInstances; ++i) {
1176     char buf[16];
1177     snprintf(buf, sizeof(buf), "_%d", i);
1178     fault_fs[i]->SetFilesystemActive(true);
1179     delete db[i];
1180     if (getenv("KEEP_DB")) {
1181       printf("DB is still at %s%s\n", dbname_.c_str(), buf);
1182     } else {
1183       DestroyDB(dbname_ + std::string(buf), options[i]);
1184     }
1185   }
1186   options.clear();
1187   delete def_env;
1188 }
1189 }  // namespace ROCKSDB_NAMESPACE
1190 
main(int argc,char ** argv)1191 int main(int argc, char** argv) {
1192   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1193   ::testing::InitGoogleTest(&argc, argv);
1194   return RUN_ALL_TESTS();
1195 }
1196 
1197 #else
1198 #include <stdio.h>
1199 
main(int,char **)1200 int main(int /*argc*/, char** /*argv*/) {
1201   fprintf(stderr, "SKIPPED as Cuckoo table is not supported in ROCKSDB_LITE\n");
1202   return 0;
1203 }
1204 
1205 #endif  // ROCKSDB_LITE
1206