1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10 
11 #include <cinttypes>
12 
13 #include "db/builder.h"
14 #include "db/error_handler.h"
15 #include "env/composite_env_wrapper.h"
16 #include "file/read_write_util.h"
17 #include "file/sst_file_manager_impl.h"
18 #include "file/writable_file_writer.h"
19 #include "monitoring/persistent_stats_history.h"
20 #include "options/options_helper.h"
21 #include "rocksdb/wal_filter.h"
22 #include "table/block_based/block_based_table_factory.h"
23 #include "test_util/sync_point.h"
24 #include "util/rate_limiter.h"
25 
26 namespace ROCKSDB_NAMESPACE {
27 Options SanitizeOptions(const std::string& dbname, const Options& src) {
28   auto db_options = SanitizeOptions(dbname, DBOptions(src));
29   ImmutableDBOptions immutable_db_options(db_options);
30   auto cf_options =
31       SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
32   return Options(db_options, cf_options);
33 }
34 
35 DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
36   DBOptions result(src);
37 
38   if (result.env == nullptr) {
39     result.env = Env::Default();
40   }
41 
42   // A max_open_files value of -1 means an unlimited number of open files.
43   if (result.max_open_files != -1) {
44     int max_max_open_files = port::GetMaxOpenFiles();
45     if (max_max_open_files == -1) {
46       max_max_open_files = 0x400000;
47     }
48     ClipToRange(&result.max_open_files, 20, max_max_open_files);
49     TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
50                              &result.max_open_files);
51   }
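  // Illustrative effect of the clipping above (values are hypothetical): a
  // user-supplied max_open_files of 5 is raised to 20, while a value above the
  // per-process limit reported by port::GetMaxOpenFiles() is clamped down to
  // that limit.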
52 
53   if (result.info_log == nullptr) {
54     Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
55     if (!s.ok()) {
56       // No place suitable for logging
57       result.info_log = nullptr;
58     }
59   }
60 
61   if (!result.write_buffer_manager) {
62     result.write_buffer_manager.reset(
63         new WriteBufferManager(result.db_write_buffer_size));
64   }
65   auto bg_job_limits = DBImpl::GetBGJobLimits(
66       result.max_background_flushes, result.max_background_compactions,
67       result.max_background_jobs, true /* parallelize_compactions */);
68   result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
69                                            Env::Priority::LOW);
70   result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
71                                            Env::Priority::HIGH);
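  // Note: when only max_background_jobs is configured, DBImpl::GetBGJobLimits()
  // reserves roughly a quarter of the jobs for flushes in the HIGH-priority
  // pool and assigns the rest to compactions in the LOW-priority pool.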
72 
73   if (result.rate_limiter.get() != nullptr) {
74     if (result.bytes_per_sync == 0) {
75       result.bytes_per_sync = 1024 * 1024;
76     }
77   }
78 
79   if (result.delayed_write_rate == 0) {
80     if (result.rate_limiter.get() != nullptr) {
81       result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
82     }
83     if (result.delayed_write_rate == 0) {
84       result.delayed_write_rate = 16 * 1024 * 1024;
85     }
86   }
87 
88   if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
89     result.recycle_log_file_num = 0;
90   }
91 
92   if (result.recycle_log_file_num &&
93       (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
94        result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
95     // kPointInTimeRecovery is inconsistent with recycle log file feature since
96     // we define the "end" of the log as the first corrupt record we encounter.
97     // kAbsoluteConsistency doesn't make sense because even a clean
98     // shutdown leaves old junk at the end of the log file.
99     result.recycle_log_file_num = 0;
100   }
101 
102   if (result.wal_dir.empty()) {
103     // Use dbname as default
104     result.wal_dir = dbname;
105   }
106   if (result.wal_dir.back() == '/') {
107     result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
108   }
109 
110   if (result.db_paths.size() == 0) {
111     result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
112   }
113 
114   if (result.use_direct_reads && result.compaction_readahead_size == 0) {
115     TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
116     result.compaction_readahead_size = 1024 * 1024 * 2;
117   }
118 
119   if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
120     result.new_table_reader_for_compaction_inputs = true;
121   }
122 
123   // Force flush on DB open if 2PC is enabled, since with 2PC we have no
124   // guarantee that consecutive log files have consecutive sequence ids, which
125   // makes recovery complicated.
126   if (result.allow_2pc) {
127     result.avoid_flush_during_recovery = false;
128   }
129 
130 #ifndef ROCKSDB_LITE
131   ImmutableDBOptions immutable_db_options(result);
132   if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
133     // Either the WAL dir and db_paths[0]/db_name are not the same, or we
134     // cannot tell for sure. In either case, assume they're different and
135     // explicitly clean up the trash log files (bypassing DeleteScheduler).
136     // Do this first so that even if we end up calling
137     // DeleteScheduler::CleanupDirectory on the same dir later, it will be
138     // safe.
139     std::vector<std::string> filenames;
140     result.env->GetChildren(result.wal_dir, &filenames);
141     for (std::string& filename : filenames) {
142       if (filename.find(".log.trash", filename.length() -
143                                           std::string(".log.trash").length()) !=
144           std::string::npos) {
145         std::string trash_file = result.wal_dir + "/" + filename;
146         result.env->DeleteFile(trash_file);
147       }
148     }
149   }
150   // When the DB was stopped previously, it's possible that some .trash files
151   // were not deleted yet. When we open the DB, we find these .trash files and
152   // schedule them to be deleted (or delete them immediately if SstFileManager
153   // was not used).
154   auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
155   for (size_t i = 0; i < result.db_paths.size(); i++) {
156     DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path);
157   }
158 
159   // Create a default SstFileManager for purposes of tracking compaction size
160   // and facilitating recovery from out of space errors.
161   if (result.sst_file_manager.get() == nullptr) {
162     std::shared_ptr<SstFileManager> sst_file_manager(
163         NewSstFileManager(result.env, result.info_log));
164     result.sst_file_manager = sst_file_manager;
165   }
166 #endif
167 
168   if (!result.paranoid_checks) {
169     result.skip_checking_sst_file_sizes_on_db_open = true;
170     ROCKS_LOG_INFO(result.info_log,
171                    "file size check will be skipped during open.");
172   }
173 
174   return result;
175 }
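
// Illustrative sketch of the sanitization above (values are hypothetical and
// exact defaults may differ between releases):
//
//   DBOptions opts;
//   opts.rate_limiter.reset(NewGenericRateLimiter(32 << 20));  // 32MB/s
//   opts.bytes_per_sync = 0;                                   // unset
//   opts.delayed_write_rate = 0;                               // unset
//   DBOptions sanitized = SanitizeOptions("/tmp/example_db", opts);
//   // sanitized.bytes_per_sync == 1MB (because a rate limiter is configured)
//   // sanitized.delayed_write_rate == 32MB/s (taken from the rate limiter)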
176 
177 namespace {
178 Status SanitizeOptionsByTable(
179     const DBOptions& db_opts,
180     const std::vector<ColumnFamilyDescriptor>& column_families) {
181   Status s;
182   for (auto cf : column_families) {
183     s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
184     if (!s.ok()) {
185       return s;
186     }
187   }
188   return Status::OK();
189 }
190 }  // namespace
191 
192 Status DBImpl::ValidateOptions(
193     const DBOptions& db_options,
194     const std::vector<ColumnFamilyDescriptor>& column_families) {
195   Status s;
196   for (auto& cfd : column_families) {
197     s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
198     if (!s.ok()) {
199       return s;
200     }
201   }
202   s = ValidateOptions(db_options);
203   return s;
204 }
205 
206 Status DBImpl::ValidateOptions(const DBOptions& db_options) {
207   if (db_options.db_paths.size() > 4) {
208     return Status::NotSupported(
209         "More than four DB paths are not supported yet. ");
210   }
211 
212   if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
213     // Protect against assert in PosixMMapReadableFile constructor
214     return Status::NotSupported(
215         "If memory mapped reads (allow_mmap_reads) are enabled "
216         "then direct I/O reads (use_direct_reads) must be disabled. ");
217   }
218 
219   if (db_options.allow_mmap_writes &&
220       db_options.use_direct_io_for_flush_and_compaction) {
221     return Status::NotSupported(
222         "If memory mapped writes (allow_mmap_writes) are enabled "
223         "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
224         "be disabled. ");
225   }
226 
227   if (db_options.keep_log_file_num == 0) {
228     return Status::InvalidArgument("keep_log_file_num must be greater than 0");
229   }
230 
231   if (db_options.unordered_write &&
232       !db_options.allow_concurrent_memtable_write) {
233     return Status::InvalidArgument(
234         "unordered_write is incompatible with !allow_concurrent_memtable_write");
235   }
236 
237   if (db_options.unordered_write && db_options.enable_pipelined_write) {
238     return Status::InvalidArgument(
239         "unordered_write is incompatible with enable_pipelined_write");
240   }
241 
242   if (db_options.atomic_flush && db_options.enable_pipelined_write) {
243     return Status::InvalidArgument(
244         "atomic_flush is incompatible with enable_pipelined_write");
245   }
246 
247   // TODO remove this restriction
248   if (db_options.atomic_flush && db_options.best_efforts_recovery) {
249     return Status::InvalidArgument(
250         "atomic_flush is currently incompatible with best-efforts recovery");
251   }
252 
253   return Status::OK();
254 }
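
// Illustrative example (hypothetical values) of a configuration rejected by
// the checks above, since mmap reads and direct reads are mutually exclusive:
//
//   DBOptions opts;
//   opts.allow_mmap_reads = true;
//   opts.use_direct_reads = true;
//   // DBImpl::ValidateOptions(opts) returns Status::NotSupported(...)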
255 
256 Status DBImpl::NewDB() {
257   VersionEdit new_db;
258   Status s = SetIdentityFile(env_, dbname_);
259   if (!s.ok()) {
260     return s;
261   }
262   if (immutable_db_options_.write_dbid_to_manifest) {
263     std::string temp_db_id;
264     GetDbIdentityFromIdentityFile(&temp_db_id);
265     new_db.SetDBId(temp_db_id);
266   }
267   new_db.SetLogNumber(0);
268   new_db.SetNextFile(2);
269   new_db.SetLastSequence(0);
270 
271   ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
272   const std::string manifest = DescriptorFileName(dbname_, 1);
273   {
274     std::unique_ptr<FSWritableFile> file;
275     FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
276     s = NewWritableFile(fs_.get(), manifest, &file, file_options);
277     if (!s.ok()) {
278       return s;
279     }
280     file->SetPreallocationBlockSize(
281         immutable_db_options_.manifest_preallocation_size);
282     std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
283         std::move(file), manifest, file_options, env_, nullptr /* stats */,
284         immutable_db_options_.listeners));
285     log::Writer log(std::move(file_writer), 0, false);
286     std::string record;
287     new_db.EncodeTo(&record);
288     s = log.AddRecord(record);
289     if (s.ok()) {
290       s = SyncManifest(env_, &immutable_db_options_, log.file());
291     }
292   }
293   if (s.ok()) {
294     // Make "CURRENT" file that points to the new manifest file.
295     s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir());
296   } else {
297     fs_->DeleteFile(manifest, IOOptions(), nullptr);
298   }
299   return s;
300 }
301 
302 IOStatus DBImpl::CreateAndNewDirectory(
303     FileSystem* fs, const std::string& dirname,
304     std::unique_ptr<FSDirectory>* directory) {
305   // We call CreateDirIfMissing() as the directory may already exist (if we
306   // are reopening a DB); when this happens we don't want creating the
307   // directory to cause an error. However, we need to check if creating the
308   // directory fails or else we may get an obscure message about the lock
309   // file not existing. One real-world example of this occurring is if
310   // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
311   // when dbname_ is "dir/db" but "dir" doesn't exist.
312   IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
313   if (!io_s.ok()) {
314     return io_s;
315   }
316   return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
317 }
318 
319 IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
320                                      const std::string& wal_dir,
321                                      const std::vector<DbPath>& data_paths) {
322   IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
323   if (!io_s.ok()) {
324     return io_s;
325   }
326   if (!wal_dir.empty() && dbname != wal_dir) {
327     io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
328     if (!io_s.ok()) {
329       return io_s;
330     }
331   }
332 
333   data_dirs_.clear();
334   for (auto& p : data_paths) {
335     const std::string db_path = p.path;
336     if (db_path == dbname) {
337       data_dirs_.emplace_back(nullptr);
338     } else {
339       std::unique_ptr<FSDirectory> path_directory;
340       io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
341       if (!io_s.ok()) {
342         return io_s;
343       }
344       data_dirs_.emplace_back(path_directory.release());
345     }
346   }
347   assert(data_dirs_.size() == data_paths.size());
348   return IOStatus::OK();
349 }
350 
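// High-level flow of Recover(): unless opening read-only, set up directories,
// take the DB lock, and create a new DB if needed; then load the MANIFEST via
// VersionSet::Recover() (or TryRecover() for best-efforts recovery), establish
// the DB ID, and finally replay any WAL files newer than those recorded in the
// MANIFEST via RecoverLogFiles().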
351 Status DBImpl::Recover(
352     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
353     bool error_if_log_file_exist, bool error_if_data_exists_in_logs,
354     uint64_t* recovered_seq) {
355   mutex_.AssertHeld();
356 
357   bool is_new_db = false;
358   assert(db_lock_ == nullptr);
359   if (!read_only) {
360     Status s = directories_.SetDirectories(fs_.get(), dbname_,
361                                            immutable_db_options_.wal_dir,
362                                            immutable_db_options_.db_paths);
363     if (!s.ok()) {
364       return s;
365     }
366 
367     s = env_->LockFile(LockFileName(dbname_), &db_lock_);
368     if (!s.ok()) {
369       return s;
370     }
371 
372     std::string current_fname = CurrentFileName(dbname_);
373     s = env_->FileExists(current_fname);
374     if (s.IsNotFound()) {
375       if (immutable_db_options_.create_if_missing) {
376         s = NewDB();
377         is_new_db = true;
378         if (!s.ok()) {
379           return s;
380         }
381       } else {
382         return Status::InvalidArgument(
383             current_fname, "does not exist (create_if_missing is false)");
384       }
385     } else if (s.ok()) {
386       if (immutable_db_options_.error_if_exists) {
387         return Status::InvalidArgument(dbname_,
388                                        "exists (error_if_exists is true)");
389       }
390     } else {
391       // Unexpected error reading file
392       assert(s.IsIOError());
393       return s;
394     }
395     // Verify compatibility of file_options_ and filesystem
396     {
397       std::unique_ptr<FSRandomAccessFile> idfile;
398       FileOptions customized_fs(file_options_);
399       customized_fs.use_direct_reads |=
400           immutable_db_options_.use_direct_io_for_flush_and_compaction;
401       s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
402                                    nullptr);
403       if (!s.ok()) {
404         std::string error_str = s.ToString();
405         // Check if unsupported Direct I/O is the root cause
406         customized_fs.use_direct_reads = false;
407         s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
408                                      nullptr);
409         if (s.ok()) {
410           return Status::InvalidArgument(
411               "Direct I/O is not supported by the specified DB.");
412         } else {
413           return Status::InvalidArgument(
414               "Found options incompatible with filesystem", error_str.c_str());
415         }
416       }
417     }
418   }
419   assert(db_id_.empty());
420   Status s;
421   bool missing_table_file = false;
422   if (!immutable_db_options_.best_efforts_recovery) {
423     s = versions_->Recover(column_families, read_only, &db_id_);
424   } else {
425     s = versions_->TryRecover(column_families, read_only, &db_id_,
426                               &missing_table_file);
427     if (s.ok()) {
428       s = CleanupFilesAfterRecovery();
429     }
430   }
431   if (!s.ok()) {
432     return s;
433   }
434   // This happens when immutable_db_options_.write_dbid_to_manifest is set to
435   // true for the very first time.
436   if (db_id_.empty()) {
437     // Check for the IDENTITY file and create it if not there.
438     s = fs_->FileExists(IdentityFileName(dbname_), IOOptions(), nullptr);
439     // The IDENTITY file is typically created in NewDB(); if for some reason
440     // it is no longer available, then at this point the DB ID is in neither
441     // the IDENTITY file nor the MANIFEST.
442     if (s.IsNotFound()) {
443       s = SetIdentityFile(env_, dbname_);
444       if (!s.ok()) {
445         return s;
446       }
447     } else if (!s.ok()) {
448       assert(s.IsIOError());
449       return s;
450     }
451     s = GetDbIdentityFromIdentityFile(&db_id_);
452     if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
453       VersionEdit edit;
454       edit.SetDBId(db_id_);
455       Options options;
456       MutableCFOptions mutable_cf_options(options);
457       versions_->db_id_ = db_id_;
458       s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
459                              mutable_cf_options, &edit, &mutex_, nullptr,
460                              false);
461     }
462   } else {
463     s = SetIdentityFile(env_, dbname_, db_id_);
464   }
465 
466   if (immutable_db_options_.paranoid_checks && s.ok()) {
467     s = CheckConsistency();
468   }
469   if (s.ok() && !read_only) {
470     std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
471     for (auto cfd : *versions_->GetColumnFamilySet()) {
472       s = cfd->AddDirectories(&created_dirs);
473       if (!s.ok()) {
474         return s;
475       }
476     }
477   }
478   // DB mutex is already held
479   if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
480     s = InitPersistStatsColumnFamily();
481   }
482 
483   if (s.ok()) {
484     // Initialize max_total_in_memory_state_ before recovering the logs. Log
485     // recovery may check this value to decide whether to flush.
486     max_total_in_memory_state_ = 0;
487     for (auto cfd : *versions_->GetColumnFamilySet()) {
488       auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
489       max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
490                                     mutable_cf_options->max_write_buffer_number;
491     }
492 
493     SequenceNumber next_sequence(kMaxSequenceNumber);
494     default_cf_handle_ = new ColumnFamilyHandleImpl(
495         versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
496     default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
497     // TODO(Zhongyi): handle single_column_family_mode_ when
498     // persistent_stats is enabled
499     single_column_family_mode_ =
500         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
501 
502     // Recover from all newer log files than the ones named in the
503     // descriptor (new log files may have been added by the previous
504     // incarnation without registering them in the descriptor).
505     //
506     // Note that prev_log_number() is no longer used, but we pay
507     // attention to it in case we are recovering a database
508     // produced by an older version of rocksdb.
509     std::vector<std::string> filenames;
510     if (!immutable_db_options_.best_efforts_recovery) {
511       s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
512     }
513     if (s.IsNotFound()) {
514       return Status::InvalidArgument("wal_dir not found",
515                                      immutable_db_options_.wal_dir);
516     } else if (!s.ok()) {
517       return s;
518     }
519 
520     std::vector<uint64_t> logs;
521     for (size_t i = 0; i < filenames.size(); i++) {
522       uint64_t number;
523       FileType type;
524       if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
525         if (is_new_db) {
526           return Status::Corruption(
527               "While creating a new Db, wal_dir contains "
528               "existing log file: ",
529               filenames[i]);
530         } else {
531           logs.push_back(number);
532         }
533       }
534     }
535 
536     if (logs.size() > 0) {
537       if (error_if_log_file_exist) {
538         return Status::Corruption(
539             "The db was opened in readonly mode with the "
540             "error_if_log_file_exist flag, but a log file already exists");
541       } else if (error_if_data_exists_in_logs) {
542         for (auto& log : logs) {
543           std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
544           uint64_t bytes;
545           s = env_->GetFileSize(fname, &bytes);
546           if (s.ok()) {
547             if (bytes > 0) {
548               return Status::Corruption(
549                   "error_if_data_exists_in_logs is set but there is data "
550                   "in the log files.");
551             }
552           }
553         }
554       }
555     }
556 
557     if (!logs.empty()) {
558       // Recover in the order in which the logs were generated
559       std::sort(logs.begin(), logs.end());
560       bool corrupted_log_found = false;
561       s = RecoverLogFiles(logs, &next_sequence, read_only,
562                           &corrupted_log_found);
563       if (corrupted_log_found && recovered_seq != nullptr) {
564         *recovered_seq = next_sequence;
565       }
566       if (!s.ok()) {
567         // Clear memtables if recovery failed
568         for (auto cfd : *versions_->GetColumnFamilySet()) {
569           cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
570                                  kMaxSequenceNumber);
571         }
572       }
573     }
574   }
575 
576   if (read_only) {
577     // If we are opening as read-only, we need to update options_file_number_
578     // to reflect the most recent OPTIONS file. It does not matter for regular
579     // read-write db instance because options_file_number_ will later be
580     // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
581     std::vector<std::string> file_names;
582     if (s.ok()) {
583       s = env_->GetChildren(GetName(), &file_names);
584     }
585     if (s.ok()) {
586       uint64_t number = 0;
587       uint64_t options_file_number = 0;
588       FileType type;
589       for (const auto& fname : file_names) {
590         if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
591           options_file_number = std::max(number, options_file_number);
592         }
593       }
594       versions_->options_file_number_ = options_file_number;
595     }
596   }
597 
598   return s;
599 }
600 
601 Status DBImpl::PersistentStatsProcessFormatVersion() {
602   mutex_.AssertHeld();
603   Status s;
604   // Persist the format version when the stats CF doesn't exist yet.
605   bool should_persist_format_version = !persistent_stats_cfd_exists_;
606   mutex_.Unlock();
607   if (persistent_stats_cfd_exists_) {
608     // Check persistent stats format version compatibility. Drop and recreate
609     // persistent stats CF if format version is incompatible
610     uint64_t format_version_recovered = 0;
611     Status s_format = DecodePersistentStatsVersionNumber(
612         this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
613     uint64_t compatible_version_recovered = 0;
614     Status s_compatible = DecodePersistentStatsVersionNumber(
615         this, StatsVersionKeyType::kCompatibleVersion,
616         &compatible_version_recovered);
617     // Abort reading from the existing stats CF if any of the following holds:
618     // 1. Failed to read the format version or compatible version from disk.
619     // 2. The SST's format version is greater than the current format version
620     //    (i.e. it was encoded by a newer RocksDB release) and the current
621     //    compatible version is below the SST's compatible version.
622     if (!s_format.ok() || !s_compatible.ok() ||
623         (kStatsCFCurrentFormatVersion < format_version_recovered &&
624          kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
625       if (!s_format.ok() || !s_compatible.ok()) {
626         ROCKS_LOG_INFO(
627             immutable_db_options_.info_log,
628             "Reading persistent stats version key failed. Format key: %s, "
629             "compatible key: %s",
630             s_format.ToString().c_str(), s_compatible.ToString().c_str());
631       } else {
632         ROCKS_LOG_INFO(
633             immutable_db_options_.info_log,
634             "Disable persistent stats due to corrupted or incompatible format "
635             "version\n");
636       }
637       DropColumnFamily(persist_stats_cf_handle_);
638       DestroyColumnFamilyHandle(persist_stats_cf_handle_);
639       ColumnFamilyHandle* handle = nullptr;
640       ColumnFamilyOptions cfo;
641       OptimizeForPersistentStats(&cfo);
642       s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
643       persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
644       // should also persist version here because old stats CF is discarded
645       should_persist_format_version = true;
646     }
647   }
648   if (s.ok() && should_persist_format_version) {
649     // The persistent stats CF is being created for the first time, so we
650     // need to write the format version key.
651     WriteBatch batch;
652     batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
653               ToString(kStatsCFCurrentFormatVersion));
654     batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
655               ToString(kStatsCFCompatibleFormatVersion));
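    // The two keys above record, respectively, the format version this binary
    // writes and the oldest format version that can still read it; they are
    // consulted by the compatibility check earlier in this function on the
    // next recovery.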
656     WriteOptions wo;
657     wo.low_pri = true;
658     wo.no_slowdown = true;
659     wo.sync = false;
660     s = Write(wo, &batch);
661   }
662   mutex_.Lock();
663   return s;
664 }
665 
666 Status DBImpl::InitPersistStatsColumnFamily() {
667   mutex_.AssertHeld();
668   assert(!persist_stats_cf_handle_);
669   ColumnFamilyData* persistent_stats_cfd =
670       versions_->GetColumnFamilySet()->GetColumnFamily(
671           kPersistentStatsColumnFamilyName);
672   persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;
673 
674   Status s;
675   if (persistent_stats_cfd != nullptr) {
676     // We are recovering from a DB which already contains a persistent stats
677     // CF. The CF was already created in VersionSet::ApplyOneVersionEdit, but
678     // the column family handle was not, so we explicitly create it here.
679     persist_stats_cf_handle_ =
680         new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
681   } else {
682     mutex_.Unlock();
683     ColumnFamilyHandle* handle = nullptr;
684     ColumnFamilyOptions cfo;
685     OptimizeForPersistentStats(&cfo);
686     s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
687     persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
688     mutex_.Lock();
689   }
690   return s;
691 }
692 
693 // REQUIRES: log_numbers are sorted in ascending order
694 Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
695                                SequenceNumber* next_sequence, bool read_only,
696                                bool* corrupted_log_found) {
697   struct LogReporter : public log::Reader::Reporter {
698     Env* env;
699     Logger* info_log;
700     const char* fname;
701     Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
702     void Corruption(size_t bytes, const Status& s) override {
703       ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
704                      (this->status == nullptr ? "(ignoring error) " : ""),
705                      fname, static_cast<int>(bytes), s.ToString().c_str());
706       if (this->status != nullptr && this->status->ok()) {
707         *this->status = s;
708       }
709     }
710   };
711 
712   mutex_.AssertHeld();
713   Status status;
714   std::unordered_map<int, VersionEdit> version_edits;
715   // no need to refcount because iteration is under mutex
716   for (auto cfd : *versions_->GetColumnFamilySet()) {
717     VersionEdit edit;
718     edit.SetColumnFamily(cfd->GetID());
719     version_edits.insert({cfd->GetID(), edit});
720   }
721   int job_id = next_job_id_.fetch_add(1);
722   {
723     auto stream = event_logger_.Log();
724     stream << "job" << job_id << "event"
725            << "recovery_started";
726     stream << "log_files";
727     stream.StartArray();
728     for (auto log_number : log_numbers) {
729       stream << log_number;
730     }
731     stream.EndArray();
732   }
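  // The block above emits an EVENT_LOG_v1 line to the info LOG roughly of the
  // form (illustrative, field values are hypothetical):
  //   {"time_micros": ..., "job": 1, "event": "recovery_started",
  //    "log_files": [5, 7]}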
733 
734 #ifndef ROCKSDB_LITE
735   if (immutable_db_options_.wal_filter != nullptr) {
736     std::map<std::string, uint32_t> cf_name_id_map;
737     std::map<uint32_t, uint64_t> cf_lognumber_map;
738     for (auto cfd : *versions_->GetColumnFamilySet()) {
739       cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
740       cf_lognumber_map.insert(
741           std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
742     }
743 
744     immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
745                                                                cf_name_id_map);
746   }
747 #endif
748 
749   bool stop_replay_by_wal_filter = false;
750   bool stop_replay_for_corruption = false;
751   bool flushed = false;
752   uint64_t corrupted_log_number = kMaxSequenceNumber;
753   uint64_t min_log_number = MinLogNumberToKeep();
754   for (auto log_number : log_numbers) {
755     if (log_number < min_log_number) {
756       ROCKS_LOG_INFO(immutable_db_options_.info_log,
757                      "Skipping log #%" PRIu64
758                      " since it is older than min log to keep #%" PRIu64,
759                      log_number, min_log_number);
760       continue;
761     }
762     // The previous incarnation may not have written any MANIFEST
763     // records after allocating this log number.  So we manually
764     // update the file number allocation counter in VersionSet.
765     versions_->MarkFileNumberUsed(log_number);
766     // Open the log file
767     std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
768 
769     ROCKS_LOG_INFO(immutable_db_options_.info_log,
770                    "Recovering log #%" PRIu64 " mode %d", log_number,
771                    static_cast<int>(immutable_db_options_.wal_recovery_mode));
772     auto logFileDropped = [this, &fname]() {
773       uint64_t bytes;
774       if (env_->GetFileSize(fname, &bytes).ok()) {
775         auto info_log = immutable_db_options_.info_log.get();
776         ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
777                        static_cast<int>(bytes));
778       }
779     };
780     if (stop_replay_by_wal_filter) {
781       logFileDropped();
782       continue;
783     }
784 
785     std::unique_ptr<SequentialFileReader> file_reader;
786     {
787       std::unique_ptr<FSSequentialFile> file;
788       status = fs_->NewSequentialFile(fname,
789                                       fs_->OptimizeForLogRead(file_options_),
790                                       &file, nullptr);
791       if (!status.ok()) {
792         MaybeIgnoreError(&status);
793         if (!status.ok()) {
794           return status;
795         } else {
796           // Failed to open this log file, but that's OK;
797           // try the next one.
798           continue;
799         }
800       }
801       file_reader.reset(new SequentialFileReader(
802           std::move(file), fname, immutable_db_options_.log_readahead_size));
803     }
804 
805     // Create the log reader.
806     LogReporter reporter;
807     reporter.env = env_;
808     reporter.info_log = immutable_db_options_.info_log.get();
809     reporter.fname = fname.c_str();
810     if (!immutable_db_options_.paranoid_checks ||
811         immutable_db_options_.wal_recovery_mode ==
812             WALRecoveryMode::kSkipAnyCorruptedRecords) {
813       reporter.status = nullptr;
814     } else {
815       reporter.status = &status;
816     }
817     // We intentionally make log::Reader do checksumming even if
818     // paranoid_checks==false so that corruptions cause entire commits
819     // to be skipped instead of propagating bad information (like overly
820     // large sequence numbers).
821     log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
822                        &reporter, true /*checksum*/, log_number);
823 
824     // Determine whether to tolerate incomplete records at the tail end of the
825     // log, then read all the records and add them to a memtable.
826     std::string scratch;
827     Slice record;
828     WriteBatch batch;
829 
830     while (!stop_replay_by_wal_filter &&
831            reader.ReadRecord(&record, &scratch,
832                              immutable_db_options_.wal_recovery_mode) &&
833            status.ok()) {
834       if (record.size() < WriteBatchInternal::kHeader) {
835         reporter.Corruption(record.size(),
836                             Status::Corruption("log record too small"));
837         continue;
838       }
839       WriteBatchInternal::SetContents(&batch, record);
840       SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
841 
842       if (immutable_db_options_.wal_recovery_mode ==
843           WALRecoveryMode::kPointInTimeRecovery) {
844         // In point-in-time recovery mode, if the sequence ids of the log
845         // files are consecutive, we continue recovery despite corruption.
846         // This could happen when we open and write to a corrupted DB, where
847         // the sequence id will start from the last sequence id we recovered.
848         if (sequence == *next_sequence) {
849           stop_replay_for_corruption = false;
850         }
851         if (stop_replay_for_corruption) {
852           logFileDropped();
853           break;
854         }
855       }
856 
857 #ifndef ROCKSDB_LITE
858       if (immutable_db_options_.wal_filter != nullptr) {
859         WriteBatch new_batch;
860         bool batch_changed = false;
861 
862         WalFilter::WalProcessingOption wal_processing_option =
863             immutable_db_options_.wal_filter->LogRecordFound(
864                 log_number, fname, batch, &new_batch, &batch_changed);
865 
866         switch (wal_processing_option) {
867           case WalFilter::WalProcessingOption::kContinueProcessing:
868             // do nothing, proceed normally
869             break;
870           case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
871             // skip current record
872             continue;
873           case WalFilter::WalProcessingOption::kStopReplay:
874             // skip current record and stop replay
875             stop_replay_by_wal_filter = true;
876             continue;
877           case WalFilter::WalProcessingOption::kCorruptedRecord: {
878             status =
879                 Status::Corruption("Corruption reported by Wal Filter ",
880                                    immutable_db_options_.wal_filter->Name());
881             MaybeIgnoreError(&status);
882             if (!status.ok()) {
883               reporter.Corruption(record.size(), status);
884               continue;
885             }
886             break;
887           }
888           default: {
889             assert(false);  // unhandled case
890             status = Status::NotSupported(
891                 "Unknown WalProcessingOption returned"
892                 " by Wal Filter ",
893                 immutable_db_options_.wal_filter->Name());
894             MaybeIgnoreError(&status);
895             if (!status.ok()) {
896               return status;
897             } else {
898               // Ignore the error with current record processing.
899               continue;
900             }
901           }
902         }
903 
904         if (batch_changed) {
905           // Make sure that the count in the new batch is
906           // within the original count.
907           int new_count = WriteBatchInternal::Count(&new_batch);
908           int original_count = WriteBatchInternal::Count(&batch);
909           if (new_count > original_count) {
910             ROCKS_LOG_FATAL(
911                 immutable_db_options_.info_log,
912                 "Recovering log #%" PRIu64
913                 " mode %d log filter %s returned "
914                 "more records (%d) than original (%d) which is not allowed. "
915                 "Aborting recovery.",
916                 log_number,
917                 static_cast<int>(immutable_db_options_.wal_recovery_mode),
918                 immutable_db_options_.wal_filter->Name(), new_count,
919                 original_count);
920             status = Status::NotSupported(
921                 "More than original # of records "
922                 "returned by Wal Filter ",
923                 immutable_db_options_.wal_filter->Name());
924             return status;
925           }
926           // Set the same sequence number in the new_batch
927           // as the original batch.
928           WriteBatchInternal::SetSequence(&new_batch,
929                                           WriteBatchInternal::Sequence(&batch));
930           batch = new_batch;
931         }
932       }
933 #endif  // ROCKSDB_LITE
934 
935       // If a column family was not found, it might mean that the WAL write
936       // batch references a column family that was dropped after the
937       // insert. We don't want to fail the whole write batch in that case --
938       // we just ignore the update.
939       // That's why we set ignore_missing_column_families to true.
940       bool has_valid_writes = false;
941       status = WriteBatchInternal::InsertInto(
942           &batch, column_family_memtables_.get(), &flush_scheduler_,
943           &trim_history_scheduler_, true, log_number, this,
944           false /* concurrent_memtable_writes */, next_sequence,
945           &has_valid_writes, seq_per_batch_, batch_per_txn_);
946       MaybeIgnoreError(&status);
947       if (!status.ok()) {
948         // We are treating this as a failure while reading since we read valid
949         // blocks that do not form coherent data
950         reporter.Corruption(record.size(), status);
951         continue;
952       }
953 
954       if (has_valid_writes && !read_only) {
955         // We can do this because this is called before the client has access
956         // to the DB and there is only a single thread operating on the DB.
957         ColumnFamilyData* cfd;
958 
959         while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
960           cfd->UnrefAndTryDelete();
961           // If this asserts, it means that InsertInto failed in
962           // filtering updates to already-flushed column families
963           assert(cfd->GetLogNumber() <= log_number);
964           auto iter = version_edits.find(cfd->GetID());
965           assert(iter != version_edits.end());
966           VersionEdit* edit = &iter->second;
967           status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
968           if (!status.ok()) {
969             // Reflect errors immediately so that conditions like full
970             // file-systems cause the DB::Open() to fail.
971             return status;
972           }
973           flushed = true;
974 
975           cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
976                                  *next_sequence);
977         }
978       }
979     }
980 
981     if (!status.ok()) {
982       if (status.IsNotSupported()) {
983         // We should not treat NotSupported as corruption. It is rather a clear
984         // sign that we are processing a WAL that is produced by an incompatible
985         // version of the code.
986         return status;
987       }
988       if (immutable_db_options_.wal_recovery_mode ==
989           WALRecoveryMode::kSkipAnyCorruptedRecords) {
990         // We should ignore all errors unconditionally
991         status = Status::OK();
992       } else if (immutable_db_options_.wal_recovery_mode ==
993                  WALRecoveryMode::kPointInTimeRecovery) {
994         // We should ignore the error but not continue replaying
995         status = Status::OK();
996         stop_replay_for_corruption = true;
997         corrupted_log_number = log_number;
998         if (corrupted_log_found != nullptr) {
999           *corrupted_log_found = true;
1000         }
1001         ROCKS_LOG_INFO(immutable_db_options_.info_log,
1002                        "Point in time recovered to log #%" PRIu64
1003                        " seq #%" PRIu64,
1004                        log_number, *next_sequence);
1005       } else {
1006         assert(immutable_db_options_.wal_recovery_mode ==
1007                    WALRecoveryMode::kTolerateCorruptedTailRecords ||
1008                immutable_db_options_.wal_recovery_mode ==
1009                    WALRecoveryMode::kAbsoluteConsistency);
1010         return status;
1011       }
1012     }
1013 
1014     flush_scheduler_.Clear();
1015     trim_history_scheduler_.Clear();
1016     auto last_sequence = *next_sequence - 1;
1017     if ((*next_sequence != kMaxSequenceNumber) &&
1018         (versions_->LastSequence() <= last_sequence)) {
1019       versions_->SetLastAllocatedSequence(last_sequence);
1020       versions_->SetLastPublishedSequence(last_sequence);
1021       versions_->SetLastSequence(last_sequence);
1022     }
1023   }
1024   // Compare the corrupted log number to every column family's current log
1025   // number. Abort Open() if any column family's log number is greater than
1026   // the corrupted log number, which means the CF contains data beyond the
1027   // point of corruption. This could happen during PIT recovery when the WAL
1028   // is corrupted and some (but not all) CFs are flushed.
1029   // Exclude the PIT case where no log is dropped after the corruption point.
1030   // This is to cover the case for empty logs after corrupted log, in which we
1031   // don't reset stop_replay_for_corruption.
1032   if (stop_replay_for_corruption == true &&
1033       (immutable_db_options_.wal_recovery_mode ==
1034            WALRecoveryMode::kPointInTimeRecovery ||
1035        immutable_db_options_.wal_recovery_mode ==
1036            WALRecoveryMode::kTolerateCorruptedTailRecords)) {
1037     for (auto cfd : *versions_->GetColumnFamilySet()) {
1038       if (cfd->GetLogNumber() > corrupted_log_number) {
1039         ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1040                         "Column family inconsistency: SST file contains data"
1041                         " beyond the point of corruption.");
1042         return Status::Corruption("SST file is ahead of WALs");
1043       }
1044     }
1045   }
1046 
1047   // True if there's any data in the WALs; if not, we can skip re-processing
1048   // them later
1049   bool data_seen = false;
1050   if (!read_only) {
1051     // no need to refcount since client still doesn't have access
1052     // to the DB and can not drop column families while we iterate
1053     auto max_log_number = log_numbers.back();
1054     for (auto cfd : *versions_->GetColumnFamilySet()) {
1055       auto iter = version_edits.find(cfd->GetID());
1056       assert(iter != version_edits.end());
1057       VersionEdit* edit = &iter->second;
1058 
1059       if (cfd->GetLogNumber() > max_log_number) {
1060         // Column family cfd has already flushed the data
1061         // from all logs. Memtable has to be empty because
1062         // we filter the updates based on log_number
1063         // (in WriteBatch::InsertInto)
1064         assert(cfd->mem()->GetFirstSequenceNumber() == 0);
1065         assert(edit->NumEntries() == 0);
1066         continue;
1067       }
1068 
1069       TEST_SYNC_POINT_CALLBACK(
1070           "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1071 
1072       // flush the final memtable (if non-empty)
1073       if (cfd->mem()->GetFirstSequenceNumber() != 0) {
1074         // If flush happened in the middle of recovery (e.g. due to memtable
1075         // being full), we flush at the end. Otherwise we'll need to record
1076         // where we were on the last flush, which makes the logic complicated.
1077         if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
1078           status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1079           if (!status.ok()) {
1080             // Recovery failed
1081             break;
1082           }
1083           flushed = true;
1084 
1085           cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1086                                  versions_->LastSequence());
1087         }
1088         data_seen = true;
1089       }
1090 
1091       // Update the log number info in the version edit corresponding to this
1092       // column family. Note that the version edits will be written to MANIFEST
1093       // together later.
1094       // Writing log_number in the manifest means that any log file
1095       // with a number strictly less than (log_number + 1) has already been
1096       // recovered and should be ignored on the next reincarnation.
1097       // Since we already recovered max_log_number, we want all logs
1098       // with numbers `<= max_log_number` (includes this one) to be ignored
1099       if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
1100         edit->SetLogNumber(max_log_number + 1);
1101       }
1102     }
1103     if (status.ok()) {
1104       // We must mark the next log number as used, even though it's
1105       // not actually used. That is because VersionSet assumes
1106       // VersionSet::next_file_number_ is always strictly greater than any
1107       // log number.
1108       versions_->MarkFileNumberUsed(max_log_number + 1);
1109 
1110       autovector<ColumnFamilyData*> cfds;
1111       autovector<const MutableCFOptions*> cf_opts;
1112       autovector<autovector<VersionEdit*>> edit_lists;
1113       for (auto* cfd : *versions_->GetColumnFamilySet()) {
1114         cfds.push_back(cfd);
1115         cf_opts.push_back(cfd->GetLatestMutableCFOptions());
1116         auto iter = version_edits.find(cfd->GetID());
1117         assert(iter != version_edits.end());
1118         edit_lists.push_back({&iter->second});
1119       }
1120       // write MANIFEST with update
1121       status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
1122                                       directories_.GetDbDir(),
1123                                       /*new_descriptor_log=*/true);
1124     }
1125   }
1126 
1127   if (status.ok() && data_seen && !flushed) {
1128     status = RestoreAliveLogFiles(log_numbers);
1129   }
1130 
1131   event_logger_.Log() << "job" << job_id << "event"
1132                       << "recovery_finished";
1133 
1134   return status;
1135 }
1136 
1137 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
1138   if (log_numbers.empty()) {
1139     return Status::OK();
1140   }
1141   Status s;
1142   mutex_.AssertHeld();
1143   assert(immutable_db_options_.avoid_flush_during_recovery);
1144   if (two_write_queues_) {
1145     log_write_mutex_.Lock();
1146   }
1147   // Mark these as alive so they'll be considered for deletion later by
1148   // FindObsoleteFiles()
1149   total_log_size_ = 0;
1150   log_empty_ = false;
1151   for (auto log_number : log_numbers) {
1152     LogFileNumberSize log(log_number);
1153     std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
1154     // This gets the apparent size of the log, excluding preallocated space.
1155     s = env_->GetFileSize(fname, &log.size);
1156     if (!s.ok()) {
1157       break;
1158     }
1159     total_log_size_ += log.size;
1160     alive_log_files_.push_back(log);
1161     // We preallocate space for logs, but after a crash and restart, that
1162     // preallocated space is not needed anymore. It is likely that only the
1163     // last log has such preallocated space, so we only truncate the last log.
1164     if (log_number == log_numbers.back()) {
1165       std::unique_ptr<FSWritableFile> last_log;
1166       Status truncate_status = fs_->ReopenWritableFile(
1167           fname,
1168           fs_->OptimizeForLogWrite(
1169               file_options_,
1170               BuildDBOptions(immutable_db_options_, mutable_db_options_)),
1171           &last_log, nullptr);
1172       if (truncate_status.ok()) {
1173         truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
1174       }
1175       if (truncate_status.ok()) {
1176         truncate_status = last_log->Close(IOOptions(), nullptr);
1177       }
1178       // Not a critical error if fail to truncate.
1179       if (!truncate_status.ok()) {
1180         ROCKS_LOG_WARN(immutable_db_options_.info_log,
1181                        "Failed to truncate log #%" PRIu64 ": %s", log_number,
1182                        truncate_status.ToString().c_str());
1183       }
1184     }
1185   }
1186   if (two_write_queues_) {
1187     log_write_mutex_.Unlock();
1188   }
1189   return s;
1190 }
1191 
1192 Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
1193                                            MemTable* mem, VersionEdit* edit) {
1194   mutex_.AssertHeld();
1195   const uint64_t start_micros = env_->NowMicros();
1196   FileMetaData meta;
1197   std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
1198       new std::list<uint64_t>::iterator(
1199           CaptureCurrentFileNumberInPendingOutputs()));
1200   meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
1201   ReadOptions ro;
1202   ro.total_order_seek = true;
1203   Arena arena;
1204   Status s;
1205   TableProperties table_properties;
1206   {
1207     ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
1208     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1209                     "[%s] [WriteLevel0TableForRecovery]"
1210                     " Level-0 table #%" PRIu64 ": started",
1211                     cfd->GetName().c_str(), meta.fd.GetNumber());
1212 
1213     // Get the latest mutable cf options while the mutex is still locked
1214     const MutableCFOptions mutable_cf_options =
1215         *cfd->GetLatestMutableCFOptions();
1216     bool paranoid_file_checks =
1217         cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
1218 
1219     int64_t _current_time = 0;
1220     env_->GetCurrentTime(&_current_time);  // ignore error
1221     const uint64_t current_time = static_cast<uint64_t>(_current_time);
1222     meta.oldest_ancester_time = current_time;
1223 
1224     {
1225       auto write_hint = cfd->CalculateSSTWriteHint(0);
1226       mutex_.Unlock();
1227 
1228       SequenceNumber earliest_write_conflict_snapshot;
1229       std::vector<SequenceNumber> snapshot_seqs =
1230           snapshots_.GetAll(&earliest_write_conflict_snapshot);
1231       auto snapshot_checker = snapshot_checker_.get();
1232       if (use_custom_gc_ && snapshot_checker == nullptr) {
1233         snapshot_checker = DisableGCSnapshotChecker::Instance();
1234       }
1235       std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
1236           range_del_iters;
1237       auto range_del_iter =
1238           mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
1239       if (range_del_iter != nullptr) {
1240         range_del_iters.emplace_back(range_del_iter);
1241       }
1242       IOStatus io_s;
1243       s = BuildTable(
1244           dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options,
1245           file_options_for_compaction_, cfd->table_cache(), iter.get(),
1246           std::move(range_del_iters), &meta, cfd->internal_comparator(),
1247           cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
1248           snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
1249           GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
1250           mutable_cf_options.sample_for_compression,
1251           cfd->ioptions()->compression_opts, paranoid_file_checks,
1252           cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
1253           &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
1254           -1 /* level */, current_time, write_hint);
1255       LogFlush(immutable_db_options_.info_log);
1256       ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1257                       "[%s] [WriteLevel0TableForRecovery]"
1258                       " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
1259                       cfd->GetName().c_str(), meta.fd.GetNumber(),
1260                       meta.fd.GetFileSize(), s.ToString().c_str());
1261       mutex_.Lock();
1262     }
1263   }
1264   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1265 
1266   // Note that if file_size is zero, the file has been deleted and
1267   // should not be added to the manifest.
1268   int level = 0;
1269   if (s.ok() && meta.fd.GetFileSize() > 0) {
1270     edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
1271                   meta.fd.GetFileSize(), meta.smallest, meta.largest,
1272                   meta.fd.smallest_seqno, meta.fd.largest_seqno,
1273                   meta.marked_for_compaction, meta.oldest_blob_file_number,
1274                   meta.oldest_ancester_time, meta.file_creation_time,
1275                   meta.file_checksum, meta.file_checksum_func_name);
1276   }
1277 
1278   InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
1279   stats.micros = env_->NowMicros() - start_micros;
1280   stats.bytes_written = meta.fd.GetFileSize();
1281   stats.num_output_files = 1;
1282   cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
1283   cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
1284                                     meta.fd.GetFileSize());
1285   RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
1286   return s;
1287 }
1288 
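// Illustrative usage of the convenience overload below (path and option values
// are hypothetical):
//
//   Options options;
//   options.create_if_missing = true;
//   DB* db = nullptr;
//   Status s = DB::Open(options, "/tmp/example_db", &db);
//   if (s.ok()) {
//     // ... use db ...
//     delete db;  // closes the DB
//   }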
1289 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1290   DBOptions db_options(options);
1291   ColumnFamilyOptions cf_options(options);
1292   std::vector<ColumnFamilyDescriptor> column_families;
1293   column_families.push_back(
1294       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
1295   if (db_options.persist_stats_to_disk) {
1296     column_families.push_back(
1297         ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
1298   }
1299   std::vector<ColumnFamilyHandle*> handles;
1300   Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
1301   if (s.ok()) {
1302     if (db_options.persist_stats_to_disk) {
1303       assert(handles.size() == 2);
1304     } else {
1305       assert(handles.size() == 1);
1306     }
1307     // We can delete the handle since DBImpl always holds a reference to
1308     // the default column family.
1309     if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
1310       delete handles[1];
1311     }
1312     delete handles[0];
1313   }
1314   return s;
1315 }
1316 
1317 Status DB::Open(const DBOptions& db_options, const std::string& dbname,
1318                 const std::vector<ColumnFamilyDescriptor>& column_families,
1319                 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
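       // seq_per_batch is only used by the transaction layers (e.g.
       // WritePrepared), so plain Open() disables it and keeps one transaction
       // per write batch.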
1320   const bool kSeqPerBatch = true;
1321   const bool kBatchPerTxn = true;
1322   return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
1323                       !kSeqPerBatch, kBatchPerTxn);
1324 }
1325 
1326 Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
1327                          size_t preallocate_block_size, log::Writer** new_log) {
1328   Status s;
1329   std::unique_ptr<FSWritableFile> lfile;
1330 
1331   DBOptions db_options =
1332       BuildDBOptions(immutable_db_options_, mutable_db_options_);
1333   FileOptions opt_file_options =
1334       fs_->OptimizeForLogWrite(file_options_, db_options);
1335   std::string log_fname =
1336       LogFileName(immutable_db_options_.wal_dir, log_file_num);
1337 
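       // Either recycle an old WAL file (typically cheaper because its blocks
       // are already allocated) or create a brand-new one.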
1338   if (recycle_log_number) {
1339     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1340                    "reusing log %" PRIu64 " from recycle list\n",
1341                    recycle_log_number);
1342     std::string old_log_fname =
1343         LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
1344     TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
1345     TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
1346     s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
1347                                &lfile, /*dbg=*/nullptr);
1348   } else {
1349     s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
1350   }
1351 
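       // Wrap the raw file in a WritableFileWriter and a log::Writer; the
       // writer needs to know whether recycling is enabled so that it emits
       // the recyclable record format.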
1352   if (s.ok()) {
1353     lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
1354     lfile->SetPreallocationBlockSize(preallocate_block_size);
1355 
1356     const auto& listeners = immutable_db_options_.listeners;
1357     std::unique_ptr<WritableFileWriter> file_writer(
1358         new WritableFileWriter(std::move(lfile), log_fname, opt_file_options,
1359                                env_, nullptr /* stats */, listeners));
1360     *new_log = new log::Writer(std::move(file_writer), log_file_num,
1361                                immutable_db_options_.recycle_log_file_num > 0,
1362                                immutable_db_options_.manual_wal_flush);
1363   }
1364   return s;
1365 }
1366 
1367 Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
1368                     const std::vector<ColumnFamilyDescriptor>& column_families,
1369                     std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
1370                     const bool seq_per_batch, const bool batch_per_txn) {
1371   Status s = SanitizeOptionsByTable(db_options, column_families);
1372   if (!s.ok()) {
1373     return s;
1374   }
1375 
1376   s = ValidateOptions(db_options, column_families);
1377   if (!s.ok()) {
1378     return s;
1379   }
1380 
1381   *dbptr = nullptr;
1382   handles->clear();
1383 
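       // The largest write_buffer_size across the column families is used
       // below to size WAL preallocation and the SstFileManager disk
       // reservation.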
1384   size_t max_write_buffer_size = 0;
1385   for (auto cf : column_families) {
1386     max_write_buffer_size =
1387         std::max(max_write_buffer_size, cf.options.write_buffer_size);
1388   }
1389 
1390   DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
1391   s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
1392   if (s.ok()) {
1393     std::vector<std::string> paths;
1394     for (auto& db_path : impl->immutable_db_options_.db_paths) {
1395       paths.emplace_back(db_path.path);
1396     }
1397     for (auto& cf : column_families) {
1398       for (auto& cf_path : cf.options.cf_paths) {
1399         paths.emplace_back(cf_path.path);
1400       }
1401     }
1402     for (auto& path : paths) {
1403       s = impl->env_->CreateDirIfMissing(path);
1404       if (!s.ok()) {
1405         break;
1406       }
1407     }
1408 
1409     // Automatic recovery from a NoSpace() error is only supported when the
1410     // database is stored in a single path.
1411     if (paths.size() <= 1) {
1412       impl->error_handler_.EnableAutoRecovery();
1413     }
1414   }
1415   if (s.ok()) {
1416     s = impl->CreateArchivalDirectory();
1417   }
1418   if (!s.ok()) {
1419     delete impl;
1420     return s;
1421   }
1422 
1423   impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
1424 
1425   impl->mutex_.Lock();
1426   // Handles create_if_missing, error_if_exists
1427   uint64_t recovered_seq(kMaxSequenceNumber);
1428   s = impl->Recover(column_families, false, false, false, &recovered_seq);
1429   if (s.ok()) {
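         // Recovery succeeded; start a fresh WAL for new writes rather than
         // appending to the recovered logs.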
1430     uint64_t new_log_number = impl->versions_->NewFileNumber();
1431     log::Writer* new_log = nullptr;
1432     const size_t preallocate_block_size =
1433         impl->GetWalPreallocateBlockSize(max_write_buffer_size);
1434     s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
1435                         preallocate_block_size, &new_log);
1436     if (s.ok()) {
1437       InstrumentedMutexLock wl(&impl->log_write_mutex_);
1438       impl->logfile_number_ = new_log_number;
1439       assert(new_log != nullptr);
1440       impl->logs_.emplace_back(new_log_number, new_log);
1441     }
1442 
1443     if (s.ok()) {
1444       // set column family handles
1445       for (auto cf : column_families) {
1446         auto cfd =
1447             impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
1448         if (cfd != nullptr) {
1449           handles->push_back(
1450               new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
1451           impl->NewThreadStatusCfInfo(cfd);
1452         } else {
1453           if (db_options.create_missing_column_families) {
1454             // missing column family, create it
1455             ColumnFamilyHandle* handle;
1456             impl->mutex_.Unlock();
1457             s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
1458             impl->mutex_.Lock();
1459             if (s.ok()) {
1460               handles->push_back(handle);
1461             } else {
1462               break;
1463             }
1464           } else {
1465             s = Status::InvalidArgument("Column family not found: ", cf.name);
1466             break;
1467           }
1468         }
1469       }
1470     }
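         // Publish an initial SuperVersion for each column family, register
         // the new WAL as alive, purge files left over from recovery, and
         // fsync the DB directory.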
1471     if (s.ok()) {
1472       SuperVersionContext sv_context(/* create_superversion */ true);
1473       for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1474         impl->InstallSuperVersionAndScheduleWork(
1475             cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
1476       }
1477       sv_context.Clean();
1478       if (impl->two_write_queues_) {
1479         impl->log_write_mutex_.Lock();
1480       }
1481       impl->alive_log_files_.push_back(
1482           DBImpl::LogFileNumberSize(impl->logfile_number_));
1483       if (impl->two_write_queues_) {
1484         impl->log_write_mutex_.Unlock();
1485       }
1486 
1487       impl->DeleteObsoleteFiles();
1488       s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
1489     }
1490     if (s.ok()) {
1491       // In WritePrepared there can be gaps in the sequence numbers. This
1492       // breaks the trick used by kPointInTimeRecovery, which assumes that
1493       // the first seq in the log right after the corrupted log is one larger
1494       // than the last seq we read from the logs. To keep that trick working,
1495       // we add a dummy entry with the expected sequence to the first log
1496       // written after recovery. Even without WritePrepared, the new log
1497       // after recovery could be empty and thus lack the consecutive-seq hint
1498       // needed to distinguish middle-of-log corruption from a corrupted log
1499       // left over after recovery. The dummy write addresses that case too.
1500       if (recovered_seq != kMaxSequenceNumber) {
1501         WriteBatch empty_batch;
1502         WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
1503         WriteOptions write_options;
1504         uint64_t log_used, log_size;
1505         log::Writer* log_writer = impl->logs_.back().writer;
1506         s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
1507         if (s.ok()) {
1508           // Need to fsync, otherwise it might get lost after a power reset.
1509           s = impl->FlushWAL(false);
1510           if (s.ok()) {
1511             s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
1512           }
1513         }
1514       }
1515     }
1516   }
1517   if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
1518     // Try to read the stats format version; do not fail Open() if it fails.
1519     s = impl->PersistentStatsProcessFormatVersion();
1520   }
1521 
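       // Sanity-check the recovered column families: FIFO compaction requires
       // all files to be at level 0, and the memtable must support snapshots
       // and merge operators where those features are configured.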
1522   if (s.ok()) {
1523     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1524       if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1525         auto* vstorage = cfd->current()->storage_info();
1526         for (int i = 1; i < vstorage->num_levels(); ++i) {
1527           int num_files = vstorage->NumLevelFiles(i);
1528           if (num_files > 0) {
1529             s = Status::InvalidArgument(
1530                 "Not all files are at level 0. Cannot "
1531                 "open with FIFO compaction style.");
1532             break;
1533           }
1534         }
1535       }
1536       if (!cfd->mem()->IsSnapshotSupported()) {
1537         impl->is_snapshot_supported_ = false;
1538       }
1539       if (cfd->ioptions()->merge_operator != nullptr &&
1540           !cfd->mem()->IsMergeOperatorSupported()) {
1541         s = Status::InvalidArgument(
1542             "options.merge_operator is non-null but the memtable does not "
1543             "support merge operators for column family: ",
1544             cfd->GetName());
1545       }
1546       if (!s.ok()) {
1547         break;
1548       }
1549     }
1550   }
1551   TEST_SYNC_POINT("DBImpl::Open:Opened");
1552   Status persist_options_status;
1553   if (s.ok()) {
1554     // Persist the RocksDB options before scheduling compactions.
1555     // WriteOptionsFile() releases and re-acquires the mutex internally.
1556     persist_options_status = impl->WriteOptionsFile(
1557         false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
1558 
1559     *dbptr = impl;
1560     impl->opened_successfully_ = true;
1561     impl->MaybeScheduleFlushOrCompaction();
1562   }
1563   impl->mutex_.Unlock();
1564 
1565 #ifndef ROCKSDB_LITE
1566   auto sfm = static_cast<SstFileManagerImpl*>(
1567       impl->immutable_db_options_.sst_file_manager.get());
1568   if (s.ok() && sfm) {
1569     // Notify SstFileManager about all sst files that already exist in
1570     // db_paths[0] and cf_paths[0] when the DB is opened.
1571 
1572     // SstFileManagerImpl needs to know the sizes of the files. For files
1573     // whose size we already know (sst files that appear in the manifest,
1574     // typically the vast majority of all files), we pass the size to
1575     // SstFileManager. For all other files it queries the filesystem.
1576 
1577     std::vector<LiveFileMetaData> metadata;
1578 
1579     impl->mutex_.Lock();
1580     impl->versions_->GetLiveFilesMetaData(&metadata);
1581     impl->mutex_.Unlock();
1582 
1583     std::unordered_map<std::string, uint64_t> known_file_sizes;
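         // LiveFileMetaData names carry a leading '/', while GetChildren()
         // returns bare file names, so strip the slash before using the name
         // as a key.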
1584     for (const auto& md : metadata) {
1585       std::string name = md.name;
1586       if (!name.empty() && name[0] == '/') {
1587         name = name.substr(1);
1588       }
1589       known_file_sizes[name] = md.size;
1590     }
1591 
1592     std::vector<std::string> paths;
1593     paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
1594     for (auto& cf : column_families) {
1595       if (!cf.options.cf_paths.empty()) {
1596         paths.emplace_back(cf.options.cf_paths[0].path);
1597       }
1598     }
1599     // Remove duplicate paths.
1600     std::sort(paths.begin(), paths.end());
1601     paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
1602     for (auto& path : paths) {
1603       std::vector<std::string> existing_files;
1604       impl->immutable_db_options_.env->GetChildren(path, &existing_files);
1605       for (auto& file_name : existing_files) {
1606         uint64_t file_number;
1607         FileType file_type;
1608         std::string file_path = path + "/" + file_name;
1609         if (ParseFileName(file_name, &file_number, &file_type) &&
1610             file_type == kTableFile) {
1611           if (known_file_sizes.count(file_name)) {
1612             // We're assuming that each sst file name exists in at most one of
1613             // the paths.
1614             sfm->OnAddFile(file_path, known_file_sizes.at(file_name),
1615                            /* compaction */ false);
1616           } else {
1617             sfm->OnAddFile(file_path);
1618           }
1619         }
1620       }
1621     }
1622 
1623     // Reserve some disk buffer space. This is a heuristic: when we run out
1624     // of disk space, it ensures that there is at least write_buffer_size
1625     // worth of free space before we resume DB writes. In low-disk-space
1626     // conditions we want to avoid creating many small L0 files caused by
1627     // frequent WAL write failures and the forced flushes that follow.
1628     sfm->ReserveDiskBuffer(max_write_buffer_size,
1629                            impl->immutable_db_options_.db_paths[0].path);
1630   }
1631 #endif  // !ROCKSDB_LITE
1632 
1633   if (s.ok()) {
1634     ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
1635                      impl);
1636     LogFlush(impl->immutable_db_options_.info_log);
1637     assert(impl->TEST_WALBufferIsEmpty());
1638     // If the assert above fails then we need to FlushWAL before returning
1639     // control back to the user.
1640     if (!persist_options_status.ok()) {
1641       s = Status::IOError(
1642           "DB::Open() failed --- Unable to persist Options file",
1643           persist_options_status.ToString());
1644     }
1645   }
1646   if (s.ok()) {
1647     impl->StartTimedTasks();
1648   }
1649   if (!s.ok()) {
1650     for (auto* h : *handles) {
1651       delete h;
1652     }
1653     handles->clear();
1654     delete impl;
1655     *dbptr = nullptr;
1656   }
1657   return s;
1658 }
1659 }  // namespace ROCKSDB_NAMESPACE
1660