1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/db_impl/db_impl_secondary.h"
7 
8 #include <cinttypes>
9 
10 #include "db/arena_wrapped_db_iter.h"
11 #include "db/merge_context.h"
12 #include "logging/auto_roll_logger.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "util/cast_util.h"
15 
16 namespace ROCKSDB_NAMESPACE {
17 
18 #ifndef ROCKSDB_LITE
DBImplSecondary(const DBOptions & db_options,const std::string & dbname)19 DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
20                                  const std::string& dbname)
21     : DBImpl(db_options, dbname) {
22   ROCKS_LOG_INFO(immutable_db_options_.info_log,
23                  "Opening the db in secondary mode");
24   LogFlush(immutable_db_options_.info_log);
25 }
26 
~DBImplSecondary()27 DBImplSecondary::~DBImplSecondary() {}
28 
Recover(const std::vector<ColumnFamilyDescriptor> & column_families,bool,bool,bool,uint64_t *)29 Status DBImplSecondary::Recover(
30     const std::vector<ColumnFamilyDescriptor>& column_families,
31     bool /*readonly*/, bool /*error_if_log_file_exist*/,
32     bool /*error_if_data_exists_in_logs*/, uint64_t*) {
33   mutex_.AssertHeld();
34 
35   JobContext job_context(0);
36   Status s;
37   s = static_cast<ReactiveVersionSet*>(versions_.get())
38           ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
39                     &manifest_reader_status_);
40   if (!s.ok()) {
41     return s;
42   }
43   if (immutable_db_options_.paranoid_checks && s.ok()) {
44     s = CheckConsistency();
45   }
46   // Initial max_total_in_memory_state_ before recovery logs.
47   max_total_in_memory_state_ = 0;
48   for (auto cfd : *versions_->GetColumnFamilySet()) {
49     auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
50     max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
51                                   mutable_cf_options->max_write_buffer_number;
52   }
53   if (s.ok()) {
54     default_cf_handle_ = new ColumnFamilyHandleImpl(
55         versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
56     default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
57     single_column_family_mode_ =
58         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
59 
60     std::unordered_set<ColumnFamilyData*> cfds_changed;
61     s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
62   }
63 
64   if (s.IsPathNotFound()) {
65     ROCKS_LOG_INFO(immutable_db_options_.info_log,
66                    "Secondary tries to read WAL, but WAL file(s) have already "
67                    "been purged by primary.");
68     s = Status::OK();
69   }
70   // TODO: update options_file_number_ needed?
71 
72   job_context.Clean();
73   return s;
74 }
75 
76 // find new WAL and apply them in order to the secondary instance
FindAndRecoverLogFiles(std::unordered_set<ColumnFamilyData * > * cfds_changed,JobContext * job_context)77 Status DBImplSecondary::FindAndRecoverLogFiles(
78     std::unordered_set<ColumnFamilyData*>* cfds_changed,
79     JobContext* job_context) {
80   assert(nullptr != cfds_changed);
81   assert(nullptr != job_context);
82   Status s;
83   std::vector<uint64_t> logs;
84   s = FindNewLogNumbers(&logs);
85   if (s.ok() && !logs.empty()) {
86     SequenceNumber next_sequence(kMaxSequenceNumber);
87     s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
88   }
89   return s;
90 }
91 
92 // List wal_dir and find all new WALs, return these log numbers
FindNewLogNumbers(std::vector<uint64_t> * logs)93 Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
94   assert(logs != nullptr);
95   std::vector<std::string> filenames;
96   Status s;
97   s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
98   if (s.IsNotFound()) {
99     return Status::InvalidArgument("Failed to open wal_dir",
100                                    immutable_db_options_.wal_dir);
101   } else if (!s.ok()) {
102     return s;
103   }
104 
105   // if log_readers_ is non-empty, it means we have applied all logs with log
106   // numbers smaller than the smallest log in log_readers_, so there is no
107   // need to pass these logs to RecoverLogFiles
108   uint64_t log_number_min = 0;
109   if (!log_readers_.empty()) {
110     log_number_min = log_readers_.begin()->first;
111   }
112   for (size_t i = 0; i < filenames.size(); i++) {
113     uint64_t number;
114     FileType type;
115     if (ParseFileName(filenames[i], &number, &type) && type == kLogFile &&
116         number >= log_number_min) {
117       logs->push_back(number);
118     }
119   }
120   // Recover logs in the order that they were generated
121   if (!logs->empty()) {
122     std::sort(logs->begin(), logs->end());
123   }
124   return s;
125 }
126 
MaybeInitLogReader(uint64_t log_number,log::FragmentBufferedReader ** log_reader)127 Status DBImplSecondary::MaybeInitLogReader(
128     uint64_t log_number, log::FragmentBufferedReader** log_reader) {
129   auto iter = log_readers_.find(log_number);
130   // make sure the log file is still present
131   if (iter == log_readers_.end() ||
132       iter->second->reader_->GetLogNumber() != log_number) {
133     // delete the obsolete log reader if log number mismatch
134     if (iter != log_readers_.end()) {
135       log_readers_.erase(iter);
136     }
137     // initialize log reader from log_number
138     // TODO: min_log_number_to_keep_2pc check needed?
139     // Open the log file
140     std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
141     ROCKS_LOG_INFO(immutable_db_options_.info_log,
142                    "Recovering log #%" PRIu64 " mode %d", log_number,
143                    static_cast<int>(immutable_db_options_.wal_recovery_mode));
144 
145     std::unique_ptr<SequentialFileReader> file_reader;
146     {
147       std::unique_ptr<FSSequentialFile> file;
148       Status status = fs_->NewSequentialFile(
149           fname, fs_->OptimizeForLogRead(file_options_), &file,
150           nullptr);
151       if (!status.ok()) {
152         *log_reader = nullptr;
153         return status;
154       }
155       file_reader.reset(new SequentialFileReader(
156           std::move(file), fname, immutable_db_options_.log_readahead_size));
157     }
158 
159     // Create the log reader.
160     LogReaderContainer* log_reader_container = new LogReaderContainer(
161         env_, immutable_db_options_.info_log, std::move(fname),
162         std::move(file_reader), log_number);
163     log_readers_.insert(std::make_pair(
164         log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
165   }
166   iter = log_readers_.find(log_number);
167   assert(iter != log_readers_.end());
168   *log_reader = iter->second->reader_;
169   return Status::OK();
170 }
171 
// After manifest recovery, replay WALs and refresh log_readers_ if necessary.
//
// Every WAL in log_numbers first gets a reader (MaybeInitLogReader); then the
// WALs are read record by record and each write batch is inserted into the
// memtables through WriteBatchInternal::InsertInto().
//
// - log_numbers:   WAL numbers to replay.
// - next_sequence: passed to InsertInto(); whenever it differs from
//                  kMaxSequenceNumber after an insert, the VersionSet's
//                  sequence numbers are advanced to *next_sequence - 1 if
//                  that is not behind the current last sequence.
// - cfds_changed:  receives every existing column family referenced by a
//                  replayed write batch.
// - job_context:   its memtables_to_free list is handed to the immutable
//                  memtable list when an active memtable is sealed.
//
// Returns the first non-OK status from opening a WAL or applying a batch.
// REQUIRES: log_numbers are sorted in ascending order
// REQUIRES: mutex_ is held
Status DBImplSecondary::RecoverLogFiles(
    const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
    std::unordered_set<ColumnFamilyData*>* cfds_changed,
    JobContext* job_context) {
  assert(nullptr != cfds_changed);
  assert(nullptr != job_context);
  mutex_.AssertHeld();
  Status status;
  // Pass 1: make sure a reader exists for every WAL before replaying any of
  // them; bail out on the first WAL that cannot be opened.
  for (auto log_number : log_numbers) {
    log::FragmentBufferedReader* reader = nullptr;
    status = MaybeInitLogReader(log_number, &reader);
    if (!status.ok()) {
      return status;
    }
    assert(reader != nullptr);
  }
  // Pass 2: replay the WALs in the given order.
  for (auto log_number : log_numbers) {
    auto it  = log_readers_.find(log_number);
    assert(it != log_readers_.end());
    log::FragmentBufferedReader* reader = it->second->reader_;
    // Manually update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(log_number);

    // Read all the records and add them to a memtable.
    // wal_recovery_mode is passed to ReadRecord() (presumably it decides how
    // incomplete records at the tail end of the log are tolerated).
    std::string scratch;
    Slice record;
    WriteBatch batch;

    while (reader->ReadRecord(&record, &scratch,
                              immutable_db_options_.wal_recovery_mode) &&
           status.ok()) {
      // A record shorter than the write batch header cannot be a valid
      // batch: report it and move on to the next record.
      if (record.size() < WriteBatchInternal::kHeader) {
        reader->GetReporter()->Corruption(
            record.size(), Status::Corruption("log record too small"));
        continue;
      }
      WriteBatchInternal::SetContents(&batch, record);
      SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
      std::vector<uint32_t> column_family_ids;
      status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
      if (status.ok()) {
        for (const auto id : column_family_ids) {
          ColumnFamilyData* cfd =
              versions_->GetColumnFamilySet()->GetColumnFamily(id);
          // Skip ids for which the column family set has no entry.
          if (cfd == nullptr) {
            continue;
          }
          if (cfds_changed->count(cfd) == 0) {
            cfds_changed->insert(cfd);
          }
          const std::vector<FileMetaData*>& l0_files =
              cfd->current()->storage_info()->LevelFiles(0);
          SequenceNumber seq =
              l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
          // If the write batch's sequence number is not larger than the
          // largest sequence number of the last L0 file of this column
          // family, then its data must reside in an SST that has already been
          // added in the prior MANIFEST replay.
          // NOTE(review): this `continue` only skips the memtable switch
          // below for this column family; the batch is still passed to
          // InsertInto() after the loop -- confirm that is intended.
          if (seq_of_batch <= seq) {
            continue;
          }
          // WAL number most recently recorded for this column family, or
          // port::kMaxUint64 if none has been recorded yet.
          auto curr_log_num = port::kMaxUint64;
          if (cfd_to_current_log_.count(cfd) > 0) {
            curr_log_num = cfd_to_current_log_[cfd];
          }
          // If the active memtable contains records added by replaying an
          // earlier WAL, then we need to seal the memtable, add it to the
          // immutable memtable list and create a new active memtable.
          if (!cfd->mem()->IsEmpty() && (curr_log_num == port::kMaxUint64 ||
                                         curr_log_num != log_number)) {
            const MutableCFOptions mutable_cf_options =
                *cfd->GetLatestMutableCFOptions();
            MemTable* new_mem =
                cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
            cfd->mem()->SetNextLogNumber(log_number);
            cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
            new_mem->Ref();
            cfd->SetMemtable(new_mem);
          }
        }
        bool has_valid_writes = false;
        // If column family was not found, it might mean that the WAL write
        // batch references to the column family that was dropped after the
        // insert. We don't want to fail the whole write batch in that case --
        // we just ignore the update.
        // That's why we set ignore missing column families to true (the
        // `true` argument below).
        // Passing a null flush_scheduler disables memtable flushing, which is
        // needed for secondary instances.
        status = WriteBatchInternal::InsertInto(
            &batch, column_family_memtables_.get(),
            nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
            true, log_number, this, false /* concurrent_memtable_writes */,
            next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
      }
      // On success, remember for every column family touched by this batch
      // the largest WAL number applied to it so far, then advance the
      // VersionSet's sequence numbers. On failure, report the record to the
      // reader's reporter; the loop condition then ends the replay.
      if (status.ok()) {
        for (const auto id : column_family_ids) {
          ColumnFamilyData* cfd =
              versions_->GetColumnFamilySet()->GetColumnFamily(id);
          if (cfd == nullptr) {
            continue;
          }
          std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
              cfd_to_current_log_.find(cfd);
          if (iter == cfd_to_current_log_.end()) {
            cfd_to_current_log_.insert({cfd, log_number});
          } else if (log_number > iter->second) {
            iter->second = log_number;
          }
        }
        // *next_sequence still equal to kMaxSequenceNumber means there is
        // nothing to publish; otherwise never move the last sequence
        // backwards.
        auto last_sequence = *next_sequence - 1;
        if ((*next_sequence != kMaxSequenceNumber) &&
            (versions_->LastSequence() <= last_sequence)) {
          versions_->SetLastAllocatedSequence(last_sequence);
          versions_->SetLastPublishedSequence(last_sequence);
          versions_->SetLastSequence(last_sequence);
        }
      } else {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reader->GetReporter()->Corruption(record.size(), status);
      }
    }
    if (!status.ok()) {
      return status;
    }
  }
  // After successfully recovering the WALs, erase every log reader except the
  // last entry of log_readers_.
  if (log_readers_.size() > 1) {
    auto erase_iter = log_readers_.begin();
    std::advance(erase_iter, log_readers_.size() - 1);
    log_readers_.erase(log_readers_.begin(), erase_iter);
  }
  return status;
}
309 
310 // Implementation of the DB interface
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)311 Status DBImplSecondary::Get(const ReadOptions& read_options,
312                             ColumnFamilyHandle* column_family, const Slice& key,
313                             PinnableSlice* value) {
314   return GetImpl(read_options, column_family, key, value);
315 }
316 
GetImpl(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val)317 Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
318                                 ColumnFamilyHandle* column_family,
319                                 const Slice& key, PinnableSlice* pinnable_val) {
320   assert(pinnable_val != nullptr);
321   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
322   StopWatch sw(env_, stats_, DB_GET);
323   PERF_TIMER_GUARD(get_snapshot_time);
324 
325   auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
326   ColumnFamilyData* cfd = cfh->cfd();
327   if (tracer_) {
328     InstrumentedMutexLock lock(&trace_mutex_);
329     if (tracer_) {
330       tracer_->Get(column_family, key);
331     }
332   }
333   // Acquire SuperVersion
334   SuperVersion* super_version = GetAndRefSuperVersion(cfd);
335   SequenceNumber snapshot = versions_->LastSequence();
336   MergeContext merge_context;
337   SequenceNumber max_covering_tombstone_seq = 0;
338   Status s;
339   LookupKey lkey(key, snapshot);
340   PERF_TIMER_STOP(get_snapshot_time);
341 
342   bool done = false;
343   if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
344                               /*timestamp=*/nullptr, &s, &merge_context,
345                               &max_covering_tombstone_seq, read_options)) {
346     done = true;
347     pinnable_val->PinSelf();
348     RecordTick(stats_, MEMTABLE_HIT);
349   } else if ((s.ok() || s.IsMergeInProgress()) &&
350              super_version->imm->Get(
351                  lkey, pinnable_val->GetSelf(), /*timestamp=*/nullptr, &s,
352                  &merge_context, &max_covering_tombstone_seq, read_options)) {
353     done = true;
354     pinnable_val->PinSelf();
355     RecordTick(stats_, MEMTABLE_HIT);
356   }
357   if (!done && !s.ok() && !s.IsMergeInProgress()) {
358     ReturnAndCleanupSuperVersion(cfd, super_version);
359     return s;
360   }
361   if (!done) {
362     PERF_TIMER_GUARD(get_from_output_files_time);
363     super_version->current->Get(read_options, lkey, pinnable_val,
364                                 /*timestamp=*/nullptr, &s, &merge_context,
365                                 &max_covering_tombstone_seq);
366     RecordTick(stats_, MEMTABLE_MISS);
367   }
368   {
369     PERF_TIMER_GUARD(get_post_process_time);
370     ReturnAndCleanupSuperVersion(cfd, super_version);
371     RecordTick(stats_, NUMBER_KEYS_READ);
372     size_t size = pinnable_val->size();
373     RecordTick(stats_, BYTES_READ, size);
374     RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
375     PERF_COUNTER_ADD(get_read_bytes, size);
376   }
377   return s;
378 }
379 
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)380 Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
381                                        ColumnFamilyHandle* column_family) {
382   if (read_options.managed) {
383     return NewErrorIterator(
384         Status::NotSupported("Managed iterator is not supported anymore."));
385   }
386   if (read_options.read_tier == kPersistedTier) {
387     return NewErrorIterator(Status::NotSupported(
388         "ReadTier::kPersistedData is not yet supported in iterators."));
389   }
390   Iterator* result = nullptr;
391   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
392   auto cfd = cfh->cfd();
393   ReadCallback* read_callback = nullptr;  // No read callback provided.
394   if (read_options.tailing) {
395     return NewErrorIterator(Status::NotSupported(
396         "tailing iterator not supported in secondary mode"));
397   } else if (read_options.snapshot != nullptr) {
398     // TODO (yanqin) support snapshot.
399     return NewErrorIterator(
400         Status::NotSupported("snapshot not supported in secondary mode"));
401   } else {
402     auto snapshot = versions_->LastSequence();
403     result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
404   }
405   return result;
406 }
407 
NewIteratorImpl(const ReadOptions & read_options,ColumnFamilyData * cfd,SequenceNumber snapshot,ReadCallback * read_callback)408 ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
409     const ReadOptions& read_options, ColumnFamilyData* cfd,
410     SequenceNumber snapshot, ReadCallback* read_callback) {
411   assert(nullptr != cfd);
412   SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
413   auto db_iter = NewArenaWrappedDbIterator(
414       env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
415       snapshot,
416       super_version->mutable_cf_options.max_sequential_skip_in_iterations,
417       super_version->version_number, read_callback);
418   auto internal_iter =
419       NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
420                           db_iter->GetRangeDelAggregator(), snapshot);
421   db_iter->SetIterUnderDBIter(internal_iter);
422   return db_iter;
423 }
424 
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)425 Status DBImplSecondary::NewIterators(
426     const ReadOptions& read_options,
427     const std::vector<ColumnFamilyHandle*>& column_families,
428     std::vector<Iterator*>* iterators) {
429   if (read_options.managed) {
430     return Status::NotSupported("Managed iterator is not supported anymore.");
431   }
432   if (read_options.read_tier == kPersistedTier) {
433     return Status::NotSupported(
434         "ReadTier::kPersistedData is not yet supported in iterators.");
435   }
436   ReadCallback* read_callback = nullptr;  // No read callback provided.
437   if (iterators == nullptr) {
438     return Status::InvalidArgument("iterators not allowed to be nullptr");
439   }
440   iterators->clear();
441   iterators->reserve(column_families.size());
442   if (read_options.tailing) {
443     return Status::NotSupported(
444         "tailing iterator not supported in secondary mode");
445   } else if (read_options.snapshot != nullptr) {
446     // TODO (yanqin) support snapshot.
447     return Status::NotSupported("snapshot not supported in secondary mode");
448   } else {
449     SequenceNumber read_seq = versions_->LastSequence();
450     for (auto cfh : column_families) {
451       ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
452       iterators->push_back(
453           NewIteratorImpl(read_options, cfd, read_seq, read_callback));
454     }
455   }
456   return Status::OK();
457 }
458 
CheckConsistency()459 Status DBImplSecondary::CheckConsistency() {
460   mutex_.AssertHeld();
461   Status s = DBImpl::CheckConsistency();
462   // If DBImpl::CheckConsistency() which is stricter returns success, then we
463   // do not need to give a second chance.
464   if (s.ok()) {
465     return s;
466   }
467   // It's possible that DBImpl::CheckConssitency() can fail because the primary
468   // may have removed certain files, causing the GetFileSize(name) call to
469   // fail and returning a PathNotFound. In this case, we take a best-effort
470   // approach and just proceed.
471   TEST_SYNC_POINT_CALLBACK(
472       "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s);
473 
474   if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
475     return Status::OK();
476   }
477 
478   std::vector<LiveFileMetaData> metadata;
479   versions_->GetLiveFilesMetaData(&metadata);
480 
481   std::string corruption_messages;
482   for (const auto& md : metadata) {
483     // md.name has a leading "/".
484     std::string file_path = md.db_path + md.name;
485 
486     uint64_t fsize = 0;
487     s = env_->GetFileSize(file_path, &fsize);
488     if (!s.ok() &&
489         (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() ||
490          s.IsPathNotFound())) {
491       s = Status::OK();
492     }
493     if (!s.ok()) {
494       corruption_messages +=
495           "Can't access " + md.name + ": " + s.ToString() + "\n";
496     }
497   }
498   return corruption_messages.empty() ? Status::OK()
499                                      : Status::Corruption(corruption_messages);
500 }
501 
TryCatchUpWithPrimary()502 Status DBImplSecondary::TryCatchUpWithPrimary() {
503   assert(versions_.get() != nullptr);
504   assert(manifest_reader_.get() != nullptr);
505   Status s;
506   // read the manifest and apply new changes to the secondary instance
507   std::unordered_set<ColumnFamilyData*> cfds_changed;
508   JobContext job_context(0, true /*create_superversion*/);
509   {
510     InstrumentedMutexLock lock_guard(&mutex_);
511     s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
512             ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
513 
514     ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
515                    static_cast<uint64_t>(versions_->LastSequence()));
516     for (ColumnFamilyData* cfd : cfds_changed) {
517       if (cfd->IsDropped()) {
518         ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
519                         cfd->GetName().c_str());
520         continue;
521       }
522       VersionStorageInfo::LevelSummaryStorage tmp;
523       ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
524                       "[%s] Level summary: %s\n", cfd->GetName().c_str(),
525                       cfd->current()->storage_info()->LevelSummary(&tmp));
526     }
527 
528     // list wal_dir to discover new WALs and apply new changes to the secondary
529     // instance
530     if (s.ok()) {
531       s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
532     }
533     if (s.IsPathNotFound()) {
534       ROCKS_LOG_INFO(
535           immutable_db_options_.info_log,
536           "Secondary tries to read WAL, but WAL file(s) have already "
537           "been purged by primary.");
538       s = Status::OK();
539     }
540     if (s.ok()) {
541       for (auto cfd : cfds_changed) {
542         cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
543                                        &job_context.memtables_to_free);
544         auto& sv_context = job_context.superversion_contexts.back();
545         cfd->InstallSuperVersion(&sv_context, &mutex_);
546         sv_context.NewSuperVersion();
547       }
548     }
549   }
550   job_context.Clean();
551 
552   // Cleanup unused, obsolete files.
553   JobContext purge_files_job_context(0);
554   {
555     InstrumentedMutexLock lock_guard(&mutex_);
556     // Currently, secondary instance does not own the database files, thus it
557     // is unnecessary for the secondary to force full scan.
558     FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
559   }
560   if (purge_files_job_context.HaveSomethingToDelete()) {
561     PurgeObsoleteFiles(purge_files_job_context);
562   }
563   purge_files_job_context.Clean();
564   return s;
565 }
566 
OpenAsSecondary(const Options & options,const std::string & dbname,const std::string & secondary_path,DB ** dbptr)567 Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
568                            const std::string& secondary_path, DB** dbptr) {
569   *dbptr = nullptr;
570 
571   DBOptions db_options(options);
572   ColumnFamilyOptions cf_options(options);
573   std::vector<ColumnFamilyDescriptor> column_families;
574   column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
575   std::vector<ColumnFamilyHandle*> handles;
576 
577   Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
578                                  column_families, &handles, dbptr);
579   if (s.ok()) {
580     assert(handles.size() == 1);
581     delete handles[0];
582   }
583   return s;
584 }
585 
OpenAsSecondary(const DBOptions & db_options,const std::string & dbname,const std::string & secondary_path,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr)586 Status DB::OpenAsSecondary(
587     const DBOptions& db_options, const std::string& dbname,
588     const std::string& secondary_path,
589     const std::vector<ColumnFamilyDescriptor>& column_families,
590     std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
591   *dbptr = nullptr;
592   if (db_options.max_open_files != -1) {
593     // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
594     // on SST files so that db secondary can still have access to old SSTs
595     // while primary instance may delete original.
596     return Status::InvalidArgument("require max_open_files to be -1");
597   }
598 
599   DBOptions tmp_opts(db_options);
600   Status s;
601   if (nullptr == tmp_opts.info_log) {
602     s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log);
603     if (!s.ok()) {
604       tmp_opts.info_log = nullptr;
605     }
606   }
607 
608   handles->clear();
609   DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname);
610   impl->versions_.reset(new ReactiveVersionSet(
611       dbname, &impl->immutable_db_options_, impl->file_options_,
612       impl->table_cache_.get(), impl->write_buffer_manager_,
613       &impl->write_controller_));
614   impl->column_family_memtables_.reset(
615       new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
616   impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
617 
618   impl->mutex_.Lock();
619   s = impl->Recover(column_families, true, false, false);
620   if (s.ok()) {
621     for (auto cf : column_families) {
622       auto cfd =
623           impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
624       if (nullptr == cfd) {
625         s = Status::InvalidArgument("Column family not found: ", cf.name);
626         break;
627       }
628       handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
629     }
630   }
631   SuperVersionContext sv_context(true /* create_superversion */);
632   if (s.ok()) {
633     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
634       sv_context.NewSuperVersion();
635       cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
636     }
637   }
638   impl->mutex_.Unlock();
639   sv_context.Clean();
640   if (s.ok()) {
641     *dbptr = impl;
642     for (auto h : *handles) {
643       impl->NewThreadStatusCfInfo(
644           reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
645     }
646   } else {
647     for (auto h : *handles) {
648       delete h;
649     }
650     handles->clear();
651     delete impl;
652   }
653   return s;
654 }
655 #else   // !ROCKSDB_LITE
656 
657 Status DB::OpenAsSecondary(const Options& /*options*/,
658                            const std::string& /*name*/,
659                            const std::string& /*secondary_path*/,
660                            DB** /*dbptr*/) {
661   return Status::NotSupported("Not supported in ROCKSDB_LITE.");
662 }
663 
664 Status DB::OpenAsSecondary(
665     const DBOptions& /*db_options*/, const std::string& /*dbname*/,
666     const std::string& /*secondary_path*/,
667     const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
668     std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
669   return Status::NotSupported("Not supported in ROCKSDB_LITE.");
670 }
671 #endif  // !ROCKSDB_LITE
672 
673 }  // namespace ROCKSDB_NAMESPACE
674