1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/db_impl/db_impl_secondary.h"
11 #include "db/db_test_util.h"
12 #include "port/stack_trace.h"
13 #include "test_util/fault_injection_test_env.h"
14 #include "test_util/sync_point.h"
15 
16 namespace ROCKSDB_NAMESPACE {
17 
18 #ifndef ROCKSDB_LITE
19 class DBSecondaryTest : public DBTestBase {
20  public:
DBSecondaryTest()21   DBSecondaryTest()
22       : DBTestBase("/db_secondary_test"),
23         secondary_path_(),
24         handles_secondary_(),
25         db_secondary_(nullptr) {
26     secondary_path_ =
27         test::PerThreadDBPath(env_, "/db_secondary_test_secondary");
28   }
29 
~DBSecondaryTest()30   ~DBSecondaryTest() override {
31     CloseSecondary();
32     if (getenv("KEEP_DB") != nullptr) {
33       fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str());
34     } else {
35       Options options;
36       options.env = env_;
37       EXPECT_OK(DestroyDB(secondary_path_, options));
38     }
39   }
40 
41  protected:
ReopenAsSecondary(const Options & options)42   Status ReopenAsSecondary(const Options& options) {
43     return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_);
44   }
45 
46   void OpenSecondary(const Options& options);
47 
48   void OpenSecondaryWithColumnFamilies(
49       const std::vector<std::string>& column_families, const Options& options);
50 
CloseSecondary()51   void CloseSecondary() {
52     for (auto h : handles_secondary_) {
53       db_secondary_->DestroyColumnFamilyHandle(h);
54     }
55     handles_secondary_.clear();
56     delete db_secondary_;
57     db_secondary_ = nullptr;
58   }
59 
db_secondary_full()60   DBImplSecondary* db_secondary_full() {
61     return static_cast<DBImplSecondary*>(db_secondary_);
62   }
63 
64   void CheckFileTypeCounts(const std::string& dir, int expected_log,
65                            int expected_sst, int expected_manifest) const;
66 
67   std::string secondary_path_;
68   std::vector<ColumnFamilyHandle*> handles_secondary_;
69   DB* db_secondary_;
70 };
71 
OpenSecondary(const Options & options)72 void DBSecondaryTest::OpenSecondary(const Options& options) {
73   Status s =
74       DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
75   ASSERT_OK(s);
76 }
77 
OpenSecondaryWithColumnFamilies(const std::vector<std::string> & column_families,const Options & options)78 void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
79     const std::vector<std::string>& column_families, const Options& options) {
80   std::vector<ColumnFamilyDescriptor> cf_descs;
81   cf_descs.emplace_back(kDefaultColumnFamilyName, options);
82   for (const auto& cf_name : column_families) {
83     cf_descs.emplace_back(cf_name, options);
84   }
85   Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs,
86                                  &handles_secondary_, &db_secondary_);
87   ASSERT_OK(s);
88 }
89 
CheckFileTypeCounts(const std::string & dir,int expected_log,int expected_sst,int expected_manifest) const90 void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir,
91                                           int expected_log, int expected_sst,
92                                           int expected_manifest) const {
93   std::vector<std::string> filenames;
94   env_->GetChildren(dir, &filenames);
95 
96   int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0;
97   for (auto file : filenames) {
98     uint64_t number;
99     FileType type;
100     if (ParseFileName(file, &number, &type)) {
101       log_cnt += (type == kLogFile);
102       sst_cnt += (type == kTableFile);
103       manifest_cnt += (type == kDescriptorFile);
104     }
105   }
106   ASSERT_EQ(expected_log, log_cnt);
107   ASSERT_EQ(expected_sst, sst_cnt);
108   ASSERT_EQ(expected_manifest, manifest_cnt);
109 }
110 
TEST_F(DBSecondaryTest,ReopenAsSecondary)111 TEST_F(DBSecondaryTest, ReopenAsSecondary) {
112   Options options;
113   options.env = env_;
114   Reopen(options);
115   ASSERT_OK(Put("foo", "foo_value"));
116   ASSERT_OK(Put("bar", "bar_value"));
117   ASSERT_OK(dbfull()->Flush(FlushOptions()));
118   Close();
119 
120   ASSERT_OK(ReopenAsSecondary(options));
121   ASSERT_EQ("foo_value", Get("foo"));
122   ASSERT_EQ("bar_value", Get("bar"));
123   ReadOptions ropts;
124   ropts.verify_checksums = true;
125   auto db1 = static_cast<DBImplSecondary*>(db_);
126   ASSERT_NE(nullptr, db1);
127   Iterator* iter = db1->NewIterator(ropts);
128   ASSERT_NE(nullptr, iter);
129   size_t count = 0;
130   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
131     if (0 == count) {
132       ASSERT_EQ("bar", iter->key().ToString());
133       ASSERT_EQ("bar_value", iter->value().ToString());
134     } else if (1 == count) {
135       ASSERT_EQ("foo", iter->key().ToString());
136       ASSERT_EQ("foo_value", iter->value().ToString());
137     }
138     ++count;
139   }
140   delete iter;
141   ASSERT_EQ(2, count);
142 }
143 
TEST_F(DBSecondaryTest,OpenAsSecondary)144 TEST_F(DBSecondaryTest, OpenAsSecondary) {
145   Options options;
146   options.env = env_;
147   options.level0_file_num_compaction_trigger = 4;
148   Reopen(options);
149   for (int i = 0; i < 3; ++i) {
150     ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
151     ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
152     ASSERT_OK(Flush());
153   }
154   Options options1;
155   options1.env = env_;
156   options1.max_open_files = -1;
157   OpenSecondary(options1);
158   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
159   ASSERT_OK(dbfull()->TEST_WaitForCompact());
160 
161   ReadOptions ropts;
162   ropts.verify_checksums = true;
163   const auto verify_db_func = [&](const std::string& foo_val,
164                                   const std::string& bar_val) {
165     std::string value;
166     ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
167     ASSERT_EQ(foo_val, value);
168     ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
169     ASSERT_EQ(bar_val, value);
170     Iterator* iter = db_secondary_->NewIterator(ropts);
171     ASSERT_NE(nullptr, iter);
172     iter->Seek("foo");
173     ASSERT_TRUE(iter->Valid());
174     ASSERT_EQ("foo", iter->key().ToString());
175     ASSERT_EQ(foo_val, iter->value().ToString());
176     iter->Seek("bar");
177     ASSERT_TRUE(iter->Valid());
178     ASSERT_EQ("bar", iter->key().ToString());
179     ASSERT_EQ(bar_val, iter->value().ToString());
180     size_t count = 0;
181     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
182       ++count;
183     }
184     ASSERT_EQ(2, count);
185     delete iter;
186   };
187 
188   verify_db_func("foo_value2", "bar_value2");
189 
190   ASSERT_OK(Put("foo", "new_foo_value"));
191   ASSERT_OK(Put("bar", "new_bar_value"));
192   ASSERT_OK(Flush());
193 
194   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
195   verify_db_func("new_foo_value", "new_bar_value");
196 }
197 
198 namespace {
199 class TraceFileEnv : public EnvWrapper {
200  public:
TraceFileEnv(Env * _target)201   explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {}
NewRandomAccessFile(const std::string & f,std::unique_ptr<RandomAccessFile> * r,const EnvOptions & env_options)202   Status NewRandomAccessFile(const std::string& f,
203                              std::unique_ptr<RandomAccessFile>* r,
204                              const EnvOptions& env_options) override {
205     class TracedRandomAccessFile : public RandomAccessFile {
206      public:
207       TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
208                              std::atomic<int>& counter)
209           : target_(std::move(target)), files_closed_(counter) {}
210       ~TracedRandomAccessFile() override {
211         files_closed_.fetch_add(1, std::memory_order_relaxed);
212       }
213       Status Read(uint64_t offset, size_t n, Slice* result,
214                   char* scratch) const override {
215         return target_->Read(offset, n, result, scratch);
216       }
217 
218      private:
219       std::unique_ptr<RandomAccessFile> target_;
220       std::atomic<int>& files_closed_;
221     };
222     Status s = target()->NewRandomAccessFile(f, r, env_options);
223     if (s.ok()) {
224       r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_));
225     }
226     return s;
227   }
228 
files_closed() const229   int files_closed() const {
230     return files_closed_.load(std::memory_order_relaxed);
231   }
232 
233  private:
234   std::atomic<int> files_closed_{0};
235 };
236 }  // namespace
237 
TEST_F(DBSecondaryTest,SecondaryCloseFiles)238 TEST_F(DBSecondaryTest, SecondaryCloseFiles) {
239   Options options;
240   options.env = env_;
241   options.max_open_files = 1;
242   options.disable_auto_compactions = true;
243   Reopen(options);
244   Options options1;
245   std::unique_ptr<Env> traced_env(new TraceFileEnv(env_));
246   options1.env = traced_env.get();
247   OpenSecondary(options1);
248 
249   static const auto verify_db = [&]() {
250     std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
251     std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions()));
252     for (iter1->SeekToFirst(), iter2->SeekToFirst();
253          iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
254       ASSERT_EQ(iter1->key(), iter2->key());
255       ASSERT_EQ(iter1->value(), iter2->value());
256     }
257     ASSERT_FALSE(iter1->Valid());
258     ASSERT_FALSE(iter2->Valid());
259   };
260 
261   ASSERT_OK(Put("a", "value"));
262   ASSERT_OK(Put("c", "value"));
263   ASSERT_OK(Flush());
264   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
265   verify_db();
266 
267   ASSERT_OK(Put("b", "value"));
268   ASSERT_OK(Put("d", "value"));
269   ASSERT_OK(Flush());
270   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
271   verify_db();
272 
273   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
274   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
275   ASSERT_EQ(2, static_cast<TraceFileEnv*>(traced_env.get())->files_closed());
276 
277   Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}});
278   ASSERT_TRUE(s.IsNotSupported());
279   CloseSecondary();
280 }
281 
TEST_F(DBSecondaryTest,OpenAsSecondaryWALTailing)282 TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
283   Options options;
284   options.env = env_;
285   options.level0_file_num_compaction_trigger = 4;
286   Reopen(options);
287   for (int i = 0; i < 3; ++i) {
288     ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
289     ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
290   }
291   Options options1;
292   options1.env = env_;
293   options1.max_open_files = -1;
294   OpenSecondary(options1);
295 
296   ReadOptions ropts;
297   ropts.verify_checksums = true;
298   const auto verify_db_func = [&](const std::string& foo_val,
299                                   const std::string& bar_val) {
300     std::string value;
301     ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
302     ASSERT_EQ(foo_val, value);
303     ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
304     ASSERT_EQ(bar_val, value);
305     Iterator* iter = db_secondary_->NewIterator(ropts);
306     ASSERT_NE(nullptr, iter);
307     iter->Seek("foo");
308     ASSERT_TRUE(iter->Valid());
309     ASSERT_EQ("foo", iter->key().ToString());
310     ASSERT_EQ(foo_val, iter->value().ToString());
311     iter->Seek("bar");
312     ASSERT_TRUE(iter->Valid());
313     ASSERT_EQ("bar", iter->key().ToString());
314     ASSERT_EQ(bar_val, iter->value().ToString());
315     size_t count = 0;
316     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
317       ++count;
318     }
319     ASSERT_EQ(2, count);
320     delete iter;
321   };
322 
323   verify_db_func("foo_value2", "bar_value2");
324 
325   ASSERT_OK(Put("foo", "new_foo_value"));
326   ASSERT_OK(Put("bar", "new_bar_value"));
327 
328   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
329   verify_db_func("new_foo_value", "new_bar_value");
330 
331   ASSERT_OK(Flush());
332   ASSERT_OK(Put("foo", "new_foo_value_1"));
333   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
334   verify_db_func("new_foo_value_1", "new_bar_value");
335 }
336 
TEST_F(DBSecondaryTest,OpenWithNonExistColumnFamily)337 TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
338   Options options;
339   options.env = env_;
340   CreateAndReopenWithCF({"pikachu"}, options);
341 
342   Options options1;
343   options1.env = env_;
344   options1.max_open_files = -1;
345   std::vector<ColumnFamilyDescriptor> cf_descs;
346   cf_descs.emplace_back(kDefaultColumnFamilyName, options1);
347   cf_descs.emplace_back("pikachu", options1);
348   cf_descs.emplace_back("eevee", options1);
349   Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs,
350                                  &handles_secondary_, &db_secondary_);
351   ASSERT_NOK(s);
352 }
353 
TEST_F(DBSecondaryTest,OpenWithSubsetOfColumnFamilies)354 TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) {
355   Options options;
356   options.env = env_;
357   CreateAndReopenWithCF({"pikachu"}, options);
358   Options options1;
359   options1.env = env_;
360   options1.max_open_files = -1;
361   OpenSecondary(options1);
362   ASSERT_EQ(0, handles_secondary_.size());
363   ASSERT_NE(nullptr, db_secondary_);
364 
365   ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value"));
366   ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value"));
367   ASSERT_OK(Flush(0 /*cf*/));
368   ASSERT_OK(Flush(1 /*cf*/));
369   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
370   ReadOptions ropts;
371   ropts.verify_checksums = true;
372   std::string value;
373   ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
374   ASSERT_EQ("foo_value", value);
375 }
376 
TEST_F(DBSecondaryTest,SwitchToNewManifestDuringOpen)377 TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) {
378   Options options;
379   options.env = env_;
380   Reopen(options);
381   Close();
382 
383   SyncPoint::GetInstance()->DisableProcessing();
384   SyncPoint::GetInstance()->ClearAllCallBacks();
385   SyncPoint::GetInstance()->LoadDependency(
386       {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
387         "VersionSet::ProcessManifestWrites:BeforeNewManifest"},
388        {"VersionSet::ProcessManifestWrites:AfterNewManifest",
389         "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
390         "1"}});
391   SyncPoint::GetInstance()->EnableProcessing();
392 
393   // Make sure db calls RecoverLogFiles so as to trigger a manifest write,
394   // which causes the db to switch to a new MANIFEST upon start.
395   port::Thread ro_db_thread([&]() {
396     Options options1;
397     options1.env = env_;
398     options1.max_open_files = -1;
399     OpenSecondary(options1);
400     CloseSecondary();
401   });
402   Reopen(options);
403   ro_db_thread.join();
404 }
405 
TEST_F(DBSecondaryTest,MissingTableFileDuringOpen)406 TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) {
407   Options options;
408   options.env = env_;
409   options.level0_file_num_compaction_trigger = 4;
410   Reopen(options);
411   for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
412     ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
413     ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
414     ASSERT_OK(dbfull()->Flush(FlushOptions()));
415   }
416   ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
417   ASSERT_OK(dbfull()->TEST_WaitForCompact());
418   Options options1;
419   options1.env = env_;
420   options1.max_open_files = -1;
421   OpenSecondary(options1);
422   ReadOptions ropts;
423   ropts.verify_checksums = true;
424   std::string value;
425   ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
426   ASSERT_EQ("foo_value" +
427                 std::to_string(options.level0_file_num_compaction_trigger - 1),
428             value);
429   ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
430   ASSERT_EQ("bar_value" +
431                 std::to_string(options.level0_file_num_compaction_trigger - 1),
432             value);
433   Iterator* iter = db_secondary_->NewIterator(ropts);
434   ASSERT_NE(nullptr, iter);
435   iter->Seek("bar");
436   ASSERT_TRUE(iter->Valid());
437   ASSERT_EQ("bar", iter->key().ToString());
438   ASSERT_EQ("bar_value" +
439                 std::to_string(options.level0_file_num_compaction_trigger - 1),
440             iter->value().ToString());
441   iter->Seek("foo");
442   ASSERT_TRUE(iter->Valid());
443   ASSERT_EQ("foo", iter->key().ToString());
444   ASSERT_EQ("foo_value" +
445                 std::to_string(options.level0_file_num_compaction_trigger - 1),
446             iter->value().ToString());
447   size_t count = 0;
448   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
449     ++count;
450   }
451   ASSERT_EQ(2, count);
452   delete iter;
453 }
454 
TEST_F(DBSecondaryTest,MissingTableFile)455 TEST_F(DBSecondaryTest, MissingTableFile) {
456   int table_files_not_exist = 0;
457   SyncPoint::GetInstance()->DisableProcessing();
458   SyncPoint::GetInstance()->ClearAllCallBacks();
459   SyncPoint::GetInstance()->SetCallBack(
460       "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers",
461       [&](void* arg) {
462         Status s = *reinterpret_cast<Status*>(arg);
463         if (s.IsPathNotFound()) {
464           ++table_files_not_exist;
465         } else if (!s.ok()) {
466           assert(false);  // Should not reach here
467         }
468       });
469   SyncPoint::GetInstance()->EnableProcessing();
470   Options options;
471   options.env = env_;
472   options.level0_file_num_compaction_trigger = 4;
473   Reopen(options);
474 
475   Options options1;
476   options1.env = env_;
477   options1.max_open_files = -1;
478   OpenSecondary(options1);
479 
480   for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
481     ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
482     ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
483     ASSERT_OK(dbfull()->Flush(FlushOptions()));
484   }
485   ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
486   ASSERT_OK(dbfull()->TEST_WaitForCompact());
487 
488   ASSERT_NE(nullptr, db_secondary_full());
489   ReadOptions ropts;
490   ropts.verify_checksums = true;
491   std::string value;
492   ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value));
493   ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value));
494 
495   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
496   ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist);
497   ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
498   ASSERT_EQ("foo_value" +
499                 std::to_string(options.level0_file_num_compaction_trigger - 1),
500             value);
501   ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
502   ASSERT_EQ("bar_value" +
503                 std::to_string(options.level0_file_num_compaction_trigger - 1),
504             value);
505   Iterator* iter = db_secondary_->NewIterator(ropts);
506   ASSERT_NE(nullptr, iter);
507   iter->Seek("bar");
508   ASSERT_TRUE(iter->Valid());
509   ASSERT_EQ("bar", iter->key().ToString());
510   ASSERT_EQ("bar_value" +
511                 std::to_string(options.level0_file_num_compaction_trigger - 1),
512             iter->value().ToString());
513   iter->Seek("foo");
514   ASSERT_TRUE(iter->Valid());
515   ASSERT_EQ("foo", iter->key().ToString());
516   ASSERT_EQ("foo_value" +
517                 std::to_string(options.level0_file_num_compaction_trigger - 1),
518             iter->value().ToString());
519   size_t count = 0;
520   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
521     ++count;
522   }
523   ASSERT_EQ(2, count);
524   delete iter;
525 }
526 
TEST_F(DBSecondaryTest,PrimaryDropColumnFamily)527 TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) {
528   Options options;
529   options.env = env_;
530   const std::string kCfName1 = "pikachu";
531   CreateAndReopenWithCF({kCfName1}, options);
532 
533   Options options1;
534   options1.env = env_;
535   options1.max_open_files = -1;
536   OpenSecondaryWithColumnFamilies({kCfName1}, options1);
537   ASSERT_EQ(2, handles_secondary_.size());
538 
539   ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1"));
540   ASSERT_OK(Flush(1 /*cf*/));
541 
542   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
543   ReadOptions ropts;
544   ropts.verify_checksums = true;
545   std::string value;
546   ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
547   ASSERT_EQ("foo_val_1", value);
548 
549   ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
550   Close();
551   CheckFileTypeCounts(dbname_, 1, 0, 1);
552   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
553   value.clear();
554   ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
555   ASSERT_EQ("foo_val_1", value);
556 }
557 
TEST_F(DBSecondaryTest,SwitchManifest)558 TEST_F(DBSecondaryTest, SwitchManifest) {
559   Options options;
560   options.env = env_;
561   options.level0_file_num_compaction_trigger = 4;
562   Reopen(options);
563 
564   Options options1;
565   options1.env = env_;
566   options1.max_open_files = -1;
567   OpenSecondary(options1);
568 
569   const int kNumFiles = options.level0_file_num_compaction_trigger - 1;
570   // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1,
571   // ..., 9.
572   const int kNumKeys = 10;
573   // Create two sst
574   for (int i = 0; i != kNumFiles; ++i) {
575     for (int j = 0; j != kNumKeys; ++j) {
576       ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i)));
577     }
578     ASSERT_OK(Flush());
579   }
580 
581   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
582   const auto& range_scan_db = [&]() {
583     ReadOptions tmp_ropts;
584     tmp_ropts.total_order_seek = true;
585     tmp_ropts.verify_checksums = true;
586     std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts));
587     int cnt = 0;
588     for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) {
589       ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString());
590       ASSERT_EQ("value_" + std::to_string(kNumFiles - 1),
591                 iter->value().ToString());
592     }
593   };
594 
595   range_scan_db();
596 
597   // While secondary instance still keeps old MANIFEST open, we close primary,
598   // restart primary, performs full compaction, close again, restart again so
599   // that next time secondary tries to catch up with primary, the secondary
600   // will skip the MANIFEST in middle.
601   Reopen(options);
602   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
603   ASSERT_OK(dbfull()->TEST_WaitForCompact());
604 
605   Reopen(options);
606   ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
607 
608   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
609   range_scan_db();
610 }
611 
612 // Here, "Snapshot" refers to the version edits written by
613 // VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after
614 // switching from the old one.
TEST_F(DBSecondaryTest,SkipSnapshotAfterManifestSwitch)615 TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) {
616   Options options;
617   options.env = env_;
618   options.disable_auto_compactions = true;
619   Reopen(options);
620 
621   Options options1;
622   options1.env = env_;
623   options1.max_open_files = -1;
624   OpenSecondary(options1);
625 
626   ASSERT_OK(Put("0", "value0"));
627   ASSERT_OK(Flush());
628   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
629   std::string value;
630   ReadOptions ropts;
631   ropts.verify_checksums = true;
632   ASSERT_OK(db_secondary_->Get(ropts, "0", &value));
633   ASSERT_EQ("value0", value);
634 
635   Reopen(options);
636   ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
637   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
638 }
639 
TEST_F(DBSecondaryTest,SwitchWAL)640 TEST_F(DBSecondaryTest, SwitchWAL) {
641   const int kNumKeysPerMemtable = 1;
642   Options options;
643   options.env = env_;
644   options.max_write_buffer_number = 4;
645   options.min_write_buffer_number_to_merge = 2;
646   options.memtable_factory.reset(
647       new SpecialSkipListFactory(kNumKeysPerMemtable));
648   Reopen(options);
649 
650   Options options1;
651   options1.env = env_;
652   options1.max_open_files = -1;
653   OpenSecondary(options1);
654 
655   const auto& verify_db = [](DB* db1, DB* db2) {
656     ASSERT_NE(nullptr, db1);
657     ASSERT_NE(nullptr, db2);
658     ReadOptions read_opts;
659     read_opts.verify_checksums = true;
660     std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts));
661     std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts));
662     it1->SeekToFirst();
663     it2->SeekToFirst();
664     for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) {
665       ASSERT_EQ(it1->key(), it2->key());
666       ASSERT_EQ(it1->value(), it2->value());
667     }
668     ASSERT_FALSE(it1->Valid());
669     ASSERT_FALSE(it2->Valid());
670 
671     for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
672       std::string value;
673       ASSERT_OK(db2->Get(read_opts, it1->key(), &value));
674       ASSERT_EQ(it1->value(), value);
675     }
676     for (it2->SeekToFirst(); it2->Valid(); it2->Next()) {
677       std::string value;
678       ASSERT_OK(db1->Get(read_opts, it2->key(), &value));
679       ASSERT_EQ(it2->value(), value);
680     }
681   };
682   for (int k = 0; k != 16; ++k) {
683     ASSERT_OK(Put("key" + std::to_string(k), "value" + std::to_string(k)));
684     ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
685     verify_db(dbfull(), db_secondary_);
686   }
687 }
688 
TEST_F(DBSecondaryTest,SwitchWALMultiColumnFamilies)689 TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
690   const int kNumKeysPerMemtable = 1;
691   SyncPoint::GetInstance()->DisableProcessing();
692   SyncPoint::GetInstance()->LoadDependency(
693       {{"DBImpl::BackgroundCallFlush:ContextCleanedUp",
694         "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}});
695   SyncPoint::GetInstance()->EnableProcessing();
696   const std::string kCFName1 = "pikachu";
697   Options options;
698   options.env = env_;
699   options.max_write_buffer_number = 4;
700   options.min_write_buffer_number_to_merge = 2;
701   options.memtable_factory.reset(
702       new SpecialSkipListFactory(kNumKeysPerMemtable));
703   CreateAndReopenWithCF({kCFName1}, options);
704 
705   Options options1;
706   options1.env = env_;
707   options1.max_open_files = -1;
708   OpenSecondaryWithColumnFamilies({kCFName1}, options1);
709   ASSERT_EQ(2, handles_secondary_.size());
710 
711   const auto& verify_db = [](DB* db1,
712                              const std::vector<ColumnFamilyHandle*>& handles1,
713                              DB* db2,
714                              const std::vector<ColumnFamilyHandle*>& handles2) {
715     ASSERT_NE(nullptr, db1);
716     ASSERT_NE(nullptr, db2);
717     ReadOptions read_opts;
718     read_opts.verify_checksums = true;
719     ASSERT_EQ(handles1.size(), handles2.size());
720     for (size_t i = 0; i != handles1.size(); ++i) {
721       std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts, handles1[i]));
722       std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts, handles2[i]));
723       it1->SeekToFirst();
724       it2->SeekToFirst();
725       for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) {
726         ASSERT_EQ(it1->key(), it2->key());
727         ASSERT_EQ(it1->value(), it2->value());
728       }
729       ASSERT_FALSE(it1->Valid());
730       ASSERT_FALSE(it2->Valid());
731 
732       for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
733         std::string value;
734         ASSERT_OK(db2->Get(read_opts, handles2[i], it1->key(), &value));
735         ASSERT_EQ(it1->value(), value);
736       }
737       for (it2->SeekToFirst(); it2->Valid(); it2->Next()) {
738         std::string value;
739         ASSERT_OK(db1->Get(read_opts, handles1[i], it2->key(), &value));
740         ASSERT_EQ(it2->value(), value);
741       }
742     }
743   };
744   for (int k = 0; k != 8; ++k) {
745     ASSERT_OK(
746         Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
747     ASSERT_OK(
748         Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
749     TEST_SYNC_POINT(
750         "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp");
751     ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
752     verify_db(dbfull(), handles_, db_secondary_, handles_secondary_);
753     SyncPoint::GetInstance()->ClearTrace();
754   }
755 }
756 
TEST_F(DBSecondaryTest,CatchUpAfterFlush)757 TEST_F(DBSecondaryTest, CatchUpAfterFlush) {
758   const int kNumKeysPerMemtable = 16;
759   Options options;
760   options.env = env_;
761   options.max_write_buffer_number = 4;
762   options.min_write_buffer_number_to_merge = 2;
763   options.memtable_factory.reset(
764       new SpecialSkipListFactory(kNumKeysPerMemtable));
765   Reopen(options);
766 
767   Options options1;
768   options1.env = env_;
769   options1.max_open_files = -1;
770   OpenSecondary(options1);
771 
772   WriteOptions write_opts;
773   WriteBatch wb;
774   wb.Put("key0", "value0");
775   wb.Put("key1", "value1");
776   ASSERT_OK(dbfull()->Write(write_opts, &wb));
777   ReadOptions read_opts;
778   std::unique_ptr<Iterator> iter1(db_secondary_->NewIterator(read_opts));
779   iter1->Seek("key0");
780   ASSERT_FALSE(iter1->Valid());
781   iter1->Seek("key1");
782   ASSERT_FALSE(iter1->Valid());
783   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
784   iter1->Seek("key0");
785   ASSERT_FALSE(iter1->Valid());
786   iter1->Seek("key1");
787   ASSERT_FALSE(iter1->Valid());
788   std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(read_opts));
789   iter2->Seek("key0");
790   ASSERT_TRUE(iter2->Valid());
791   ASSERT_EQ("value0", iter2->value());
792   iter2->Seek("key1");
793   ASSERT_TRUE(iter2->Valid());
794   ASSERT_EQ("value1", iter2->value());
795 
796   {
797     WriteBatch wb1;
798     wb1.Put("key0", "value01");
799     wb1.Put("key1", "value11");
800     ASSERT_OK(dbfull()->Write(write_opts, &wb1));
801   }
802 
803   {
804     WriteBatch wb2;
805     wb2.Put("key0", "new_value0");
806     wb2.Delete("key1");
807     ASSERT_OK(dbfull()->Write(write_opts, &wb2));
808   }
809 
810   ASSERT_OK(Flush());
811 
812   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
813   std::unique_ptr<Iterator> iter3(db_secondary_->NewIterator(read_opts));
814   // iter3 should not see value01 and value11 at all.
815   iter3->Seek("key0");
816   ASSERT_TRUE(iter3->Valid());
817   ASSERT_EQ("new_value0", iter3->value());
818   iter3->Seek("key1");
819   ASSERT_FALSE(iter3->Valid());
820 }
821 
TEST_F(DBSecondaryTest,CheckConsistencyWhenOpen)822 TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
823   bool called = false;
824   Options options;
825   options.env = env_;
826   options.disable_auto_compactions = true;
827   Reopen(options);
828   SyncPoint::GetInstance()->DisableProcessing();
829   SyncPoint::GetInstance()->ClearAllCallBacks();
830   SyncPoint::GetInstance()->SetCallBack(
831       "DBImplSecondary::CheckConsistency:AfterFirstAttempt", [&](void* arg) {
832         ASSERT_NE(nullptr, arg);
833         called = true;
834         auto* s = reinterpret_cast<Status*>(arg);
835         ASSERT_NOK(*s);
836       });
837   SyncPoint::GetInstance()->LoadDependency(
838       {{"DBImpl::CheckConsistency:AfterGetLiveFilesMetaData",
839         "BackgroundCallCompaction:0"},
840        {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
841         "DBImpl::CheckConsistency:BeforeGetFileSize"}});
842   SyncPoint::GetInstance()->EnableProcessing();
843 
844   ASSERT_OK(Put("a", "value0"));
845   ASSERT_OK(Put("c", "value0"));
846   ASSERT_OK(Flush());
847   ASSERT_OK(Put("b", "value1"));
848   ASSERT_OK(Put("d", "value1"));
849   ASSERT_OK(Flush());
850   port::Thread thread([this]() {
851     Options opts;
852     opts.env = env_;
853     opts.max_open_files = -1;
854     OpenSecondary(opts);
855   });
856   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
857   ASSERT_OK(dbfull()->TEST_WaitForCompact());
858   thread.join();
859   ASSERT_TRUE(called);
860 }
861 #endif  //! ROCKSDB_LITE
862 
863 }  // namespace ROCKSDB_NAMESPACE
864 
main(int argc,char ** argv)865 int main(int argc, char** argv) {
866   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
867   ::testing::InitGoogleTest(&argc, argv);
868   return RUN_ALL_TESTS();
869 }
870