1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include <map>
9 #include <string>
10 
11 #include "rocksdb/cache.h"
12 #include "rocksdb/write_batch.h"
13 #include "rocksdb/write_buffer_manager.h"
14 
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/log_writer.h"
18 #include "db/version_set.h"
19 #include "db/wal_manager.h"
20 #include "env/mock_env.h"
21 #include "file/writable_file_writer.h"
22 #include "table/mock_table.h"
23 #include "test_util/testharness.h"
24 #include "test_util/testutil.h"
25 #include "util/string_util.h"
26 
27 namespace ROCKSDB_NAMESPACE {
28 
29 // TODO(icanadi) mock out VersionSet
30 // TODO(icanadi) move other WalManager-specific tests from db_test here
31 class WalManagerTest : public testing::Test {
32  public:
WalManagerTest()33   WalManagerTest()
34       : env_(new MockEnv(Env::Default())),
35         dbname_(test::PerThreadDBPath("wal_manager_test")),
36         db_options_(),
37         table_cache_(NewLRUCache(50000, 16)),
38         write_buffer_manager_(db_options_.db_write_buffer_size),
39         current_log_number_(0) {
40     DestroyDB(dbname_, Options());
41   }
42 
Init()43   void Init() {
44     ASSERT_OK(env_->CreateDirIfMissing(dbname_));
45     ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));
46     db_options_.db_paths.emplace_back(dbname_,
47                                       std::numeric_limits<uint64_t>::max());
48     db_options_.wal_dir = dbname_;
49     db_options_.env = env_.get();
50     fs_.reset(new LegacyFileSystemWrapper(env_.get()));
51     db_options_.fs = fs_;
52 
53     versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
54                                    table_cache_.get(), &write_buffer_manager_,
55                                    &write_controller_,
56                                    /*block_cache_tracer=*/nullptr));
57 
58     wal_manager_.reset(new WalManager(db_options_, env_options_));
59   }
60 
Reopen()61   void Reopen() {
62     wal_manager_.reset(new WalManager(db_options_, env_options_));
63   }
64 
65   // NOT thread safe
Put(const std::string & key,const std::string & value)66   void Put(const std::string& key, const std::string& value) {
67     assert(current_log_writer_.get() != nullptr);
68     uint64_t seq =  versions_->LastSequence() + 1;
69     WriteBatch batch;
70     batch.Put(key, value);
71     WriteBatchInternal::SetSequence(&batch, seq);
72     current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
73     versions_->SetLastAllocatedSequence(seq);
74     versions_->SetLastPublishedSequence(seq);
75     versions_->SetLastSequence(seq);
76   }
77 
78   // NOT thread safe
RollTheLog(bool)79   void RollTheLog(bool /*archived*/) {
80     current_log_number_++;
81     std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
82     std::unique_ptr<WritableFile> file;
83     ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_));
84     std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
85         NewLegacyWritableFileWrapper(std::move(file)), fname, env_options_));
86     current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
87   }
88 
CreateArchiveLogs(int num_logs,int entries_per_log)89   void CreateArchiveLogs(int num_logs, int entries_per_log) {
90     for (int i = 1; i <= num_logs; ++i) {
91       RollTheLog(true);
92       for (int k = 0; k < entries_per_log; ++k) {
93         Put(ToString(k), std::string(1024, 'a'));
94       }
95     }
96   }
97 
OpenTransactionLogIter(const SequenceNumber seq)98   std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
99       const SequenceNumber seq) {
100     std::unique_ptr<TransactionLogIterator> iter;
101     Status status = wal_manager_->GetUpdatesSince(
102         seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get());
103     EXPECT_OK(status);
104     return iter;
105   }
106 
107   std::unique_ptr<MockEnv> env_;
108   std::string dbname_;
109   ImmutableDBOptions db_options_;
110   WriteController write_controller_;
111   EnvOptions env_options_;
112   std::shared_ptr<Cache> table_cache_;
113   WriteBufferManager write_buffer_manager_;
114   std::unique_ptr<VersionSet> versions_;
115   std::unique_ptr<WalManager> wal_manager_;
116   std::shared_ptr<LegacyFileSystemWrapper> fs_;
117 
118   std::unique_ptr<log::Writer> current_log_writer_;
119   uint64_t current_log_number_;
120 };
121 
TEST_F(WalManagerTest,ReadFirstRecordCache)122 TEST_F(WalManagerTest, ReadFirstRecordCache) {
123   Init();
124   std::string path = dbname_ + "/000001.log";
125   std::unique_ptr<WritableFile> file;
126   ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));
127 
128   SequenceNumber s;
129   ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
130   ASSERT_EQ(s, 0U);
131 
132   ASSERT_OK(
133       wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
134   ASSERT_EQ(s, 0U);
135 
136   std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
137       NewLegacyWritableFileWrapper(std::move(file)), path, EnvOptions()));
138   log::Writer writer(std::move(file_writer), 1,
139                      db_options_.recycle_log_file_num > 0);
140   WriteBatch batch;
141   batch.Put("foo", "bar");
142   WriteBatchInternal::SetSequence(&batch, 10);
143   writer.AddRecord(WriteBatchInternal::Contents(&batch));
144 
145   // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here.
146   // Waiting for lei to finish with db_test
147   // env_->count_sequential_reads_ = true;
148   // sequential_read_counter_ sanity test
149   // ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);
150 
151   ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
152   ASSERT_EQ(s, 10U);
153   // did a read
154   // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
155   // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
156 
157   ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
158   ASSERT_EQ(s, 10U);
159   // no new reads since the value is cached
160   // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
161   // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
162 }
163 
164 namespace {
GetLogDirSize(std::string dir_path,Env * env)165 uint64_t GetLogDirSize(std::string dir_path, Env* env) {
166   uint64_t dir_size = 0;
167   std::vector<std::string> files;
168   env->GetChildren(dir_path, &files);
169   for (auto& f : files) {
170     uint64_t number;
171     FileType type;
172     if (ParseFileName(f, &number, &type) && type == kLogFile) {
173       std::string const file_path = dir_path + "/" + f;
174       uint64_t file_size;
175       env->GetFileSize(file_path, &file_size);
176       dir_size += file_size;
177     }
178   }
179   return dir_size;
180 }
ListSpecificFiles(Env * env,const std::string & path,const FileType expected_file_type)181 std::vector<std::uint64_t> ListSpecificFiles(
182     Env* env, const std::string& path, const FileType expected_file_type) {
183   std::vector<std::string> files;
184   std::vector<uint64_t> file_numbers;
185   env->GetChildren(path, &files);
186   uint64_t number;
187   FileType type;
188   for (size_t i = 0; i < files.size(); ++i) {
189     if (ParseFileName(files[i], &number, &type)) {
190       if (type == expected_file_type) {
191         file_numbers.push_back(number);
192       }
193     }
194   }
195   return file_numbers;
196 }
197 
CountRecords(TransactionLogIterator * iter)198 int CountRecords(TransactionLogIterator* iter) {
199   int count = 0;
200   SequenceNumber lastSequence = 0;
201   BatchResult res;
202   while (iter->Valid()) {
203     res = iter->GetBatch();
204     EXPECT_TRUE(res.sequence > lastSequence);
205     ++count;
206     lastSequence = res.sequence;
207     EXPECT_OK(iter->status());
208     iter->Next();
209   }
210   return count;
211 }
212 }  // namespace
213 
TEST_F(WalManagerTest,WALArchivalSizeLimit)214 TEST_F(WalManagerTest, WALArchivalSizeLimit) {
215   db_options_.wal_ttl_seconds = 0;
216   db_options_.wal_size_limit_mb = 1000;
217   Init();
218 
219   // TEST : Create WalManager with huge size limit and no ttl.
220   // Create some archived files and call PurgeObsoleteWALFiles().
221   // Count the archived log files that survived.
222   // Assert that all of them did.
223   // Change size limit. Re-open WalManager.
224   // Assert that archive is not greater than wal_size_limit_mb after
225   // PurgeObsoleteWALFiles()
226   // Set ttl and time_to_check_ to small values. Re-open db.
227   // Assert that there are no archived logs left.
228 
229   std::string archive_dir = ArchivalDirectory(dbname_);
230   CreateArchiveLogs(20, 5000);
231 
232   std::vector<std::uint64_t> log_files =
233       ListSpecificFiles(env_.get(), archive_dir, kLogFile);
234   ASSERT_EQ(log_files.size(), 20U);
235 
236   db_options_.wal_size_limit_mb = 8;
237   Reopen();
238   wal_manager_->PurgeObsoleteWALFiles();
239 
240   uint64_t archive_size = GetLogDirSize(archive_dir, env_.get());
241   ASSERT_TRUE(archive_size <= db_options_.wal_size_limit_mb * 1024 * 1024);
242 
243   db_options_.wal_ttl_seconds = 1;
244   env_->FakeSleepForMicroseconds(2 * 1000 * 1000);
245   Reopen();
246   wal_manager_->PurgeObsoleteWALFiles();
247 
248   log_files = ListSpecificFiles(env_.get(), archive_dir, kLogFile);
249   ASSERT_TRUE(log_files.empty());
250 }
251 
TEST_F(WalManagerTest,WALArchivalTtl)252 TEST_F(WalManagerTest, WALArchivalTtl) {
253   db_options_.wal_ttl_seconds = 1000;
254   Init();
255 
256   // TEST : Create WalManager with a ttl and no size limit.
257   // Create some archived log files and call PurgeObsoleteWALFiles().
258   // Assert that files are not deleted
259   // Reopen db with small ttl.
260   // Assert that all archived logs was removed.
261 
262   std::string archive_dir = ArchivalDirectory(dbname_);
263   CreateArchiveLogs(20, 5000);
264 
265   std::vector<uint64_t> log_files =
266       ListSpecificFiles(env_.get(), archive_dir, kLogFile);
267   ASSERT_GT(log_files.size(), 0U);
268 
269   db_options_.wal_ttl_seconds = 1;
270   env_->FakeSleepForMicroseconds(3 * 1000 * 1000);
271   Reopen();
272   wal_manager_->PurgeObsoleteWALFiles();
273 
274   log_files = ListSpecificFiles(env_.get(), archive_dir, kLogFile);
275   ASSERT_TRUE(log_files.empty());
276 }
277 
TEST_F(WalManagerTest,TransactionLogIteratorMoveOverZeroFiles)278 TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
279   Init();
280   RollTheLog(false);
281   Put("key1", std::string(1024, 'a'));
282   // Create a zero record WAL file.
283   RollTheLog(false);
284   RollTheLog(false);
285 
286   Put("key2", std::string(1024, 'a'));
287 
288   auto iter = OpenTransactionLogIter(0);
289   ASSERT_EQ(2, CountRecords(iter.get()));
290 }
291 
TEST_F(WalManagerTest,TransactionLogIteratorJustEmptyFile)292 TEST_F(WalManagerTest, TransactionLogIteratorJustEmptyFile) {
293   Init();
294   RollTheLog(false);
295   auto iter = OpenTransactionLogIter(0);
296   // Check that an empty iterator is returned
297   ASSERT_TRUE(!iter->Valid());
298 }
299 
TEST_F(WalManagerTest,TransactionLogIteratorNewFileWhileScanning)300 TEST_F(WalManagerTest, TransactionLogIteratorNewFileWhileScanning) {
301   Init();
302   CreateArchiveLogs(2, 100);
303   auto iter = OpenTransactionLogIter(0);
304   CreateArchiveLogs(1, 100);
305   int i = 0;
306   for (; iter->Valid(); iter->Next()) {
307     i++;
308   }
309   ASSERT_EQ(i, 200);
310   // A new log file was added after the iterator was created.
311   // TryAgain indicates a new iterator is needed to fetch the new data
312   ASSERT_TRUE(iter->status().IsTryAgain());
313 
314   iter = OpenTransactionLogIter(0);
315   i = 0;
316   for (; iter->Valid(); iter->Next()) {
317     i++;
318   }
319   ASSERT_EQ(i, 300);
320   ASSERT_TRUE(iter->status().ok());
321 }
322 
323 }  // namespace ROCKSDB_NAMESPACE
324 
main(int argc,char ** argv)325 int main(int argc, char** argv) {
326   ::testing::InitGoogleTest(&argc, argv);
327   return RUN_ALL_TESTS();
328 }
329 
330 #else
331 #include <stdio.h>
332 
main(int,char **)333 int main(int /*argc*/, char** /*argv*/) {
334   fprintf(stderr, "SKIPPED as WalManager is not supported in ROCKSDB_LITE\n");
335   return 0;
336 }
337 
338 #endif  // !ROCKSDB_LITE
339