1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_impl/db_impl_secondary.h"
11 #include "db/db_test_util.h"
12 #include "port/stack_trace.h"
13 #include "test_util/fault_injection_test_env.h"
14 #include "test_util/sync_point.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
18 #ifndef ROCKSDB_LITE
19 class DBSecondaryTest : public DBTestBase {
20 public:
DBSecondaryTest()21 DBSecondaryTest()
22 : DBTestBase("/db_secondary_test"),
23 secondary_path_(),
24 handles_secondary_(),
25 db_secondary_(nullptr) {
26 secondary_path_ =
27 test::PerThreadDBPath(env_, "/db_secondary_test_secondary");
28 }
29
~DBSecondaryTest()30 ~DBSecondaryTest() override {
31 CloseSecondary();
32 if (getenv("KEEP_DB") != nullptr) {
33 fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str());
34 } else {
35 Options options;
36 options.env = env_;
37 EXPECT_OK(DestroyDB(secondary_path_, options));
38 }
39 }
40
41 protected:
ReopenAsSecondary(const Options & options)42 Status ReopenAsSecondary(const Options& options) {
43 return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_);
44 }
45
46 void OpenSecondary(const Options& options);
47
48 void OpenSecondaryWithColumnFamilies(
49 const std::vector<std::string>& column_families, const Options& options);
50
CloseSecondary()51 void CloseSecondary() {
52 for (auto h : handles_secondary_) {
53 db_secondary_->DestroyColumnFamilyHandle(h);
54 }
55 handles_secondary_.clear();
56 delete db_secondary_;
57 db_secondary_ = nullptr;
58 }
59
db_secondary_full()60 DBImplSecondary* db_secondary_full() {
61 return static_cast<DBImplSecondary*>(db_secondary_);
62 }
63
64 void CheckFileTypeCounts(const std::string& dir, int expected_log,
65 int expected_sst, int expected_manifest) const;
66
67 std::string secondary_path_;
68 std::vector<ColumnFamilyHandle*> handles_secondary_;
69 DB* db_secondary_;
70 };
71
OpenSecondary(const Options & options)72 void DBSecondaryTest::OpenSecondary(const Options& options) {
73 Status s =
74 DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
75 ASSERT_OK(s);
76 }
77
OpenSecondaryWithColumnFamilies(const std::vector<std::string> & column_families,const Options & options)78 void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
79 const std::vector<std::string>& column_families, const Options& options) {
80 std::vector<ColumnFamilyDescriptor> cf_descs;
81 cf_descs.emplace_back(kDefaultColumnFamilyName, options);
82 for (const auto& cf_name : column_families) {
83 cf_descs.emplace_back(cf_name, options);
84 }
85 Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs,
86 &handles_secondary_, &db_secondary_);
87 ASSERT_OK(s);
88 }
89
CheckFileTypeCounts(const std::string & dir,int expected_log,int expected_sst,int expected_manifest) const90 void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir,
91 int expected_log, int expected_sst,
92 int expected_manifest) const {
93 std::vector<std::string> filenames;
94 env_->GetChildren(dir, &filenames);
95
96 int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0;
97 for (auto file : filenames) {
98 uint64_t number;
99 FileType type;
100 if (ParseFileName(file, &number, &type)) {
101 log_cnt += (type == kLogFile);
102 sst_cnt += (type == kTableFile);
103 manifest_cnt += (type == kDescriptorFile);
104 }
105 }
106 ASSERT_EQ(expected_log, log_cnt);
107 ASSERT_EQ(expected_sst, sst_cnt);
108 ASSERT_EQ(expected_manifest, manifest_cnt);
109 }
110
// Writes and flushes two keys through the primary, closes it, then reopens
// the same directory as a secondary and checks that both point lookups and a
// full scan return the flushed data.
TEST_F(DBSecondaryTest, ReopenAsSecondary) {
  Options options;
  options.env = env_;
  Reopen(options);
  ASSERT_OK(Put("foo", "foo_value"));
  ASSERT_OK(Put("bar", "bar_value"));
  ASSERT_OK(dbfull()->Flush(FlushOptions()));
  Close();

  ASSERT_OK(ReopenAsSecondary(options));
  ASSERT_EQ("foo_value", Get("foo"));
  ASSERT_EQ("bar_value", Get("bar"));
  ReadOptions ropts;
  ropts.verify_checksums = true;
  auto db1 = static_cast<DBImplSecondary*>(db_);
  ASSERT_NE(nullptr, db1);
  // Owned by a unique_ptr so the iterator is released even when one of the
  // assertions below returns from the test body early.
  std::unique_ptr<Iterator> iter(db1->NewIterator(ropts));
  ASSERT_NE(nullptr, iter.get());
  size_t count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    if (0 == count) {
      ASSERT_EQ("bar", iter->key().ToString());
      ASSERT_EQ("bar_value", iter->value().ToString());
    } else if (1 == count) {
      ASSERT_EQ("foo", iter->key().ToString());
      ASSERT_EQ("foo_value", iter->value().ToString());
    }
    ++count;
  }
  ASSERT_EQ(2U, count);
}
143
// Opens a secondary while the primary has three flushed files, compacts the
// primary, and verifies the secondary's view both before and after it
// catches up with a later flush.
TEST_F(DBSecondaryTest, OpenAsSecondary) {
  Options options;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 4;
  Reopen(options);
  for (int i = 0; i < 3; ++i) {
    ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
    ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
    ASSERT_OK(Flush());
  }
  Options options1;
  options1.env = env_;
  options1.max_open_files = -1;
  OpenSecondary(options1);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ReadOptions ropts;
  ropts.verify_checksums = true;
  // Checks that the secondary holds exactly the keys "foo" and "bar" with the
  // given values, via both Get() and an iterator.
  const auto verify_db_func = [&](const std::string& foo_val,
                                  const std::string& bar_val) {
    std::string value;
    ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
    ASSERT_EQ(foo_val, value);
    ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
    ASSERT_EQ(bar_val, value);
    // unique_ptr: a failing ASSERT_* returns from the lambda, which would
    // otherwise leak the iterator.
    std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(ropts));
    ASSERT_NE(nullptr, iter.get());
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("foo", iter->key().ToString());
    ASSERT_EQ(foo_val, iter->value().ToString());
    iter->Seek("bar");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("bar", iter->key().ToString());
    ASSERT_EQ(bar_val, iter->value().ToString());
    size_t count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ++count;
    }
    ASSERT_EQ(2U, count);
  };

  verify_db_func("foo_value2", "bar_value2");

  ASSERT_OK(Put("foo", "new_foo_value"));
  ASSERT_OK(Put("bar", "new_bar_value"));
  ASSERT_OK(Flush());

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  verify_db_func("new_foo_value", "new_bar_value");
}
197
198 namespace {
199 class TraceFileEnv : public EnvWrapper {
200 public:
TraceFileEnv(Env * _target)201 explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {}
NewRandomAccessFile(const std::string & f,std::unique_ptr<RandomAccessFile> * r,const EnvOptions & env_options)202 Status NewRandomAccessFile(const std::string& f,
203 std::unique_ptr<RandomAccessFile>* r,
204 const EnvOptions& env_options) override {
205 class TracedRandomAccessFile : public RandomAccessFile {
206 public:
207 TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
208 std::atomic<int>& counter)
209 : target_(std::move(target)), files_closed_(counter) {}
210 ~TracedRandomAccessFile() override {
211 files_closed_.fetch_add(1, std::memory_order_relaxed);
212 }
213 Status Read(uint64_t offset, size_t n, Slice* result,
214 char* scratch) const override {
215 return target_->Read(offset, n, result, scratch);
216 }
217
218 private:
219 std::unique_ptr<RandomAccessFile> target_;
220 std::atomic<int>& files_closed_;
221 };
222 Status s = target()->NewRandomAccessFile(f, r, env_options);
223 if (s.ok()) {
224 r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_));
225 }
226 return s;
227 }
228
files_closed() const229 int files_closed() const {
230 return files_closed_.load(std::memory_order_relaxed);
231 }
232
233 private:
234 std::atomic<int> files_closed_{0};
235 };
236 } // namespace
237
// Runs a secondary on a file-tracing Env and checks that, after the primary
// compacts two flushed files away and the secondary catches up, the secondary
// has closed two random-access files. Also checks that SetDBOptions() on the
// secondary reports NotSupported.
TEST_F(DBSecondaryTest, SecondaryCloseFiles) {
  Options options;
  options.env = env_;
  options.max_open_files = 1;
  options.disable_auto_compactions = true;
  Reopen(options);
  Options options1;
  // Typed as TraceFileEnv so files_closed() can be called without a cast.
  std::unique_ptr<TraceFileEnv> traced_env(new TraceFileEnv(env_));
  options1.env = traced_env.get();
  OpenSecondary(options1);

  // Must not be `static`: the closure captures `this` by reference, and a
  // static closure would keep the fixture of the first run alive in name only
  // when the test is repeated in the same process.
  const auto verify_db = [&]() {
    std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
    std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions()));
    for (iter1->SeekToFirst(), iter2->SeekToFirst();
         iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
      ASSERT_EQ(iter1->key(), iter2->key());
      ASSERT_EQ(iter1->value(), iter2->value());
    }
    ASSERT_FALSE(iter1->Valid());
    ASSERT_FALSE(iter2->Valid());
  };

  ASSERT_OK(Put("a", "value"));
  ASSERT_OK(Put("c", "value"));
  ASSERT_OK(Flush());
  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  verify_db();

  ASSERT_OK(Put("b", "value"));
  ASSERT_OK(Put("d", "value"));
  ASSERT_OK(Flush());
  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  verify_db();

  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  ASSERT_EQ(2, traced_env->files_closed());

  Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}});
  ASSERT_TRUE(s.IsNotSupported());
  // The secondary was opened with traced_env, so close it before traced_env
  // goes out of scope at the end of this test body.
  CloseSecondary();
}
281
// Verifies that a secondary sees writes the primary has not flushed: at open
// time, after catching up with further unflushed writes, and after a flush
// followed by another write.
TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
  Options options;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 4;
  Reopen(options);
  for (int i = 0; i < 3; ++i) {
    ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
    ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
  }
  Options options1;
  options1.env = env_;
  options1.max_open_files = -1;
  OpenSecondary(options1);

  ReadOptions ropts;
  ropts.verify_checksums = true;
  // Checks that the secondary holds exactly the keys "foo" and "bar" with the
  // given values, via both Get() and an iterator.
  const auto verify_db_func = [&](const std::string& foo_val,
                                  const std::string& bar_val) {
    std::string value;
    ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
    ASSERT_EQ(foo_val, value);
    ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
    ASSERT_EQ(bar_val, value);
    // unique_ptr: a failing ASSERT_* returns from the lambda, which would
    // otherwise leak the iterator.
    std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(ropts));
    ASSERT_NE(nullptr, iter.get());
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("foo", iter->key().ToString());
    ASSERT_EQ(foo_val, iter->value().ToString());
    iter->Seek("bar");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("bar", iter->key().ToString());
    ASSERT_EQ(bar_val, iter->value().ToString());
    size_t count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ++count;
    }
    ASSERT_EQ(2U, count);
  };

  verify_db_func("foo_value2", "bar_value2");

  ASSERT_OK(Put("foo", "new_foo_value"));
  ASSERT_OK(Put("bar", "new_bar_value"));

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  verify_db_func("new_foo_value", "new_bar_value");

  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo", "new_foo_value_1"));
  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  verify_db_func("new_foo_value_1", "new_bar_value");
}
336
// The primary is created with column families default + "pikachu". Asking the
// secondary to additionally open "eevee" is expected to fail.
TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
  Options primary_opts;
  primary_opts.env = env_;
  CreateAndReopenWithCF({"pikachu"}, primary_opts);

  Options secondary_opts;
  secondary_opts.env = env_;
  secondary_opts.max_open_files = -1;

  std::vector<ColumnFamilyDescriptor> descriptors;
  descriptors.emplace_back(kDefaultColumnFamilyName, secondary_opts);
  descriptors.emplace_back("pikachu", secondary_opts);
  descriptors.emplace_back("eevee", secondary_opts);

  const Status open_status =
      DB::OpenAsSecondary(secondary_opts, dbname_, secondary_path_, descriptors,
                          &handles_secondary_, &db_secondary_);
  ASSERT_NOK(open_status);
}
353
// The primary has column families default + "pikachu"; the secondary opens
// only the default one and must still be able to catch up and read from it.
TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) {
  Options primary_opts;
  primary_opts.env = env_;
  CreateAndReopenWithCF({"pikachu"}, primary_opts);

  Options secondary_opts;
  secondary_opts.env = env_;
  secondary_opts.max_open_files = -1;
  OpenSecondary(secondary_opts);
  // OpenSecondary() uses the overload without explicit handles.
  ASSERT_TRUE(handles_secondary_.empty());
  ASSERT_NE(nullptr, db_secondary_);

  // Write the same key to both column families of the primary and flush each.
  ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value"));
  ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value"));
  ASSERT_OK(Flush(0 /*cf*/));
  ASSERT_OK(Flush(1 /*cf*/));
  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());

  ReadOptions read_opts;
  read_opts.verify_checksums = true;
  std::string fetched;
  ASSERT_OK(db_secondary_->Get(read_opts, "foo", &fetched));
  ASSERT_EQ("foo_value", fetched);
}
376
// Interleaves a secondary open with a primary reopen, using sync point
// dependencies so that the primary writes a new MANIFEST between the
// secondary's two reads of the current MANIFEST path.
TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) {
  Options primary_opts;
  primary_opts.env = env_;
  Reopen(primary_opts);
  Close();

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  sync_point->LoadDependency(
      {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
        "VersionSet::ProcessManifestWrites:BeforeNewManifest"},
       {"VersionSet::ProcessManifestWrites:AfterNewManifest",
        "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
        "1"}});
  sync_point->EnableProcessing();

  // Make sure db calls RecoverLogFiles so as to trigger a manifest write,
  // which causes the db to switch to a new MANIFEST upon start.
  port::Thread secondary_thread([&]() {
    Options secondary_opts;
    secondary_opts.env = env_;
    secondary_opts.max_open_files = -1;
    OpenSecondary(secondary_opts);
    CloseSecondary();
  });
  Reopen(primary_opts);
  secondary_thread.join();
}
405
// The primary flushes level0_file_num_compaction_trigger files and waits for
// background compaction before the secondary is opened. The secondary must
// still open and return the most recently written values.
TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) {
  Options options;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 4;
  Reopen(options);
  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
    ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
    ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
    ASSERT_OK(dbfull()->Flush(FlushOptions()));
  }
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  Options options1;
  options1.env = env_;
  options1.max_open_files = -1;
  OpenSecondary(options1);

  // Values written by the last iteration of the loop above.
  const std::string last_index =
      std::to_string(options.level0_file_num_compaction_trigger - 1);
  const std::string expected_foo = "foo_value" + last_index;
  const std::string expected_bar = "bar_value" + last_index;

  ReadOptions ropts;
  ropts.verify_checksums = true;
  std::string value;
  ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
  ASSERT_EQ(expected_foo, value);
  ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
  ASSERT_EQ(expected_bar, value);
  // Owned by a unique_ptr so the iterator is released even when one of the
  // assertions below returns from the test body early.
  std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(ropts));
  ASSERT_NE(nullptr, iter.get());
  iter->Seek("bar");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar", iter->key().ToString());
  ASSERT_EQ(expected_bar, iter->value().ToString());
  iter->Seek("foo");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("foo", iter->key().ToString());
  ASSERT_EQ(expected_foo, iter->value().ToString());
  size_t count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ++count;
  }
  ASSERT_EQ(2U, count);
}
454
// The secondary is opened on an empty primary; the primary then flushes
// level0_file_num_compaction_trigger files and waits for compaction. When the
// secondary catches up, the test expects one PathNotFound per flushed file
// from the table-handler load sync point, and correct data afterwards.
TEST_F(DBSecondaryTest, MissingTableFile) {
  int table_files_not_exist = 0;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers",
      [&](void* arg) {
        // `arg` arrives as void*, so static_cast is the matching conversion.
        Status s = *static_cast<Status*>(arg);
        if (s.IsPathNotFound()) {
          ++table_files_not_exist;
        } else if (!s.ok()) {
          assert(false);  // Should not reach here
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  Options options;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 4;
  Reopen(options);

  Options options1;
  options1.env = env_;
  options1.max_open_files = -1;
  OpenSecondary(options1);

  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
    ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
    ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
    ASSERT_OK(dbfull()->Flush(FlushOptions()));
  }
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Values written by the last iteration of the loop above.
  const std::string last_index =
      std::to_string(options.level0_file_num_compaction_trigger - 1);
  const std::string expected_foo = "foo_value" + last_index;
  const std::string expected_bar = "bar_value" + last_index;

  ASSERT_NE(nullptr, db_secondary_full());
  ReadOptions ropts;
  ropts.verify_checksums = true;
  std::string value;
  // Before catching up, the secondary has not seen any of the writes.
  ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value));
  ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value));

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist);
  ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
  ASSERT_EQ(expected_foo, value);
  ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
  ASSERT_EQ(expected_bar, value);
  // Owned by a unique_ptr so the iterator is released even when one of the
  // assertions below returns from the test body early.
  std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(ropts));
  ASSERT_NE(nullptr, iter.get());
  iter->Seek("bar");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar", iter->key().ToString());
  ASSERT_EQ(expected_bar, iter->value().ToString());
  iter->Seek("foo");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("foo", iter->key().ToString());
  ASSERT_EQ(expected_foo, iter->value().ToString());
  size_t count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ++count;
  }
  ASSERT_EQ(2U, count);
}
526
// After the primary drops a column family and closes, the test expects the
// secondary to keep serving the value it already read through its own handle
// for that column family.
TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) {
  const std::string kCfName1 = "pikachu";

  Options primary_opts;
  primary_opts.env = env_;
  CreateAndReopenWithCF({kCfName1}, primary_opts);

  Options secondary_opts;
  secondary_opts.env = env_;
  secondary_opts.max_open_files = -1;
  OpenSecondaryWithColumnFamilies({kCfName1}, secondary_opts);
  ASSERT_EQ(2, handles_secondary_.size());
  ColumnFamilyHandle* const secondary_cf = handles_secondary_[1];

  ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1"));
  ASSERT_OK(Flush(1 /*cf*/));

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  ReadOptions read_opts;
  read_opts.verify_checksums = true;
  std::string fetched;
  ASSERT_OK(db_secondary_->Get(read_opts, secondary_cf, "foo", &fetched));
  ASSERT_EQ("foo_val_1", fetched);

  // Drop the column family on the primary, close it, and check what is left
  // in the primary's directory: one WAL, no table files, one MANIFEST.
  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  Close();
  CheckFileTypeCounts(dbname_, 1, 0, 1);

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  fetched.clear();
  ASSERT_OK(db_secondary_->Get(read_opts, secondary_cf, "foo", &fetched));
  ASSERT_EQ("foo_val_1", fetched);
}
557
// Verifies that the secondary can catch up after the primary has been
// reopened twice (with a full compaction in between) while the secondary was
// still holding on to an older MANIFEST.
TEST_F(DBSecondaryTest, SwitchManifest) {
  Options options;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 4;
  Reopen(options);

  Options options1;
  options1.env = env_;
  options1.max_open_files = -1;
  OpenSecondary(options1);

  const int kNumFiles = options.level0_file_num_compaction_trigger - 1;
  // Keep it no larger than 10 so that key0, key1, ..., key9 are sorted as
  // 0, 1, ..., 9.
  const int kNumKeys = 10;
  // Create kNumFiles SST files, each overwriting the same kNumKeys keys.
  for (int i = 0; i != kNumFiles; ++i) {
    for (int j = 0; j != kNumKeys; ++j) {
      ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i)));
    }
    ASSERT_OK(Flush());
  }

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  // Scans the secondary and checks that it holds exactly key0..key{N-1}, each
  // with the value written by the last flush.
  const auto range_scan_db = [&]() {
    ReadOptions tmp_ropts;
    tmp_ropts.total_order_seek = true;
    tmp_ropts.verify_checksums = true;
    std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts));
    int cnt = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) {
      ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString());
      ASSERT_EQ("value_" + std::to_string(kNumFiles - 1),
                iter->value().ToString());
    }
    // Without this, a scan that yields nothing would pass vacuously.
    ASSERT_EQ(kNumKeys, cnt);
  };

  range_scan_db();

  // While secondary instance still keeps old MANIFEST open, we close primary,
  // restart primary, perform full compaction, close again, restart again so
  // that next time secondary tries to catch up with primary, the secondary
  // will skip the MANIFEST in middle.
  Reopen(options);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  Reopen(options);
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  range_scan_db();
}
611
612 // Here, "Snapshot" refers to the version edits written by
613 // VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after
614 // switching from the old one.
TEST_F(DBSecondaryTest,SkipSnapshotAfterManifestSwitch)615 TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) {
616 Options options;
617 options.env = env_;
618 options.disable_auto_compactions = true;
619 Reopen(options);
620
621 Options options1;
622 options1.env = env_;
623 options1.max_open_files = -1;
624 OpenSecondary(options1);
625
626 ASSERT_OK(Put("0", "value0"));
627 ASSERT_OK(Flush());
628 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
629 std::string value;
630 ReadOptions ropts;
631 ropts.verify_checksums = true;
632 ASSERT_OK(db_secondary_->Get(ropts, "0", &value));
633 ASSERT_EQ("value0", value);
634
635 Reopen(options);
636 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
637 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
638 }
639
// Uses a memtable factory configured with one key per memtable and, after
// every single Put on the primary, requires the secondary to catch up and
// match the primary exactly.
TEST_F(DBSecondaryTest, SwitchWAL) {
  const int kNumKeysPerMemtable = 1;
  Options primary_opts;
  primary_opts.env = env_;
  primary_opts.max_write_buffer_number = 4;
  primary_opts.min_write_buffer_number_to_merge = 2;
  primary_opts.memtable_factory.reset(
      new SpecialSkipListFactory(kNumKeysPerMemtable));
  Reopen(primary_opts);

  Options secondary_opts;
  secondary_opts.env = env_;
  secondary_opts.max_open_files = -1;
  OpenSecondary(secondary_opts);

  // Asserts that both DBs iterate over identical key/value sequences and that
  // every entry found in one can be fetched with Get() from the other.
  const auto verify_db = [](DB* db1, DB* db2) {
    ASSERT_NE(nullptr, db1);
    ASSERT_NE(nullptr, db2);
    ReadOptions scan_opts;
    scan_opts.verify_checksums = true;
    std::unique_ptr<Iterator> lhs(db1->NewIterator(scan_opts));
    std::unique_ptr<Iterator> rhs(db2->NewIterator(scan_opts));

    // Walk both iterators in lock step.
    lhs->SeekToFirst();
    rhs->SeekToFirst();
    while (lhs->Valid() && rhs->Valid()) {
      ASSERT_EQ(lhs->key(), rhs->key());
      ASSERT_EQ(lhs->value(), rhs->value());
      lhs->Next();
      rhs->Next();
    }
    ASSERT_FALSE(lhs->Valid());
    ASSERT_FALSE(rhs->Valid());

    // Cross-check with point lookups in both directions.
    for (lhs->SeekToFirst(); lhs->Valid(); lhs->Next()) {
      std::string fetched;
      ASSERT_OK(db2->Get(scan_opts, lhs->key(), &fetched));
      ASSERT_EQ(lhs->value(), fetched);
    }
    for (rhs->SeekToFirst(); rhs->Valid(); rhs->Next()) {
      std::string fetched;
      ASSERT_OK(db1->Get(scan_opts, rhs->key(), &fetched));
      ASSERT_EQ(rhs->value(), fetched);
    }
  };

  for (int k = 0; k != 16; ++k) {
    const std::string suffix = std::to_string(k);
    ASSERT_OK(Put("key" + suffix, "value" + suffix));
    ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
    verify_db(dbfull(), db_secondary_);
  }
}
688
// Two-column-family variant of SwitchWAL. A sync point dependency orders each
// catch-up attempt after "DBImpl::BackgroundCallFlush:ContextCleanedUp".
TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
  const int kNumKeysPerMemtable = 1;
  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->LoadDependency(
      {{"DBImpl::BackgroundCallFlush:ContextCleanedUp",
        "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}});
  sync_point->EnableProcessing();

  const std::string kCFName1 = "pikachu";
  Options primary_opts;
  primary_opts.env = env_;
  primary_opts.max_write_buffer_number = 4;
  primary_opts.min_write_buffer_number_to_merge = 2;
  primary_opts.memtable_factory.reset(
      new SpecialSkipListFactory(kNumKeysPerMemtable));
  CreateAndReopenWithCF({kCFName1}, primary_opts);

  Options secondary_opts;
  secondary_opts.env = env_;
  secondary_opts.max_open_files = -1;
  OpenSecondaryWithColumnFamilies({kCFName1}, secondary_opts);
  ASSERT_EQ(2, handles_secondary_.size());

  // For every pair of corresponding column family handles, asserts that both
  // DBs iterate over identical key/value sequences and that every entry found
  // in one can be fetched with Get() from the other.
  const auto verify_db = [](DB* db1,
                            const std::vector<ColumnFamilyHandle*>& handles1,
                            DB* db2,
                            const std::vector<ColumnFamilyHandle*>& handles2) {
    ASSERT_NE(nullptr, db1);
    ASSERT_NE(nullptr, db2);
    ReadOptions scan_opts;
    scan_opts.verify_checksums = true;
    ASSERT_EQ(handles1.size(), handles2.size());
    for (size_t cf = 0; cf != handles1.size(); ++cf) {
      std::unique_ptr<Iterator> lhs(db1->NewIterator(scan_opts, handles1[cf]));
      std::unique_ptr<Iterator> rhs(db2->NewIterator(scan_opts, handles2[cf]));

      // Walk both iterators in lock step.
      lhs->SeekToFirst();
      rhs->SeekToFirst();
      while (lhs->Valid() && rhs->Valid()) {
        ASSERT_EQ(lhs->key(), rhs->key());
        ASSERT_EQ(lhs->value(), rhs->value());
        lhs->Next();
        rhs->Next();
      }
      ASSERT_FALSE(lhs->Valid());
      ASSERT_FALSE(rhs->Valid());

      // Cross-check with point lookups in both directions.
      for (lhs->SeekToFirst(); lhs->Valid(); lhs->Next()) {
        std::string fetched;
        ASSERT_OK(db2->Get(scan_opts, handles2[cf], lhs->key(), &fetched));
        ASSERT_EQ(lhs->value(), fetched);
      }
      for (rhs->SeekToFirst(); rhs->Valid(); rhs->Next()) {
        std::string fetched;
        ASSERT_OK(db1->Get(scan_opts, handles1[cf], rhs->key(), &fetched));
        ASSERT_EQ(rhs->value(), fetched);
      }
    }
  };

  for (int k = 0; k != 8; ++k) {
    const std::string suffix = std::to_string(k);
    ASSERT_OK(Put(0 /*cf*/, "key" + suffix, "value" + suffix));
    ASSERT_OK(Put(1 /*cf*/, "key" + suffix, "value" + suffix));
    TEST_SYNC_POINT(
        "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp");
    ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
    verify_db(dbfull(), handles_, db_secondary_, handles_secondary_);
    sync_point->ClearTrace();
  }
}
756
// Checks iterator visibility on the secondary around catch-up: an iterator
// created before TryCatchUpWithPrimary() is expected to stay empty, while
// iterators created afterwards see the caught-up data, including the state
// after overwrites, a delete and a flush on the primary.
TEST_F(DBSecondaryTest, CatchUpAfterFlush) {
  const int kNumKeysPerMemtable = 16;
  Options primary_opts;
  primary_opts.env = env_;
  primary_opts.max_write_buffer_number = 4;
  primary_opts.min_write_buffer_number_to_merge = 2;
  primary_opts.memtable_factory.reset(
      new SpecialSkipListFactory(kNumKeysPerMemtable));
  Reopen(primary_opts);

  Options secondary_opts;
  secondary_opts.env = env_;
  secondary_opts.max_open_files = -1;
  OpenSecondary(secondary_opts);

  WriteOptions write_opts;
  WriteBatch initial_batch;
  initial_batch.Put("key0", "value0");
  initial_batch.Put("key1", "value1");
  ASSERT_OK(dbfull()->Write(write_opts, &initial_batch));

  // Iterator taken before catching up: sees neither key, now or later.
  ReadOptions read_opts;
  std::unique_ptr<Iterator> stale_iter(db_secondary_->NewIterator(read_opts));
  stale_iter->Seek("key0");
  ASSERT_FALSE(stale_iter->Valid());
  stale_iter->Seek("key1");
  ASSERT_FALSE(stale_iter->Valid());
  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  stale_iter->Seek("key0");
  ASSERT_FALSE(stale_iter->Valid());
  stale_iter->Seek("key1");
  ASSERT_FALSE(stale_iter->Valid());

  // Iterator taken after catching up: sees both keys.
  std::unique_ptr<Iterator> fresh_iter(db_secondary_->NewIterator(read_opts));
  fresh_iter->Seek("key0");
  ASSERT_TRUE(fresh_iter->Valid());
  ASSERT_EQ("value0", fresh_iter->value());
  fresh_iter->Seek("key1");
  ASSERT_TRUE(fresh_iter->Valid());
  ASSERT_EQ("value1", fresh_iter->value());

  {
    WriteBatch overwrite_batch;
    overwrite_batch.Put("key0", "value01");
    overwrite_batch.Put("key1", "value11");
    ASSERT_OK(dbfull()->Write(write_opts, &overwrite_batch));
  }

  {
    WriteBatch final_batch;
    final_batch.Put("key0", "new_value0");
    final_batch.Delete("key1");
    ASSERT_OK(dbfull()->Write(write_opts, &final_batch));
  }

  ASSERT_OK(Flush());

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  std::unique_ptr<Iterator> final_iter(db_secondary_->NewIterator(read_opts));
  // final_iter should not see value01 and value11 at all.
  final_iter->Seek("key0");
  ASSERT_TRUE(final_iter->Valid());
  ASSERT_EQ("new_value0", final_iter->value());
  final_iter->Seek("key1");
  ASSERT_FALSE(final_iter->Valid());
}
821
// Opens the secondary on a separate thread while the primary runs a manual
// compaction, with sync point dependencies ordering the two. The test
// requires that the "AfterFirstAttempt" callback fires with a non-OK status
// and that OpenSecondary() nevertheless succeeds.
TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
  bool first_attempt_seen = false;
  Options primary_opts;
  primary_opts.env = env_;
  primary_opts.disable_auto_compactions = true;
  Reopen(primary_opts);

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  sync_point->SetCallBack(
      "DBImplSecondary::CheckConsistency:AfterFirstAttempt", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        first_attempt_seen = true;
        auto* attempt_status = reinterpret_cast<Status*>(arg);
        ASSERT_NOK(*attempt_status);
      });
  sync_point->LoadDependency(
      {{"DBImpl::CheckConsistency:AfterGetLiveFilesMetaData",
        "BackgroundCallCompaction:0"},
       {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
        "DBImpl::CheckConsistency:BeforeGetFileSize"}});
  sync_point->EnableProcessing();

  // Two flushed files for the manual compaction below to work on.
  ASSERT_OK(Put("a", "value0"));
  ASSERT_OK(Put("c", "value0"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("b", "value1"));
  ASSERT_OK(Put("d", "value1"));
  ASSERT_OK(Flush());

  port::Thread opener([this]() {
    Options secondary_opts;
    secondary_opts.env = env_;
    secondary_opts.max_open_files = -1;
    OpenSecondary(secondary_opts);
  });
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  opener.join();
  ASSERT_TRUE(first_attempt_seen);
}
861 #endif //! ROCKSDB_LITE
862
863 } // namespace ROCKSDB_NAMESPACE
864
main(int argc,char ** argv)865 int main(int argc, char** argv) {
866 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
867 ::testing::InitGoogleTest(&argc, argv);
868 return RUN_ALL_TESTS();
869 }
870