1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include <map>
9 #include <string>
10
11 #include "rocksdb/cache.h"
12 #include "rocksdb/write_batch.h"
13 #include "rocksdb/write_buffer_manager.h"
14
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/log_writer.h"
18 #include "db/version_set.h"
19 #include "db/wal_manager.h"
20 #include "env/mock_env.h"
21 #include "file/writable_file_writer.h"
22 #include "table/mock_table.h"
23 #include "test_util/testharness.h"
24 #include "test_util/testutil.h"
25 #include "util/string_util.h"
26
27 namespace ROCKSDB_NAMESPACE {
28
29 // TODO(icanadi) mock out VersionSet
30 // TODO(icanadi) move other WalManager-specific tests from db_test here
31 class WalManagerTest : public testing::Test {
32 public:
WalManagerTest()33 WalManagerTest()
34 : env_(new MockEnv(Env::Default())),
35 dbname_(test::PerThreadDBPath("wal_manager_test")),
36 db_options_(),
37 table_cache_(NewLRUCache(50000, 16)),
38 write_buffer_manager_(db_options_.db_write_buffer_size),
39 current_log_number_(0) {
40 DestroyDB(dbname_, Options());
41 }
42
Init()43 void Init() {
44 ASSERT_OK(env_->CreateDirIfMissing(dbname_));
45 ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));
46 db_options_.db_paths.emplace_back(dbname_,
47 std::numeric_limits<uint64_t>::max());
48 db_options_.wal_dir = dbname_;
49 db_options_.env = env_.get();
50 fs_.reset(new LegacyFileSystemWrapper(env_.get()));
51 db_options_.fs = fs_;
52
53 versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
54 table_cache_.get(), &write_buffer_manager_,
55 &write_controller_,
56 /*block_cache_tracer=*/nullptr));
57
58 wal_manager_.reset(new WalManager(db_options_, env_options_));
59 }
60
Reopen()61 void Reopen() {
62 wal_manager_.reset(new WalManager(db_options_, env_options_));
63 }
64
65 // NOT thread safe
Put(const std::string & key,const std::string & value)66 void Put(const std::string& key, const std::string& value) {
67 assert(current_log_writer_.get() != nullptr);
68 uint64_t seq = versions_->LastSequence() + 1;
69 WriteBatch batch;
70 batch.Put(key, value);
71 WriteBatchInternal::SetSequence(&batch, seq);
72 current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
73 versions_->SetLastAllocatedSequence(seq);
74 versions_->SetLastPublishedSequence(seq);
75 versions_->SetLastSequence(seq);
76 }
77
78 // NOT thread safe
RollTheLog(bool)79 void RollTheLog(bool /*archived*/) {
80 current_log_number_++;
81 std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
82 std::unique_ptr<WritableFile> file;
83 ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_));
84 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
85 NewLegacyWritableFileWrapper(std::move(file)), fname, env_options_));
86 current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
87 }
88
CreateArchiveLogs(int num_logs,int entries_per_log)89 void CreateArchiveLogs(int num_logs, int entries_per_log) {
90 for (int i = 1; i <= num_logs; ++i) {
91 RollTheLog(true);
92 for (int k = 0; k < entries_per_log; ++k) {
93 Put(ToString(k), std::string(1024, 'a'));
94 }
95 }
96 }
97
OpenTransactionLogIter(const SequenceNumber seq)98 std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
99 const SequenceNumber seq) {
100 std::unique_ptr<TransactionLogIterator> iter;
101 Status status = wal_manager_->GetUpdatesSince(
102 seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get());
103 EXPECT_OK(status);
104 return iter;
105 }
106
107 std::unique_ptr<MockEnv> env_;
108 std::string dbname_;
109 ImmutableDBOptions db_options_;
110 WriteController write_controller_;
111 EnvOptions env_options_;
112 std::shared_ptr<Cache> table_cache_;
113 WriteBufferManager write_buffer_manager_;
114 std::unique_ptr<VersionSet> versions_;
115 std::unique_ptr<WalManager> wal_manager_;
116 std::shared_ptr<LegacyFileSystemWrapper> fs_;
117
118 std::unique_ptr<log::Writer> current_log_writer_;
119 uint64_t current_log_number_;
120 };
121
// Checks the first sequence number reported for live log file 1: it is 0 while
// the file is empty, and 10 after a batch with sequence 10 has been written.
// The last lookup is repeated to exercise the path the test name refers to as
// the cache; the read-count assertions for that are still disabled (see TODOs).
TEST_F(WalManagerTest, ReadFirstRecordCache) {
  Init();
  std::string path = dbname_ + "/000001.log";
  std::unique_ptr<WritableFile> file;
  ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));

  // Empty file: both lookups succeed and report sequence 0.
  SequenceNumber s;
  ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
  ASSERT_EQ(s, 0U);

  ASSERT_OK(
      wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
  ASSERT_EQ(s, 0U);

  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
      NewLegacyWritableFileWrapper(std::move(file)), path, EnvOptions()));
  log::Writer writer(std::move(file_writer), 1,
                     db_options_.recycle_log_file_num > 0);
  WriteBatch batch;
  ASSERT_OK(batch.Put("foo", "bar"));
  WriteBatchInternal::SetSequence(&batch, 10);
  // Fail here if the record could not be written, instead of letting the
  // sequence checks below fail for a less obvious reason.
  ASSERT_OK(writer.AddRecord(WriteBatchInternal::Contents(&batch)));

  // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here.
  // Waiting for lei to finish with db_test
  // env_->count_sequential_reads_ = true;
  // sequential_read_counter_ sanity test
  // ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);

  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
  ASSERT_EQ(s, 10U);
  // did a read
  // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
  // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);

  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
  ASSERT_EQ(s, 10U);
  // no new reads since the value is cached
  // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
  // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
}
163
164 namespace {
GetLogDirSize(std::string dir_path,Env * env)165 uint64_t GetLogDirSize(std::string dir_path, Env* env) {
166 uint64_t dir_size = 0;
167 std::vector<std::string> files;
168 env->GetChildren(dir_path, &files);
169 for (auto& f : files) {
170 uint64_t number;
171 FileType type;
172 if (ParseFileName(f, &number, &type) && type == kLogFile) {
173 std::string const file_path = dir_path + "/" + f;
174 uint64_t file_size;
175 env->GetFileSize(file_path, &file_size);
176 dir_size += file_size;
177 }
178 }
179 return dir_size;
180 }
ListSpecificFiles(Env * env,const std::string & path,const FileType expected_file_type)181 std::vector<std::uint64_t> ListSpecificFiles(
182 Env* env, const std::string& path, const FileType expected_file_type) {
183 std::vector<std::string> files;
184 std::vector<uint64_t> file_numbers;
185 env->GetChildren(path, &files);
186 uint64_t number;
187 FileType type;
188 for (size_t i = 0; i < files.size(); ++i) {
189 if (ParseFileName(files[i], &number, &type)) {
190 if (type == expected_file_type) {
191 file_numbers.push_back(number);
192 }
193 }
194 }
195 return file_numbers;
196 }
197
CountRecords(TransactionLogIterator * iter)198 int CountRecords(TransactionLogIterator* iter) {
199 int count = 0;
200 SequenceNumber lastSequence = 0;
201 BatchResult res;
202 while (iter->Valid()) {
203 res = iter->GetBatch();
204 EXPECT_TRUE(res.sequence > lastSequence);
205 ++count;
206 lastSequence = res.sequence;
207 EXPECT_OK(iter->status());
208 iter->Next();
209 }
210 return count;
211 }
212 } // namespace
213
// Exercises size-based and then TTL-based purging of archived WAL files.
TEST_F(WalManagerTest, WALArchivalSizeLimit) {
  db_options_.wal_ttl_seconds = 0;
  db_options_.wal_size_limit_mb = 1000;
  Init();

  // Outline:
  //  1. With a huge size limit and no ttl, create 20 archived logs and check
  //     that all 20 are present.
  //  2. Lower the size limit to 8 MB, re-create the WalManager, purge, and
  //     check that the archive no longer exceeds the limit.
  //  3. Set a 1 second ttl, advance the mock clock past it, re-create the
  //     WalManager, purge, and check that no archived log is left.

  const std::string archive_path = ArchivalDirectory(dbname_);
  CreateArchiveLogs(20, 5000);

  std::vector<std::uint64_t> archived_logs =
      ListSpecificFiles(env_.get(), archive_path, kLogFile);
  ASSERT_EQ(archived_logs.size(), 20U);

  // Step 2: shrink the size limit and purge.
  db_options_.wal_size_limit_mb = 8;
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  const uint64_t archived_bytes = GetLogDirSize(archive_path, env_.get());
  ASSERT_TRUE(archived_bytes <= db_options_.wal_size_limit_mb * 1024 * 1024);

  // Step 3: enable a short ttl, let it elapse on the mock clock, and purge.
  db_options_.wal_ttl_seconds = 1;
  env_->FakeSleepForMicroseconds(2 * 1000 * 1000);
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  archived_logs = ListSpecificFiles(env_.get(), archive_path, kLogFile);
  ASSERT_TRUE(archived_logs.empty());
}
251
// Exercises TTL-based purging of archived WAL files with no size limit set.
TEST_F(WalManagerTest, WALArchivalTtl) {
  db_options_.wal_ttl_seconds = 1000;
  Init();

  // Outline:
  //  1. With a long ttl, create archived logs and check that some exist.
  //  2. Drop the ttl to 1 second, advance the mock clock past it, re-create
  //     the WalManager, purge, and check that all archived logs were removed.

  const std::string archive_path = ArchivalDirectory(dbname_);
  CreateArchiveLogs(20, 5000);

  std::vector<uint64_t> archived_logs =
      ListSpecificFiles(env_.get(), archive_path, kLogFile);
  ASSERT_GT(archived_logs.size(), 0U);

  // Step 2: shorten the ttl, let it elapse on the mock clock, and purge.
  db_options_.wal_ttl_seconds = 1;
  env_->FakeSleepForMicroseconds(3 * 1000 * 1000);
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  archived_logs = ListSpecificFiles(env_.get(), archive_path, kLogFile);
  ASSERT_TRUE(archived_logs.empty());
}
277
// Writes one record, rolls the log twice so that a log file with no records
// sits in between, writes a second record, and checks that an iterator opened
// at sequence 0 yields both records.
TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
  Init();
  RollTheLog(false);
  Put("key1", std::string(1024, 'a'));
  // Create a zero record WAL file.
  RollTheLog(false);
  RollTheLog(false);

  Put("key2", std::string(1024, 'a'));

  auto iter = OpenTransactionLogIter(0);
  // OpenTransactionLogIter only EXPECTs success, so stop here with a clean
  // failure instead of dereferencing a null iterator below.
  ASSERT_TRUE(iter != nullptr);
  ASSERT_EQ(2, CountRecords(iter.get()));
}
291
// With a single log file that contains no records, an iterator opened at
// sequence 0 must not be positioned on any batch.
TEST_F(WalManagerTest, TransactionLogIteratorJustEmptyFile) {
  Init();
  RollTheLog(false);
  auto iter = OpenTransactionLogIter(0);
  // OpenTransactionLogIter only EXPECTs success, so stop here with a clean
  // failure instead of dereferencing a null iterator below.
  ASSERT_TRUE(iter != nullptr);
  // Check that an empty iterator is returned
  ASSERT_TRUE(!iter->Valid());
}
299
// Opens an iterator over two logs of 100 records each, adds a third log while
// that iterator is still alive, and checks that the old iterator stops after
// 200 records with a TryAgain status while a fresh iterator sees all 300.
TEST_F(WalManagerTest, TransactionLogIteratorNewFileWhileScanning) {
  Init();
  CreateArchiveLogs(2, 100);
  auto iter = OpenTransactionLogIter(0);
  // OpenTransactionLogIter only EXPECTs success, so stop here with a clean
  // failure instead of dereferencing a null iterator below.
  ASSERT_TRUE(iter != nullptr);
  CreateArchiveLogs(1, 100);
  int i = 0;
  for (; iter->Valid(); iter->Next()) {
    i++;
  }
  ASSERT_EQ(i, 200);
  // A new log file was added after the iterator was created.
  // TryAgain indicates a new iterator is needed to fetch the new data
  ASSERT_TRUE(iter->status().IsTryAgain());

  iter = OpenTransactionLogIter(0);
  ASSERT_TRUE(iter != nullptr);
  i = 0;
  for (; iter->Valid(); iter->Next()) {
    i++;
  }
  ASSERT_EQ(i, 300);
  ASSERT_TRUE(iter->status().ok());
}
322
323 } // namespace ROCKSDB_NAMESPACE
324
main(int argc,char ** argv)325 int main(int argc, char** argv) {
326 ::testing::InitGoogleTest(&argc, argv);
327 return RUN_ALL_TESTS();
328 }
329
330 #else
331 #include <stdio.h>
332
// Entry point used when ROCKSDB_LITE is defined: prints a notice to stderr
// that the test is skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as WalManager is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
337
338 #endif // !ROCKSDB_LITE
339