1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/db_test_util.h"
11 #include "env/composite_env_wrapper.h"
12 #include "options/options_helper.h"
13 #include "port/port.h"
14 #include "port/stack_trace.h"
15 #include "test_util/fault_injection_test_env.h"
16 #include "test_util/sync_point.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 class DBWALTest : public DBTestBase {
20  public:
DBWALTest()21   DBWALTest() : DBTestBase("/db_wal_test") {}
22 
23 #if defined(ROCKSDB_PLATFORM_POSIX)
GetAllocatedFileSize(std::string file_name)24   uint64_t GetAllocatedFileSize(std::string file_name) {
25     struct stat sbuf;
26     int err = stat(file_name.c_str(), &sbuf);
27     assert(err == 0);
28     return sbuf.st_blocks * 512;
29   }
30 #endif
31 };
32 
33 // A SpecialEnv enriched to give more insight about deleted files
34 class EnrichedSpecialEnv : public SpecialEnv {
35  public:
EnrichedSpecialEnv(Env * base)36   explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
NewSequentialFile(const std::string & f,std::unique_ptr<SequentialFile> * r,const EnvOptions & soptions)37   Status NewSequentialFile(const std::string& f,
38                            std::unique_ptr<SequentialFile>* r,
39                            const EnvOptions& soptions) override {
40     InstrumentedMutexLock l(&env_mutex_);
41     if (f == skipped_wal) {
42       deleted_wal_reopened = true;
43       if (IsWAL(f) && largetest_deleted_wal.size() != 0 &&
44           f.compare(largetest_deleted_wal) <= 0) {
45         gap_in_wals = true;
46       }
47     }
48     return SpecialEnv::NewSequentialFile(f, r, soptions);
49   }
DeleteFile(const std::string & fname)50   Status DeleteFile(const std::string& fname) override {
51     if (IsWAL(fname)) {
52       deleted_wal_cnt++;
53       InstrumentedMutexLock l(&env_mutex_);
54       // If this is the first WAL, remember its name and skip deleting it. We
55       // remember its name partly because the application might attempt to
56       // delete the file again.
57       if (skipped_wal.size() != 0 && skipped_wal != fname) {
58         if (largetest_deleted_wal.size() == 0 ||
59             largetest_deleted_wal.compare(fname) < 0) {
60           largetest_deleted_wal = fname;
61         }
62       } else {
63         skipped_wal = fname;
64         return Status::OK();
65       }
66     }
67     return SpecialEnv::DeleteFile(fname);
68   }
IsWAL(const std::string & fname)69   bool IsWAL(const std::string& fname) {
70     // printf("iswal %s\n", fname.c_str());
71     return fname.compare(fname.size() - 3, 3, "log") == 0;
72   }
73 
74   InstrumentedMutex env_mutex_;
75   // the wal whose actual delete was skipped by the env
76   std::string skipped_wal = "";
77   // the largest WAL that was requested to be deleted
78   std::string largetest_deleted_wal = "";
79   // number of WALs that were successfully deleted
80   std::atomic<size_t> deleted_wal_cnt = {0};
81   // the WAL whose delete from fs was skipped is reopened during recovery
82   std::atomic<bool> deleted_wal_reopened = {false};
83   // whether a gap in the WALs was detected during recovery
84   std::atomic<bool> gap_in_wals = {false};
85 };
86 
// Test fixture whose DB runs on an EnrichedSpecialEnv, opened with allow_2pc
// enabled, so tests can observe which WALs were deleted or reopened.
class DBWALTestWithEnrichedEnv : public DBTestBase {
 public:
  DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
    // Wrap the same target Env that the base fixture's env_ wraps.
    enriched_env_ = new EnrichedSpecialEnv(env_->target());
    auto options = CurrentOptions();
    options.env = enriched_env_;
    options.allow_2pc = true;
    // Reopen on the new env before deleting the old one; presumably the DB
    // must stop using env_ before it is destroyed (keep this order).
    Reopen(options);
    delete env_;
    // to be deleted by the parent class
    env_ = enriched_env_;
  }

 protected:
  // Non-owning alias of env_; the base class deletes it (see constructor).
  EnrichedSpecialEnv* enriched_env_;
};
103 
// Test that recovery successfully avoids gaps between the logs.
// One known scenario that could cause such a gap is the application issuing
// WAL deletions out of order. For simplicity, this test creates the gap by
// manipulating the env to skip deletion of the first WAL but not of the ones
// after it.
TEST_F(DBWALTestWithEnrichedEnv,SkipDeletedWALs)109 TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
110   auto options = last_options_;
111   // To cause frequent WAL deletion
112   options.write_buffer_size = 128;
113   Reopen(options);
114 
115   WriteOptions writeOpt = WriteOptions();
116   for (int i = 0; i < 128 * 5; i++) {
117     ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
118   }
119   FlushOptions fo;
120   fo.wait = true;
121   ASSERT_OK(db_->Flush(fo));
122 
123   // some wals are deleted
124   ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
125   // but not the first one
126   ASSERT_NE(0, enriched_env_->skipped_wal.size());
127 
128   // Test that the WAL that was not deleted will be skipped during recovery
129   options = last_options_;
130   Reopen(options);
131   ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
132   ASSERT_FALSE(enriched_env_->gap_in_wals);
133 }
134 
TEST_F(DBWALTest,WAL)135 TEST_F(DBWALTest, WAL) {
136   do {
137     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
138     WriteOptions writeOpt = WriteOptions();
139     writeOpt.disableWAL = true;
140     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
141     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
142 
143     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
144     ASSERT_EQ("v1", Get(1, "foo"));
145     ASSERT_EQ("v1", Get(1, "bar"));
146 
147     writeOpt.disableWAL = false;
148     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
149     writeOpt.disableWAL = true;
150     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
151 
152     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
153     // Both value's should be present.
154     ASSERT_EQ("v2", Get(1, "bar"));
155     ASSERT_EQ("v2", Get(1, "foo"));
156 
157     writeOpt.disableWAL = true;
158     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
159     writeOpt.disableWAL = false;
160     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
161 
162     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
163     // again both values should be present.
164     ASSERT_EQ("v3", Get(1, "foo"));
165     ASSERT_EQ("v3", Get(1, "bar"));
166   } while (ChangeWalOptions());
167 }
168 
TEST_F(DBWALTest,RollLog)169 TEST_F(DBWALTest, RollLog) {
170   do {
171     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
172     ASSERT_OK(Put(1, "foo", "v1"));
173     ASSERT_OK(Put(1, "baz", "v5"));
174 
175     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
176     for (int i = 0; i < 10; i++) {
177       ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
178     }
179     ASSERT_OK(Put(1, "foo", "v4"));
180     for (int i = 0; i < 10; i++) {
181       ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
182     }
183   } while (ChangeWalOptions());
184 }
185 
TEST_F(DBWALTest,SyncWALNotBlockWrite)186 TEST_F(DBWALTest, SyncWALNotBlockWrite) {
187   Options options = CurrentOptions();
188   options.max_write_buffer_number = 4;
189   DestroyAndReopen(options);
190 
191   ASSERT_OK(Put("foo1", "bar1"));
192   ASSERT_OK(Put("foo5", "bar5"));
193 
194   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
195       {"WritableFileWriter::SyncWithoutFlush:1",
196        "DBWALTest::SyncWALNotBlockWrite:1"},
197       {"DBWALTest::SyncWALNotBlockWrite:2",
198        "WritableFileWriter::SyncWithoutFlush:2"},
199   });
200   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
201 
202   ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
203 
204   TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
205   ASSERT_OK(Put("foo2", "bar2"));
206   ASSERT_OK(Put("foo3", "bar3"));
207   FlushOptions fo;
208   fo.wait = false;
209   ASSERT_OK(db_->Flush(fo));
210   ASSERT_OK(Put("foo4", "bar4"));
211 
212   TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
213 
214   thread.join();
215 
216   ASSERT_EQ(Get("foo1"), "bar1");
217   ASSERT_EQ(Get("foo2"), "bar2");
218   ASSERT_EQ(Get("foo3"), "bar3");
219   ASSERT_EQ(Get("foo4"), "bar4");
220   ASSERT_EQ(Get("foo5"), "bar5");
221   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
222 }
223 
TEST_F(DBWALTest,SyncWALNotWaitWrite)224 TEST_F(DBWALTest, SyncWALNotWaitWrite) {
225   ASSERT_OK(Put("foo1", "bar1"));
226   ASSERT_OK(Put("foo3", "bar3"));
227 
228   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
229       {"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"},
230       {"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"},
231   });
232   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
233 
234   ROCKSDB_NAMESPACE::port::Thread thread(
235       [&]() { ASSERT_OK(Put("foo2", "bar2")); });
236   // Moving this to SyncWAL before the actual fsync
237   // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
238   ASSERT_OK(db_->SyncWAL());
239   // Moving this to SyncWAL after actual fsync
240   // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
241 
242   thread.join();
243 
244   ASSERT_EQ(Get("foo1"), "bar1");
245   ASSERT_EQ(Get("foo2"), "bar2");
246   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
247 }
248 
TEST_F(DBWALTest,Recover)249 TEST_F(DBWALTest, Recover) {
250   do {
251     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
252     ASSERT_OK(Put(1, "foo", "v1"));
253     ASSERT_OK(Put(1, "baz", "v5"));
254 
255     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
256     ASSERT_EQ("v1", Get(1, "foo"));
257 
258     ASSERT_EQ("v1", Get(1, "foo"));
259     ASSERT_EQ("v5", Get(1, "baz"));
260     ASSERT_OK(Put(1, "bar", "v2"));
261     ASSERT_OK(Put(1, "foo", "v3"));
262 
263     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
264     ASSERT_EQ("v3", Get(1, "foo"));
265     ASSERT_OK(Put(1, "foo", "v4"));
266     ASSERT_EQ("v4", Get(1, "foo"));
267     ASSERT_EQ("v2", Get(1, "bar"));
268     ASSERT_EQ("v5", Get(1, "baz"));
269   } while (ChangeWalOptions());
270 }
271 
TEST_F(DBWALTest,RecoverWithTableHandle)272 TEST_F(DBWALTest, RecoverWithTableHandle) {
273   do {
274     Options options = CurrentOptions();
275     options.create_if_missing = true;
276     options.disable_auto_compactions = true;
277     options.avoid_flush_during_recovery = false;
278     DestroyAndReopen(options);
279     CreateAndReopenWithCF({"pikachu"}, options);
280 
281     ASSERT_OK(Put(1, "foo", "v1"));
282     ASSERT_OK(Put(1, "bar", "v2"));
283     ASSERT_OK(Flush(1));
284     ASSERT_OK(Put(1, "foo", "v3"));
285     ASSERT_OK(Put(1, "bar", "v4"));
286     ASSERT_OK(Flush(1));
287     ASSERT_OK(Put(1, "big", std::string(100, 'a')));
288 
289     options = CurrentOptions();
290     const int kSmallMaxOpenFiles = 13;
291     if (option_config_ == kDBLogDir) {
292       // Use this option to check not preloading files
293       // Set the max open files to be small enough so no preload will
294       // happen.
295       options.max_open_files = kSmallMaxOpenFiles;
296       // RocksDB sanitize max open files to at least 20. Modify it back.
297       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
298           "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
299             int* max_open_files = static_cast<int*>(arg);
300             *max_open_files = kSmallMaxOpenFiles;
301           });
302 
303     } else if (option_config_ == kWalDirAndMmapReads) {
304       // Use this option to check always loading all files.
305       options.max_open_files = 100;
306     } else {
307       options.max_open_files = -1;
308     }
309     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
310     ReopenWithColumnFamilies({"default", "pikachu"}, options);
311     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
312     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
313 
314     std::vector<std::vector<FileMetaData>> files;
315     dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
316     size_t total_files = 0;
317     for (const auto& level : files) {
318       total_files += level.size();
319     }
320     ASSERT_EQ(total_files, 3);
321     for (const auto& level : files) {
322       for (const auto& file : level) {
323         if (options.max_open_files == kSmallMaxOpenFiles) {
324           ASSERT_TRUE(file.table_reader_handle == nullptr);
325         } else {
326           ASSERT_TRUE(file.table_reader_handle != nullptr);
327         }
328       }
329     }
330   } while (ChangeWalOptions());
331 }
332 
TEST_F(DBWALTest,IgnoreRecoveredLog)333 TEST_F(DBWALTest, IgnoreRecoveredLog) {
334   std::string backup_logs = dbname_ + "/backup_logs";
335 
336   do {
337     // delete old files in backup_logs directory
338     env_->CreateDirIfMissing(backup_logs);
339     std::vector<std::string> old_files;
340     env_->GetChildren(backup_logs, &old_files);
341     for (auto& file : old_files) {
342       if (file != "." && file != "..") {
343         env_->DeleteFile(backup_logs + "/" + file);
344       }
345     }
346     Options options = CurrentOptions();
347     options.create_if_missing = true;
348     options.merge_operator = MergeOperators::CreateUInt64AddOperator();
349     options.wal_dir = dbname_ + "/logs";
350     DestroyAndReopen(options);
351 
352     // fill up the DB
353     std::string one, two;
354     PutFixed64(&one, 1);
355     PutFixed64(&two, 2);
356     ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
357     ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
358     ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
359 
360     // copy the logs to backup
361     std::vector<std::string> logs;
362     env_->GetChildren(options.wal_dir, &logs);
363     for (auto& log : logs) {
364       if (log != ".." && log != ".") {
365         CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
366       }
367     }
368 
369     // recover the DB
370     Reopen(options);
371     ASSERT_EQ(two, Get("foo"));
372     ASSERT_EQ(one, Get("bar"));
373     Close();
374 
375     // copy the logs from backup back to wal dir
376     for (auto& log : logs) {
377       if (log != ".." && log != ".") {
378         CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
379       }
380     }
381     // this should ignore the log files, recovery should not happen again
382     // if the recovery happens, the same merge operator would be called twice,
383     // leading to incorrect results
384     Reopen(options);
385     ASSERT_EQ(two, Get("foo"));
386     ASSERT_EQ(one, Get("bar"));
387     Close();
388     Destroy(options);
389     Reopen(options);
390     Close();
391 
392     // copy the logs from backup back to wal dir
393     env_->CreateDirIfMissing(options.wal_dir);
394     for (auto& log : logs) {
395       if (log != ".." && log != ".") {
396         CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
397       }
398     }
399     // assert that we successfully recovered only from logs, even though we
400     // destroyed the DB
401     Reopen(options);
402     ASSERT_EQ(two, Get("foo"));
403     ASSERT_EQ(one, Get("bar"));
404 
405     // Recovery will fail if DB directory doesn't exist.
406     Destroy(options);
407     // copy the logs from backup back to wal dir
408     env_->CreateDirIfMissing(options.wal_dir);
409     for (auto& log : logs) {
410       if (log != ".." && log != ".") {
411         CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
412         // we won't be needing this file no more
413         env_->DeleteFile(backup_logs + "/" + log);
414       }
415     }
416     Status s = TryReopen(options);
417     ASSERT_TRUE(!s.ok());
418     Destroy(options);
419   } while (ChangeWalOptions());
420 }
421 
TEST_F(DBWALTest,RecoveryWithEmptyLog)422 TEST_F(DBWALTest, RecoveryWithEmptyLog) {
423   do {
424     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
425     ASSERT_OK(Put(1, "foo", "v1"));
426     ASSERT_OK(Put(1, "foo", "v2"));
427     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
428     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
429     ASSERT_OK(Put(1, "foo", "v3"));
430     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
431     ASSERT_EQ("v3", Get(1, "foo"));
432   } while (ChangeWalOptions());
433 }
434 
435 #if !(defined NDEBUG) || !defined(OS_WIN)
TEST_F(DBWALTest,PreallocateBlock)436 TEST_F(DBWALTest, PreallocateBlock) {
437   Options options = CurrentOptions();
438   options.write_buffer_size = 10 * 1000 * 1000;
439   options.max_total_wal_size = 0;
440 
441   size_t expected_preallocation_size = static_cast<size_t>(
442       options.write_buffer_size + options.write_buffer_size / 10);
443 
444   DestroyAndReopen(options);
445 
446   std::atomic<int> called(0);
447   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
448       "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
449         ASSERT_TRUE(arg != nullptr);
450         size_t preallocation_size = *(static_cast<size_t*>(arg));
451         ASSERT_EQ(expected_preallocation_size, preallocation_size);
452         called.fetch_add(1);
453       });
454   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
455   Put("", "");
456   Flush();
457   Put("", "");
458   Close();
459   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
460   ASSERT_EQ(2, called.load());
461 
462   options.max_total_wal_size = 1000 * 1000;
463   expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size);
464   Reopen(options);
465   called.store(0);
466   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
467       "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
468         ASSERT_TRUE(arg != nullptr);
469         size_t preallocation_size = *(static_cast<size_t*>(arg));
470         ASSERT_EQ(expected_preallocation_size, preallocation_size);
471         called.fetch_add(1);
472       });
473   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
474   Put("", "");
475   Flush();
476   Put("", "");
477   Close();
478   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
479   ASSERT_EQ(2, called.load());
480 
481   options.db_write_buffer_size = 800 * 1000;
482   expected_preallocation_size =
483       static_cast<size_t>(options.db_write_buffer_size);
484   Reopen(options);
485   called.store(0);
486   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
487       "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
488         ASSERT_TRUE(arg != nullptr);
489         size_t preallocation_size = *(static_cast<size_t*>(arg));
490         ASSERT_EQ(expected_preallocation_size, preallocation_size);
491         called.fetch_add(1);
492       });
493   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
494   Put("", "");
495   Flush();
496   Put("", "");
497   Close();
498   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
499   ASSERT_EQ(2, called.load());
500 
501   expected_preallocation_size = 700 * 1000;
502   std::shared_ptr<WriteBufferManager> write_buffer_manager =
503       std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
504   options.write_buffer_manager = write_buffer_manager;
505   Reopen(options);
506   called.store(0);
507   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
508       "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
509         ASSERT_TRUE(arg != nullptr);
510         size_t preallocation_size = *(static_cast<size_t*>(arg));
511         ASSERT_EQ(expected_preallocation_size, preallocation_size);
512         called.fetch_add(1);
513       });
514   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
515   Put("", "");
516   Flush();
517   Put("", "");
518   Close();
519   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
520   ASSERT_EQ(2, called.load());
521 }
522 #endif  // !(defined NDEBUG) || !defined(OS_WIN)
523 
524 #ifndef ROCKSDB_LITE
TEST_F(DBWALTest,FullPurgePreservesRecycledLog)525 TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
526   // For github issue #1303
527   for (int i = 0; i < 2; ++i) {
528     Options options = CurrentOptions();
529     options.create_if_missing = true;
530     options.recycle_log_file_num = 2;
531     if (i != 0) {
532       options.wal_dir = alternative_wal_dir_;
533     }
534 
535     DestroyAndReopen(options);
536     ASSERT_OK(Put("foo", "v1"));
537     VectorLogPtr log_files;
538     ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
539     ASSERT_GT(log_files.size(), 0);
540     ASSERT_OK(Flush());
541 
542     // Now the original WAL is in log_files[0] and should be marked for
543     // recycling.
544     // Verify full purge cannot remove this file.
545     JobContext job_context(0);
546     dbfull()->TEST_LockMutex();
547     dbfull()->FindObsoleteFiles(&job_context, true /* force */);
548     dbfull()->TEST_UnlockMutex();
549     dbfull()->PurgeObsoleteFiles(job_context);
550 
551     if (i == 0) {
552       ASSERT_OK(
553           env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
554     } else {
555       ASSERT_OK(env_->FileExists(
556           LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
557     }
558   }
559 }
560 
TEST_F(DBWALTest,FullPurgePreservesLogPendingReuse)561 TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
562   // Ensures full purge cannot delete a WAL while it's in the process of being
563   // recycled. In particular, we force the full purge after a file has been
564   // chosen for reuse, but before it has been renamed.
565   for (int i = 0; i < 2; ++i) {
566     Options options = CurrentOptions();
567     options.recycle_log_file_num = 1;
568     if (i != 0) {
569       options.wal_dir = alternative_wal_dir_;
570     }
571     DestroyAndReopen(options);
572 
573     // The first flush creates a second log so writes can continue before the
574     // flush finishes.
575     ASSERT_OK(Put("foo", "bar"));
576     ASSERT_OK(Flush());
577 
578     // The second flush can recycle the first log. Sync points enforce the
579     // full purge happens after choosing the log to recycle and before it is
580     // renamed.
581     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
582         {"DBImpl::CreateWAL:BeforeReuseWritableFile1",
583          "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
584         {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
585          "DBImpl::CreateWAL:BeforeReuseWritableFile2"},
586     });
587     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
588     ROCKSDB_NAMESPACE::port::Thread thread([&]() {
589       TEST_SYNC_POINT(
590           "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
591       ASSERT_OK(db_->EnableFileDeletions(true));
592       TEST_SYNC_POINT(
593           "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
594     });
595     ASSERT_OK(Put("foo", "bar"));
596     ASSERT_OK(Flush());
597     thread.join();
598   }
599 }
600 
TEST_F(DBWALTest,GetSortedWalFiles)601 TEST_F(DBWALTest, GetSortedWalFiles) {
602   do {
603     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
604     VectorLogPtr log_files;
605     ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
606     ASSERT_EQ(0, log_files.size());
607 
608     ASSERT_OK(Put(1, "foo", "v1"));
609     ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
610     ASSERT_EQ(1, log_files.size());
611   } while (ChangeWalOptions());
612 }
613 
TEST_F(DBWALTest,GetCurrentWalFile)614 TEST_F(DBWALTest, GetCurrentWalFile) {
615   do {
616     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
617 
618     std::unique_ptr<LogFile>* bad_log_file = nullptr;
619     ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file));
620 
621     std::unique_ptr<LogFile> log_file;
622     ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
623 
624     // nothing has been written to the log yet
625     ASSERT_EQ(log_file->StartSequence(), 0);
626     ASSERT_EQ(log_file->SizeFileBytes(), 0);
627     ASSERT_EQ(log_file->Type(), kAliveLogFile);
628     ASSERT_GT(log_file->LogNumber(), 0);
629 
630     // add some data and verify that the file size actually moves foward
631     ASSERT_OK(Put(0, "foo", "v1"));
632     ASSERT_OK(Put(0, "foo2", "v2"));
633     ASSERT_OK(Put(0, "foo3", "v3"));
634 
635     ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
636 
637     ASSERT_EQ(log_file->StartSequence(), 0);
638     ASSERT_GT(log_file->SizeFileBytes(), 0);
639     ASSERT_EQ(log_file->Type(), kAliveLogFile);
640     ASSERT_GT(log_file->LogNumber(), 0);
641 
642     // force log files to cycle and add some more data, then check if
643     // log number moves forward
644 
645     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
646     for (int i = 0; i < 10; i++) {
647       ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
648     }
649 
650     ASSERT_OK(Put(0, "foo4", "v4"));
651     ASSERT_OK(Put(0, "foo5", "v5"));
652     ASSERT_OK(Put(0, "foo6", "v6"));
653 
654     ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
655 
656     ASSERT_EQ(log_file->StartSequence(), 0);
657     ASSERT_GT(log_file->SizeFileBytes(), 0);
658     ASSERT_EQ(log_file->Type(), kAliveLogFile);
659     ASSERT_GT(log_file->LogNumber(), 0);
660 
661   } while (ChangeWalOptions());
662 }
663 
TEST_F(DBWALTest,RecoveryWithLogDataForSomeCFs)664 TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
665   // Test for regression of WAL cleanup missing files that don't contain data
666   // for every column family.
667   do {
668     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
669     ASSERT_OK(Put(1, "foo", "v1"));
670     ASSERT_OK(Put(1, "foo", "v2"));
671     uint64_t earliest_log_nums[2];
672     for (int i = 0; i < 2; ++i) {
673       if (i > 0) {
674         ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
675       }
676       VectorLogPtr log_files;
677       ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
678       if (log_files.size() > 0) {
679         earliest_log_nums[i] = log_files[0]->LogNumber();
680       } else {
681         earliest_log_nums[i] = port::kMaxUint64;
682       }
683     }
684     // Check at least the first WAL was cleaned up during the recovery.
685     ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
686   } while (ChangeWalOptions());
687 }
688 
TEST_F(DBWALTest,RecoverWithLargeLog)689 TEST_F(DBWALTest, RecoverWithLargeLog) {
690   do {
691     {
692       Options options = CurrentOptions();
693       CreateAndReopenWithCF({"pikachu"}, options);
694       ASSERT_OK(Put(1, "big1", std::string(200000, '1')));
695       ASSERT_OK(Put(1, "big2", std::string(200000, '2')));
696       ASSERT_OK(Put(1, "small3", std::string(10, '3')));
697       ASSERT_OK(Put(1, "small4", std::string(10, '4')));
698       ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
699     }
700 
701     // Make sure that if we re-open with a small write buffer size that
702     // we flush table files in the middle of a large log file.
703     Options options;
704     options.write_buffer_size = 100000;
705     options = CurrentOptions(options);
706     ReopenWithColumnFamilies({"default", "pikachu"}, options);
707     ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
708     ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
709     ASSERT_EQ(std::string(200000, '2'), Get(1, "big2"));
710     ASSERT_EQ(std::string(10, '3'), Get(1, "small3"));
711     ASSERT_EQ(std::string(10, '4'), Get(1, "small4"));
712     ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);
713   } while (ChangeWalOptions());
714 }
715 
// In https://reviews.facebook.net/D20661 we changed
// recovery behavior: previously, for each log file, each column family
// memtable was flushed, even if it was empty. Now we try to create the
// smallest number of table files by merging updates from multiple logs.
TEST_F(DBWALTest,RecoverCheckFileAmountWithSmallWriteBuffer)721 TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
722   Options options = CurrentOptions();
723   options.write_buffer_size = 5000000;
724   CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
725 
726   // Since we will reopen DB with smaller write_buffer_size,
727   // each key will go to new SST file
728   ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
729   ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
730   ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
731   ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
732 
733   ASSERT_OK(Put(3, Key(10), DummyString(1)));
734   // Make 'dobrynia' to be flushed and new WAL file to be created
735   ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
736   ASSERT_OK(Put(2, Key(1), DummyString(1)));
737   dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
738   {
739     auto tables = ListTableFiles(env_, dbname_);
740     ASSERT_EQ(tables.size(), static_cast<size_t>(1));
741     // Make sure 'dobrynia' was flushed: check sst files amount
742     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
743               static_cast<uint64_t>(1));
744   }
745   // New WAL file
746   ASSERT_OK(Put(1, Key(1), DummyString(1)));
747   ASSERT_OK(Put(1, Key(1), DummyString(1)));
748   ASSERT_OK(Put(3, Key(10), DummyString(1)));
749   ASSERT_OK(Put(3, Key(10), DummyString(1)));
750   ASSERT_OK(Put(3, Key(10), DummyString(1)));
751 
752   options.write_buffer_size = 4096;
753   options.arena_block_size = 4096;
754   ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
755                            options);
756   {
757     // No inserts => default is empty
758     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
759               static_cast<uint64_t>(0));
760     // First 4 keys goes to separate SSTs + 1 more SST for 2 smaller keys
761     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
762               static_cast<uint64_t>(5));
763     // 1 SST for big key + 1 SST for small one
764     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
765               static_cast<uint64_t>(2));
766     // 1 SST for all keys
767     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
768               static_cast<uint64_t>(1));
769   }
770 }
771 
// In https://reviews.facebook.net/D20661 we changed
// recovery behavior: previously, for each log file, each column family
// memtable was flushed, even if it was empty. Now we try to create the
// smallest number of table files by merging updates from multiple logs.
TEST_F(DBWALTest,RecoverCheckFileAmount)777 TEST_F(DBWALTest, RecoverCheckFileAmount) {
778   Options options = CurrentOptions();
779   options.write_buffer_size = 100000;
780   options.arena_block_size = 4 * 1024;
781   options.avoid_flush_during_recovery = false;
782   CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
783 
784   ASSERT_OK(Put(0, Key(1), DummyString(1)));
785   ASSERT_OK(Put(1, Key(1), DummyString(1)));
786   ASSERT_OK(Put(2, Key(1), DummyString(1)));
787 
788   // Make 'nikitich' memtable to be flushed
789   ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
790   ASSERT_OK(Put(3, Key(1), DummyString(1)));
791   dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
792   // 4 memtable are not flushed, 1 sst file
793   {
794     auto tables = ListTableFiles(env_, dbname_);
795     ASSERT_EQ(tables.size(), static_cast<size_t>(1));
796     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
797               static_cast<uint64_t>(1));
798   }
799   // Memtable for 'nikitich' has flushed, new WAL file has opened
800   // 4 memtable still not flushed
801 
802   // Write to new WAL file
803   ASSERT_OK(Put(0, Key(1), DummyString(1)));
804   ASSERT_OK(Put(1, Key(1), DummyString(1)));
805   ASSERT_OK(Put(2, Key(1), DummyString(1)));
806 
807   // Fill up 'nikitich' one more time
808   ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
809   // make it flush
810   ASSERT_OK(Put(3, Key(1), DummyString(1)));
811   dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
812   // There are still 4 memtable not flushed, and 2 sst tables
813   ASSERT_OK(Put(0, Key(1), DummyString(1)));
814   ASSERT_OK(Put(1, Key(1), DummyString(1)));
815   ASSERT_OK(Put(2, Key(1), DummyString(1)));
816 
817   {
818     auto tables = ListTableFiles(env_, dbname_);
819     ASSERT_EQ(tables.size(), static_cast<size_t>(2));
820     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
821               static_cast<uint64_t>(2));
822   }
823 
824   ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
825                            options);
826   {
827     std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
828     // Check, that records for 'default', 'dobrynia' and 'pikachu' from
829     // first, second and third WALs  went to the same SST.
830     // So, there is 6 SSTs: three  for 'nikitich', one for 'default', one for
831     // 'dobrynia', one for 'pikachu'
832     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
833               static_cast<uint64_t>(1));
834     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
835               static_cast<uint64_t>(3));
836     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
837               static_cast<uint64_t>(1));
838     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
839               static_cast<uint64_t>(1));
840   }
841 }
842 
TEST_F(DBWALTest, SyncMultipleLogs) {
  const uint64_t kNumBatches = 2;
  const int kBatchSize = 1000;

  Options options = CurrentOptions();
  options.create_if_missing = true;
  // Small write buffer; presumably meant to force memtable switches (and thus
  // several WAL files) while the batches are written -- confirm intent.
  options.write_buffer_size = 4096;
  Reopen(options);

  WriteBatch batch;
  WriteOptions wo;
  wo.sync = true;

  for (uint64_t b = 0; b < kNumBatches; b++) {
    batch.Clear();
    for (int i = 0; i < kBatchSize; i++) {
      ASSERT_OK(batch.Put(Key(i), DummyString(128)));
    }

    // Check the synced write itself; ignoring this status would let a failed
    // write slip through and leave SyncWAL() below testing nothing.
    ASSERT_OK(dbfull()->Write(wo, &batch));
  }

  ASSERT_OK(dbfull()->SyncWAL());
}
867 
// Github issue 1339. Prior to the fix we read the sequence id from the first
// log into a local variable, then kept increasing that variable as we replayed
// logs, ignoring the actual sequence ids of the records. This is incorrect if
// some writes come with WAL disabled.
TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  Options options = CurrentOptions();
  options.env = fault_env.get();
  options.disable_auto_compactions = true;
  WriteOptions wal_on, wal_off;
  wal_on.sync = true;
  wal_on.disableWAL = false;
  wal_off.disableWAL = true;
  CreateAndReopenWithCF({"dummy"}, options);
  ASSERT_OK(Put(1, "dummy", "d1", wal_on));  // seq id 1
  ASSERT_OK(Put(1, "dummy", "d2", wal_off));
  ASSERT_OK(Put(1, "dummy", "d3", wal_off));
  ASSERT_OK(Put(0, "key", "v4", wal_on));  // seq id 4
  ASSERT_OK(Flush(0));
  ASSERT_OK(Put(0, "key", "v5", wal_on));  // seq id 5
  ASSERT_EQ("v5", Get(0, "key"));
  // The WAL must reach the file before the simulated crash below, otherwise
  // the final assertion would be checking something else.
  ASSERT_OK(dbfull()->FlushWAL(false));
  // Simulate a crash.
  fault_env->SetFilesystemActive(false);
  Close();
  fault_env->ResetState();
  ReopenWithColumnFamilies({"default", "dummy"}, options);
  // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
  ASSERT_EQ("v5", Get(0, "key"));
  // Destroy DB before destruct fault_env.
  Destroy(options);
}
901 
902 //
903 // Test WAL recovery for the various modes available
904 //
905 class RecoveryTestHelper {
906  public:
907   // Number of WAL files to generate
908   static const int kWALFilesCount = 10;
909   // Starting number for the WAL file name like 00010.log
910   static const int kWALFileOffset = 10;
911   // Keys to be written per WAL file
912   static const int kKeysPerWALFile = 133;
913   // Size of the value
914   static const int kValueSize = 96;
915 
916   // Create WAL files with values filled in
FillData(DBWALTest * test,const Options & options,const size_t wal_count,size_t * count)917   static void FillData(DBWALTest* test, const Options& options,
918                        const size_t wal_count, size_t* count) {
919     // Calling internal functions requires sanitized options.
920     Options sanitized_options = SanitizeOptions(test->dbname_, options);
921     const ImmutableDBOptions db_options(sanitized_options);
922 
923     *count = 0;
924 
925     std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
926     EnvOptions env_options;
927     WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
928 
929     std::unique_ptr<VersionSet> versions;
930     std::unique_ptr<WalManager> wal_manager;
931     WriteController write_controller;
932 
933     versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
934                                   table_cache.get(), &write_buffer_manager,
935                                   &write_controller,
936                                   /*block_cache_tracer=*/nullptr));
937 
938     wal_manager.reset(new WalManager(db_options, env_options));
939 
940     std::unique_ptr<log::Writer> current_log_writer;
941 
942     for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
943       uint64_t current_log_number = j;
944       std::string fname = LogFileName(test->dbname_, current_log_number);
945       std::unique_ptr<WritableFile> file;
946       ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
947       std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
948           NewLegacyWritableFileWrapper(std::move(file)), fname, env_options));
949       current_log_writer.reset(
950           new log::Writer(std::move(file_writer), current_log_number,
951                           db_options.recycle_log_file_num > 0));
952 
953       WriteBatch batch;
954       for (int i = 0; i < kKeysPerWALFile; i++) {
955         std::string key = "key" + ToString((*count)++);
956         std::string value = test->DummyString(kValueSize);
957         assert(current_log_writer.get() != nullptr);
958         uint64_t seq = versions->LastSequence() + 1;
959         batch.Clear();
960         batch.Put(key, value);
961         WriteBatchInternal::SetSequence(&batch, seq);
962         current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
963         versions->SetLastAllocatedSequence(seq);
964         versions->SetLastPublishedSequence(seq);
965         versions->SetLastSequence(seq);
966       }
967     }
968   }
969 
970   // Recreate and fill the store with some data
FillData(DBWALTest * test,Options * options)971   static size_t FillData(DBWALTest* test, Options* options) {
972     options->create_if_missing = true;
973     test->DestroyAndReopen(*options);
974     test->Close();
975 
976     size_t count = 0;
977     FillData(test, *options, kWALFilesCount, &count);
978     return count;
979   }
980 
981   // Read back all the keys we wrote and return the number of keys found
GetData(DBWALTest * test)982   static size_t GetData(DBWALTest* test) {
983     size_t count = 0;
984     for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
985       if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
986         ++count;
987       }
988     }
989     return count;
990   }
991 
992   // Manuall corrupt the specified WAL
CorruptWAL(DBWALTest * test,const Options & options,const double off,const double len,const int wal_file_id,const bool trunc=false)993   static void CorruptWAL(DBWALTest* test, const Options& options,
994                          const double off, const double len,
995                          const int wal_file_id, const bool trunc = false) {
996     Env* env = options.env;
997     std::string fname = LogFileName(test->dbname_, wal_file_id);
998     uint64_t size;
999     ASSERT_OK(env->GetFileSize(fname, &size));
1000     ASSERT_GT(size, 0);
1001 #ifdef OS_WIN
1002     // Windows disk cache behaves differently. When we truncate
1003     // the original content is still in the cache due to the original
1004     // handle is still open. Generally, in Windows, one prohibits
1005     // shared access to files and it is not needed for WAL but we allow
1006     // it to induce corruption at various tests.
1007     test->Close();
1008 #endif
1009     if (trunc) {
1010       ASSERT_EQ(0, truncate(fname.c_str(), static_cast<int64_t>(size * off)));
1011     } else {
1012       InduceCorruption(fname, static_cast<size_t>(size * off + 8),
1013                        static_cast<size_t>(size * len));
1014     }
1015   }
1016 
1017   // Overwrite data with 'a' from offset for length len
InduceCorruption(const std::string & filename,size_t offset,size_t len)1018   static void InduceCorruption(const std::string& filename, size_t offset,
1019                                size_t len) {
1020     ASSERT_GT(len, 0U);
1021 
1022     int fd = open(filename.c_str(), O_RDWR);
1023 
1024     // On windows long is 32-bit
1025     ASSERT_LE(offset, std::numeric_limits<long>::max());
1026 
1027     ASSERT_GT(fd, 0);
1028     ASSERT_EQ(offset, lseek(fd, static_cast<long>(offset), SEEK_SET));
1029 
1030     void* buf = alloca(len);
1031     memset(buf, 'b', len);
1032     ASSERT_EQ(len, write(fd, buf, static_cast<unsigned int>(len)));
1033 
1034     close(fd);
1035   }
1036 };
1037 
1038 // Test scope:
1039 // - We expect to open the data store when there is incomplete trailing writes
1040 // at the end of any of the logs
1041 // - We do not expect to open the data store for corruption
TEST_F(DBWALTest, kTolerateCorruptedTailRecords) {
  const int first_wal = RecoveryTestHelper::kWALFileOffset;
  const int end_wal = first_wal + RecoveryTestHelper::kWALFilesCount;

  // Dimensions: how the WAL is damaged (truncation vs. in-place overwrite),
  // where the damage starts (0%, 30%, 60% of the file), and which WAL file.
  for (const bool truncate_wal : {true, false}) {
    for (int pos = 0; pos < 3; ++pos) {
      for (int wal = first_wal; wal < end_wal; ++wal) {
        Options options = CurrentOptions();
        const size_t written_rows = RecoveryTestHelper::FillData(this, &options);
        RecoveryTestHelper::CorruptWAL(this, options, /*off=*/pos * .3,
                                       /*len%=*/.1, /*wal=*/wal, truncate_wal);
        options.wal_recovery_mode =
            WALRecoveryMode::kTolerateCorruptedTailRecords;

        if (!truncate_wal) {
          // An overwrite is real corruption, not an incomplete trailing
          // write, so this mode must refuse to open.
          ASSERT_NOK(TryReopen(options));
          continue;
        }

        // A truncated tail is tolerated. Fewer rows than were written come
        // back, and at least one unless truncation started at offset 0.
        options.create_if_missing = false;
        ASSERT_OK(TryReopen(options));
        const size_t recovered_rows = RecoveryTestHelper::GetData(this);
        ASSERT_TRUE(pos == 0 || recovered_rows > 0);
        ASSERT_LT(recovered_rows, written_rows);
      }
    }
  }
}
1073 
1074 // Test scope:
1075 // We don't expect the data store to be opened if there is any corruption
1076 // (leading, middle or trailing -- incomplete writes or corruption)
TEST_F(DBWALTest, kAbsoluteConsistency) {
  const int first_wal = RecoveryTestHelper::kWALFileOffset;
  const int end_wal = first_wal + RecoveryTestHelper::kWALFilesCount;

  // Baseline: undamaged WALs open cleanly and every row is recovered.
  Options options = CurrentOptions();
  const size_t written_rows = RecoveryTestHelper::FillData(this, &options);
  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  options.create_if_missing = false;
  ASSERT_OK(TryReopen(options));
  ASSERT_EQ(RecoveryTestHelper::GetData(this), written_rows);

  for (const bool truncate_wal : {true, false}) {
    for (int pos = 0; pos < 4; ++pos) {
      // Truncation at offset 0 is not exercised.
      if (truncate_wal && pos == 0) {
        continue;
      }

      for (int wal = first_wal; wal < end_wal; ++wal) {
        // Start from fresh data (FillData re-enables create_if_missing), damage
        // a single WAL, and expect the open to be refused.
        RecoveryTestHelper::FillData(this, &options);
        RecoveryTestHelper::CorruptWAL(this, options, /*off=*/pos * .3,
                                       /*len%=*/.1, wal, truncate_wal);
        options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
        options.create_if_missing = false;
        ASSERT_NOK(TryReopen(options));
      }
    }
  }
}
1109 
1110 // Test scope:
1111 // We don't expect the data store to be opened if there is any inconsistency
1112 // between WAL and SST files
TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;

  // Create DB with multiple column families.
  CreateAndReopenWithCF({"one", "two"}, options);
  ASSERT_OK(Put(1, "key1", "val1"));
  ASSERT_OK(Put(2, "key2", "val2"));

  // Record the offset at this point
  Env* env = options.env;
  uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
  std::string fname = LogFileName(dbname_, wal_file_id);
  uint64_t offset_to_corrupt;
  ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
  ASSERT_GT(offset_to_corrupt, 0);

  ASSERT_OK(Put(1, "key3", "val3"));
  // Corrupt WAL at location of key3
  RecoveryTestHelper::InduceCorruption(
      fname, static_cast<size_t>(offset_to_corrupt), static_cast<size_t>(4));
  ASSERT_OK(Put(2, "key4", "val4"));
  ASSERT_OK(Put(1, "key5", "val5"));
  // The flush of "two" persists key4 in an SST. That is what makes the
  // corrupted WAL inconsistent with the SST files, so it must succeed.
  ASSERT_OK(Flush(2));

  // PIT recovery & verify
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
}
1142 
1143 // Test scope:
1144 // - We expect to open data store under all circumstances
// - We expect only data up to the point where the first error was encountered
TEST_F(DBWALTest, kPointInTimeRecovery) {
  const int jstart = RecoveryTestHelper::kWALFileOffset;
  const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
  const size_t maxkeys =
      RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;

  for (auto trunc : {true, false}) {        /* Corruption style */
    for (int i = 0; i < 4; i++) {           /* Offset of corruption */
      for (int j = jstart; j < jend; j++) { /* WAL file */
        // Fill data for testing
        Options options = CurrentOptions();
        const size_t row_count = RecoveryTestHelper::FillData(this, &options);

        // Corrupt the wal
        RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
                                       /*len%=*/.1, j, trunc);

        // Verify
        options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
        options.create_if_missing = false;
        ASSERT_OK(TryReopen(options));

        // Probe data for invariants
        size_t recovered_row_count = RecoveryTestHelper::GetData(this);
        ASSERT_LT(recovered_row_count, row_count);

        // The keys "key0", "key1", ... were written in order, so the
        // recovered keys must form a prefix: once one is missing, every later
        // one must be missing too. (This previously probed "key" + i, the
        // corruption offset index, which made the check vacuous.) Like the
        // upper bound below, this is only asserted for in-place corruption.
        // Truncation is not guaranteed to stop replay of later WALs.
        if (!trunc) {
          bool expect_data = true;
          for (size_t k = 0; k < maxkeys; ++k) {
            const bool found = Get("key" + ToString(k)) != "NOT_FOUND";
            if (expect_data && !found) {
              expect_data = false;
            }
            ASSERT_EQ(found, expect_data);
          }
        }

        const size_t min = RecoveryTestHelper::kKeysPerWALFile *
                           (j - RecoveryTestHelper::kWALFileOffset);
        ASSERT_GE(recovered_row_count, min);
        if (!trunc && i != 0) {
          const size_t max = RecoveryTestHelper::kKeysPerWALFile *
                             (j - RecoveryTestHelper::kWALFileOffset + 1);
          ASSERT_LE(recovered_row_count, max);
        }
      }
    }
  }
}
1193 
1194 // Test scope:
1195 // - We expect to open the data store under all scenarios
1196 // - We expect to have recovered records past the corruption zone
TEST_F(DBWALTest, kSkipAnyCorruptedRecords) {
  const int first_wal = RecoveryTestHelper::kWALFileOffset;
  const int end_wal = first_wal + RecoveryTestHelper::kWALFilesCount;

  for (const bool truncate_wal : {true, false}) {
    for (int pos = 0; pos < 4; ++pos) {
      for (int wal = first_wal; wal < end_wal; ++wal) {
        Options options = CurrentOptions();
        const size_t written_rows = RecoveryTestHelper::FillData(this, &options);
        RecoveryTestHelper::CorruptWAL(this, options, /*off=*/pos * .3,
                                       /*len%=*/.1, wal, truncate_wal);

        // Whatever the damage, this mode must still open the DB.
        options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
        options.create_if_missing = false;
        ASSERT_OK(TryReopen(options));

        const size_t recovered_rows = RecoveryTestHelper::GetData(this);
        ASSERT_LT(recovered_rows, written_rows);

        // For an in-place overwrite that begins at the start of the WAL, at
        // least some rows must still come back.
        if (!truncate_wal) {
          ASSERT_TRUE(pos != 0 || recovered_rows > 0);
        }
      }
    }
  }
}
1228 
TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.avoid_flush_during_recovery = false;

  // Writes "foo" and then "bar". Stops at, and returns, the first failure.
  auto put_foo_bar = [this](const std::string& foo_value,
                            const std::string& bar_value) {
    Status s = Put("foo", foo_value);
    if (s.ok()) {
      s = Put("bar", bar_value);
    }
    return s;
  };

  // Phase 1: recovery may flush, so the unflushed WAL data becomes a second
  // table file after reopening.
  Reopen(options);
  ASSERT_OK(put_foo_bar("v1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(put_foo_bar("v3", "v4"));
  ASSERT_EQ(1, TotalTableFiles());
  Reopen(options);
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v4", Get("bar"));
  ASSERT_EQ(2, TotalTableFiles());

  // Phase 2: with avoid_flush_during_recovery the WAL data stays in the
  // memtable, so the table file count does not grow on reopen.
  options.avoid_flush_during_recovery = true;
  DestroyAndReopen(options);
  ASSERT_OK(put_foo_bar("v5", "v6"));
  ASSERT_OK(Flush());
  ASSERT_OK(put_foo_bar("v7", "v8"));
  ASSERT_EQ(1, TotalTableFiles());
  Reopen(options);
  ASSERT_EQ("v7", Get("foo"));
  ASSERT_EQ("v8", Get("bar"));
  ASSERT_EQ(1, TotalTableFiles());

  // Phase 3: allow_2pc forces a flush during recovery even though
  // avoid_flush_during_recovery is still set.
  options.avoid_flush_during_recovery = true;
  options.allow_2pc = true;
  ASSERT_OK(put_foo_bar("v9", "v10"));
  ASSERT_OK(Flush());
  ASSERT_OK(put_foo_bar("v11", "v12"));
  Reopen(options);
  ASSERT_EQ("v11", Get("foo"));
  ASSERT_EQ("v12", Get("bar"));
  ASSERT_EQ(3, TotalTableFiles());
}
1276 
TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
  // Verifies WAL files that were present during recovery, but not flushed due
  // to avoid_flush_during_recovery, will be considered for deletion at a later
  // stage. We check at least one such file is deleted during Flush().
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.avoid_flush_during_recovery = true;
  Reopen(options);

  ASSERT_OK(Put("foo", "v1"));
  Reopen(options);
  for (int i = 0; i < 2; ++i) {
    if (i > 0) {
      // Flush() triggers deletion of obsolete tracked files. A failed flush
      // would leave the WAL in place and turn the check below into a
      // misleading size mismatch, so check it directly.
      ASSERT_OK(Flush());
    }
    VectorLogPtr log_files;
    ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
    if (i == 0) {
      ASSERT_GT(log_files.size(), 0);
    } else {
      ASSERT_EQ(0, log_files.size());
    }
  }
}
1302 
TEST_F(DBWALTest, RecoverWithoutFlush) {
  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  options.create_if_missing = false;
  options.disable_auto_compactions = true;
  options.write_buffer_size = 64 * 1024 * 1024;

  size_t count = RecoveryTestHelper::FillData(this, &options);
  // Every key written by FillData must be readable.
  auto validateData = [this, count]() {
    for (size_t i = 0; i < count; i++) {
      ASSERT_NE(Get("key" + ToString(i)), "NOT_FOUND");
    }
  };
  Reopen(options);
  validateData();
  // Insert some data without flush
  ASSERT_OK(Put("foo", "foo_v1"));
  ASSERT_OK(Put("bar", "bar_v1"));
  Reopen(options);
  validateData();
  ASSERT_EQ(Get("foo"), "foo_v1");
  ASSERT_EQ(Get("bar"), "bar_v1");
  // Insert again and reopen
  ASSERT_OK(Put("foo", "foo_v2"));
  ASSERT_OK(Put("bar", "bar_v2"));
  Reopen(options);
  validateData();
  ASSERT_EQ(Get("foo"), "foo_v2");
  ASSERT_EQ(Get("bar"), "bar_v2");
  // manual flush and insert again
  ASSERT_OK(Flush());
  ASSERT_EQ(Get("foo"), "foo_v2");
  ASSERT_EQ(Get("bar"), "bar_v2");
  ASSERT_OK(Put("foo", "foo_v3"));
  ASSERT_OK(Put("bar", "bar_v3"));
  Reopen(options);
  validateData();
  ASSERT_EQ(Get("foo"), "foo_v3");
  ASSERT_EQ(Get("bar"), "bar_v3");
}
1343 
TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
  const std::string kSmallValue = "v";
  const std::string kLargeValue = DummyString(1024);
  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  options.create_if_missing = false;
  options.disable_auto_compactions = true;

  // Number of live WAL files. EXPECT_OK (not ASSERT_OK) because the lambda
  // returns a value, so a failed listing is reported instead of silently
  // yielding a misleading count.
  auto countWalFiles = [this]() {
    VectorLogPtr log_files;
    EXPECT_OK(dbfull()->GetSortedWalFiles(log_files));
    return log_files.size();
  };

  // Create DB with multiple column families and multiple log files.
  CreateAndReopenWithCF({"one", "two"}, options);
  ASSERT_OK(Put(0, "key1", kSmallValue));
  ASSERT_OK(Put(1, "key2", kLargeValue));
  ASSERT_OK(Flush(1));
  ASSERT_EQ(1, countWalFiles());
  ASSERT_OK(Put(0, "key3", kSmallValue));
  ASSERT_OK(Put(2, "key4", kLargeValue));
  ASSERT_OK(Flush(2));
  ASSERT_EQ(2, countWalFiles());

  // Reopen, insert and flush.
  options.db_write_buffer_size = 64 * 1024 * 1024;
  ReopenWithColumnFamilies({"default", "one", "two"}, options);
  ASSERT_EQ(Get(0, "key1"), kSmallValue);
  ASSERT_EQ(Get(1, "key2"), kLargeValue);
  ASSERT_EQ(Get(0, "key3"), kSmallValue);
  ASSERT_EQ(Get(2, "key4"), kLargeValue);
  // Insert more data.
  ASSERT_OK(Put(0, "key5", kLargeValue));
  ASSERT_OK(Put(1, "key6", kLargeValue));
  ASSERT_EQ(3, countWalFiles());
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(2, "key7", kLargeValue));
  ASSERT_OK(dbfull()->FlushWAL(false));
  ASSERT_EQ(4, countWalFiles());

  // Reopen twice and validate.
  for (int i = 0; i < 2; i++) {
    ReopenWithColumnFamilies({"default", "one", "two"}, options);
    ASSERT_EQ(Get(0, "key1"), kSmallValue);
    ASSERT_EQ(Get(1, "key2"), kLargeValue);
    ASSERT_EQ(Get(0, "key3"), kSmallValue);
    ASSERT_EQ(Get(2, "key4"), kLargeValue);
    ASSERT_EQ(Get(0, "key5"), kLargeValue);
    ASSERT_EQ(Get(1, "key6"), kLargeValue);
    ASSERT_EQ(Get(2, "key7"), kLargeValue);
    ASSERT_EQ(4, countWalFiles());
  }
}
1398 
1399 // In this test we are trying to do the following:
1400 //   1. Create a DB with corrupted WAL log;
1401 //   2. Open with avoid_flush_during_recovery = true;
1402 //   3. Append more data without flushing, which creates new WAL log.
1403 //   4. Open again. See if it can correctly handle previous corruption.
TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {
  const int first_wal = RecoveryTestHelper::kWALFileOffset;
  const int end_wal = first_wal + RecoveryTestHelper::kWALFilesCount;
  const int kAppendKeys = 100;
  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  options.create_if_missing = false;
  options.disable_auto_compactions = true;
  options.write_buffer_size = 64 * 1024 * 1024;

  // Snapshot of every key/value pair visible through the default iterator.
  auto getAll = [this]() {
    std::vector<std::pair<std::string, std::string>> kvs;
    ReadOptions ropt;
    std::unique_ptr<Iterator> it(dbfull()->NewIterator(ropt));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      kvs.emplace_back(it->key().ToString(), it->value().ToString());
    }
    return kvs;
  };

  for (const auto& mode : wal_recovery_mode_string_map) {
    options.wal_recovery_mode = mode.second;
    for (const bool truncate_wal : {true, false}) {
      for (int pos = 0; pos < 4; ++pos) {
        for (int wal = first_wal; wal < end_wal; ++wal) {
          // Build fresh WALs and damage one of them.
          RecoveryTestHelper::FillData(this, &options);
          RecoveryTestHelper::CorruptWAL(this, options, /*off=*/pos * .3,
                                         /*len%=*/.1, /*wal=*/wal,
                                         truncate_wal);
          // Only the strict modes are allowed to refuse to open; such cases
          // are skipped.
          const bool opened = TryReopen(options).ok();
          if (!opened) {
            ASSERT_TRUE(options.wal_recovery_mode ==
                            WALRecoveryMode::kAbsoluteConsistency ||
                        (!truncate_wal &&
                         options.wal_recovery_mode ==
                             WALRecoveryMode::kTolerateCorruptedTailRecords));
            continue;
          }
          ASSERT_OK(TryReopen(options));
          // Append more data on top of the recovered state (a new WAL).
          for (int k = 0; k < kAppendKeys; ++k) {
            ASSERT_OK(Put("extra_key" + ToString(k),
                          DummyString(RecoveryTestHelper::kValueSize)));
          }
          // Reopening must reproduce exactly the same contents.
          const auto before = getAll();
          ASSERT_OK(TryReopen(options));
          const auto after = getAll();
          ASSERT_EQ(before, after);
        }
      }
    }
  }
}
1461 
1462 // Tests that total log size is recovered if we set
1463 // avoid_flush_during_recovery=true.
1464 // Flush should trigger if max_total_wal_size is reached.
TEST_F(DBWALTest,RestoreTotalLogSizeAfterRecoverWithoutFlush)1465 TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
1466   class TestFlushListener : public EventListener {
1467    public:
1468     std::atomic<int> count{0};
1469 
1470     TestFlushListener() = default;
1471 
1472     void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
1473       count++;
1474       assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
1475     }
1476   };
1477   std::shared_ptr<TestFlushListener> test_listener =
1478       std::make_shared<TestFlushListener>();
1479 
1480   constexpr size_t kKB = 1024;
1481   constexpr size_t kMB = 1024 * 1024;
1482   Options options = CurrentOptions();
1483   options.avoid_flush_during_recovery = true;
1484   options.max_total_wal_size = 1 * kMB;
1485   options.listeners.push_back(test_listener);
1486   // Have to open DB in multi-CF mode to trigger flush when
1487   // max_total_wal_size is reached.
1488   CreateAndReopenWithCF({"one"}, options);
1489   // Write some keys and we will end up with one log file which is slightly
1490   // smaller than 1MB.
1491   std::string value_100k(100 * kKB, 'v');
1492   std::string value_300k(300 * kKB, 'v');
1493   ASSERT_OK(Put(0, "foo", "v1"));
1494   for (int i = 0; i < 9; i++) {
1495     ASSERT_OK(Put(1, "key" + ToString(i), value_100k));
1496   }
1497   // Get log files before reopen.
1498   VectorLogPtr log_files_before;
1499   ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
1500   ASSERT_EQ(1, log_files_before.size());
1501   uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
1502   ASSERT_GT(log_size_before, 900 * kKB);
1503   ASSERT_LT(log_size_before, 1 * kMB);
1504   ReopenWithColumnFamilies({"default", "one"}, options);
1505   // Write one more value to make log larger than 1MB.
1506   ASSERT_OK(Put(1, "bar", value_300k));
1507   // Get log files again. A new log file will be opened.
1508   VectorLogPtr log_files_after_reopen;
1509   ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
1510   ASSERT_EQ(2, log_files_after_reopen.size());
1511   ASSERT_EQ(log_files_before[0]->LogNumber(),
1512             log_files_after_reopen[0]->LogNumber());
1513   ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
1514                 log_files_after_reopen[1]->SizeFileBytes(),
1515             1 * kMB);
1516   // Write one more key to trigger flush.
1517   ASSERT_OK(Put(0, "foo", "v2"));
1518   dbfull()->TEST_WaitForFlushMemTable();
1519   // Flushed two column families.
1520   ASSERT_EQ(2, test_listener->count.load());
1521 }
1522 
1523 #if defined(ROCKSDB_PLATFORM_POSIX)
1524 #if defined(ROCKSDB_FALLOCATE_PRESENT)
// Tests that we truncate the preallocated space of the last log left over
// from the previous DB session.
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
  constexpr size_t kKB = 1024;
  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  // Test fallocate support of running file system.
  // Skip this test if fallocate is not supported.
  std::string fname_test_fallocate = dbname_ + "/preallocate_testfile";
  int fd = -1;
  do {
    fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
  } while (fd < 0 && errno == EINTR);
  ASSERT_GE(fd, 0);
  int alloc_status = fallocate(fd, 0, 0, 1);
  // errno is only meaningful when fallocate() failed. Reading it after a
  // successful call could pick up a stale ENOSYS/EOPNOTSUPP from an earlier
  // call and wrongly skip the test. It is captured before close(), which may
  // overwrite it.
  int err_number = (alloc_status == 0) ? 0 : errno;
  close(fd);
  ASSERT_OK(options.env->DeleteFile(fname_test_fallocate));
  if (err_number == ENOSYS || err_number == EOPNOTSUPP) {
    fprintf(stderr, "Skipped preallocated space check: %s\n",
            strerror(err_number));
    return;
  }
  ASSERT_EQ(0, alloc_status);

  DestroyAndReopen(options);
  size_t preallocated_size =
      dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  ASSERT_OK(Put("foo", "v1"));
  VectorLogPtr log_files_before;
  ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
  ASSERT_EQ(1, log_files_before.size());
  auto& file_before = log_files_before[0];
  ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
  // The log file has preallocated space.
  ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
            preallocated_size);
  Reopen(options);
  VectorLogPtr log_files_after;
  ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
  ASSERT_EQ(1, log_files_after.size());
  ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
  // The preallocated space should be truncated.
  ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
            preallocated_size);
}
1570 #endif  // ROCKSDB_FALLOCATE_PRESENT
1571 #endif  // ROCKSDB_PLATFORM_POSIX
1572 
1573 #endif  // ROCKSDB_LITE
1574 
// Exercises WriteBatch::MarkWalTerminationPoint(): after the DB is reopened,
// the record written before the marker is expected to be recovered and the
// record written after it is expected to be absent.
TEST_F(DBWALTest, WalTermTest) {
  Options options = CurrentOptions();
  options.env = env_;
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "foo", "bar"));

  WriteOptions wo;
  wo.sync = true;
  wo.disableWAL = false;

  // No column family handle is passed, so this batch targets the default
  // column family.
  WriteBatch batch;
  batch.Put("foo", "bar");
  batch.MarkWalTerminationPoint();
  batch.Put("foo2", "bar2");

  ASSERT_OK(dbfull()->Write(wo, &batch));

  // make sure we can re-open it.
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
  ASSERT_EQ("bar", Get(1, "foo"));
  ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
  // The batch went to the default column family, so that is where the
  // termination point is actually observable: the record before the marker
  // survives the reopen, the one after it does not.
  ASSERT_EQ("bar", Get("foo"));
  ASSERT_EQ("NOT_FOUND", Get("foo2"));
}
1598 }  // namespace ROCKSDB_NAMESPACE
1599 
main(int argc,char ** argv)1600 int main(int argc, char** argv) {
1601   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1602   ::testing::InitGoogleTest(&argc, argv);
1603   return RUN_ALL_TESTS();
1604 }
1605