1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <cinttypes>
12
13 #include "db/builder.h"
14 #include "db/error_handler.h"
15 #include "env/composite_env_wrapper.h"
16 #include "file/read_write_util.h"
17 #include "file/sst_file_manager_impl.h"
18 #include "file/writable_file_writer.h"
19 #include "monitoring/persistent_stats_history.h"
20 #include "options/options_helper.h"
21 #include "rocksdb/wal_filter.h"
22 #include "table/block_based/block_based_table_factory.h"
23 #include "test_util/sync_point.h"
24 #include "util/rate_limiter.h"
25
26 namespace ROCKSDB_NAMESPACE {
// Sanitizes a combined Options object for the database at `dbname`.
//
// The DB-wide part and the column-family part of `src` are sanitized
// separately by the corresponding SanitizeOptions() overloads and then merged
// back into a single Options value. `src` is not modified.
Options SanitizeOptions(const std::string& dbname, const Options& src) {
  // DB-wide options are handled first because the column-family overload
  // takes an ImmutableDBOptions built from the sanitized DB options.
  const DBOptions sanitized_db = SanitizeOptions(dbname, DBOptions(src));
  const ImmutableDBOptions immutable_db(sanitized_db);
  const auto sanitized_cf =
      SanitizeOptions(immutable_db, ColumnFamilyOptions(src));
  return Options(sanitized_db, sanitized_cf);
}
34
// Returns a sanitized copy of `src` for the database at `dbname`: unset
// fields receive defaults and option combinations that this function treats
// as inconsistent are adjusted. `src` itself is not modified.
//
// Besides computing the returned options this function has side effects: it
// may create the info logger, asks the Env to raise its background thread
// counts, and (in non-LITE builds) cleans up leftover trash files in the WAL
// directory and in every DB path.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
  DBOptions result(src);

  if (result.env == nullptr) {
    result.env = Env::Default();
  }

  // result.max_open_files == -1 means an "infinite" number of open files;
  // every other value is clipped to [20, max_max_open_files].
  if (result.max_open_files != -1) {
    int max_max_open_files = port::GetMaxOpenFiles();
    if (max_max_open_files == -1) {
      // -1 from the port layer is replaced by a fixed upper bound.
      max_max_open_files = 0x400000;
    }
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
                             &result.max_open_files);
  }

  if (result.info_log == nullptr) {
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }

  if (!result.write_buffer_manager) {
    result.write_buffer_manager.reset(
        new WriteBufferManager(result.db_write_buffer_size));
  }

  // Make sure the Env has at least as many LOW/HIGH priority background
  // threads as the computed compaction/flush limits.
  auto bg_job_limits = DBImpl::GetBGJobLimits(
      result.max_background_flushes, result.max_background_compactions,
      result.max_background_jobs, true /* parallelize_compactions */);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
                                           Env::Priority::HIGH);

  // With a rate limiter configured, default bytes_per_sync to 1MB if unset.
  if (result.rate_limiter.get() != nullptr) {
    if (result.bytes_per_sync == 0) {
      result.bytes_per_sync = 1024 * 1024;
    }
  }

  // Default delayed_write_rate: the rate limiter's rate if there is one,
  // otherwise (or if that rate is also 0) 16MB/s.
  if (result.delayed_write_rate == 0) {
    if (result.rate_limiter.get() != nullptr) {
      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
    }
    if (result.delayed_write_rate == 0) {
      result.delayed_write_rate = 16 * 1024 * 1024;
    }
  }

  // Log recycling is turned off whenever WAL archival (TTL or size limit) is
  // configured.
  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
    result.recycle_log_file_num = 0;
  }

  if (result.recycle_log_file_num &&
      (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
    // kPointInTimeRecovery is inconsistent with recycle log file feature since
    // we define the "end" of the log as the first corrupt record we encounter.
    // kAbsoluteConsistency doesn't make sense because even a clean
    // shutdown leaves old junk at the end of the log file.
    result.recycle_log_file_num = 0;
  }

  if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }
  // Strip a single trailing '/'. wal_dir can still be empty here when dbname
  // is empty, and calling back() on an empty string is undefined behavior.
  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  }

  if (result.db_paths.size() == 0) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  }

  // Direct reads with no explicit readahead get a 2MB compaction readahead.
  if (result.use_direct_reads && result.compaction_readahead_size == 0) {
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
    result.compaction_readahead_size = 1024 * 1024 * 2;
  }

  if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
    result.new_table_reader_for_compaction_inputs = true;
  }

  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
  // guarantee that consecutive log files have consecutive sequence id, which
  // make recovery complicated.
  if (result.allow_2pc) {
    result.avoid_flush_during_recovery = false;
  }

#ifndef ROCKSDB_LITE
  ImmutableDBOptions immutable_db_options(result);
  if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
    // Either the WAL dir and db_paths[0]/db_name are not the same, or we
    // cannot tell for sure. In either case, assume they're different and
    // explicitly cleanup the trash log files (bypass DeleteScheduler)
    // Do this first so even if we end up calling
    // DeleteScheduler::CleanupDirectory on the same dir later, it will be
    // safe
    const std::string kTrashLogSuffix(".log.trash");
    std::vector<std::string> filenames;
    // The listing and the deletions below are best effort: their statuses
    // are not inspected.
    result.env->GetChildren(result.wal_dir, &filenames);
    for (const std::string& filename : filenames) {
      // Explicit "ends with .log.trash" test; names shorter than the suffix
      // never match.
      if (filename.size() >= kTrashLogSuffix.size() &&
          filename.compare(filename.size() - kTrashLogSuffix.size(),
                           kTrashLogSuffix.size(), kTrashLogSuffix) == 0) {
        std::string trash_file = result.wal_dir + "/" + filename;
        result.env->DeleteFile(trash_file);
      }
    }
  }
  // When the DB is stopped, it's possible that there are some .trash files that
  // were not deleted yet, when we open the DB we will find these .trash files
  // and schedule them to be deleted (or delete immediately if SstFileManager
  // was not used)
  auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
  for (size_t i = 0; i < result.db_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path);
  }

  // Create a default SstFileManager for purposes of tracking compaction size
  // and facilitating recovery from out of space errors.
  if (result.sst_file_manager.get() == nullptr) {
    std::shared_ptr<SstFileManager> sst_file_manager(
        NewSstFileManager(result.env, result.info_log));
    result.sst_file_manager = sst_file_manager;
  }
#endif

  // Without paranoid checks the SST file size check at open is skipped too.
  if (!result.paranoid_checks) {
    result.skip_checking_sst_file_sizes_on_db_open = true;
    ROCKS_LOG_INFO(result.info_log,
                   "file size check will be skipped during open.");
  }

  return result;
}
176
177 namespace {
SanitizeOptionsByTable(const DBOptions & db_opts,const std::vector<ColumnFamilyDescriptor> & column_families)178 Status SanitizeOptionsByTable(
179 const DBOptions& db_opts,
180 const std::vector<ColumnFamilyDescriptor>& column_families) {
181 Status s;
182 for (auto cf : column_families) {
183 s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
184 if (!s.ok()) {
185 return s;
186 }
187 }
188 return Status::OK();
189 }
190 } // namespace
191
ValidateOptions(const DBOptions & db_options,const std::vector<ColumnFamilyDescriptor> & column_families)192 Status DBImpl::ValidateOptions(
193 const DBOptions& db_options,
194 const std::vector<ColumnFamilyDescriptor>& column_families) {
195 Status s;
196 for (auto& cfd : column_families) {
197 s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
198 if (!s.ok()) {
199 return s;
200 }
201 }
202 s = ValidateOptions(db_options);
203 return s;
204 }
205
ValidateOptions(const DBOptions & db_options)206 Status DBImpl::ValidateOptions(const DBOptions& db_options) {
207 if (db_options.db_paths.size() > 4) {
208 return Status::NotSupported(
209 "More than four DB paths are not supported yet. ");
210 }
211
212 if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
213 // Protect against assert in PosixMMapReadableFile constructor
214 return Status::NotSupported(
215 "If memory mapped reads (allow_mmap_reads) are enabled "
216 "then direct I/O reads (use_direct_reads) must be disabled. ");
217 }
218
219 if (db_options.allow_mmap_writes &&
220 db_options.use_direct_io_for_flush_and_compaction) {
221 return Status::NotSupported(
222 "If memory mapped writes (allow_mmap_writes) are enabled "
223 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
224 "be disabled. ");
225 }
226
227 if (db_options.keep_log_file_num == 0) {
228 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
229 }
230
231 if (db_options.unordered_write &&
232 !db_options.allow_concurrent_memtable_write) {
233 return Status::InvalidArgument(
234 "unordered_write is incompatible with !allow_concurrent_memtable_write");
235 }
236
237 if (db_options.unordered_write && db_options.enable_pipelined_write) {
238 return Status::InvalidArgument(
239 "unordered_write is incompatible with enable_pipelined_write");
240 }
241
242 if (db_options.atomic_flush && db_options.enable_pipelined_write) {
243 return Status::InvalidArgument(
244 "atomic_flush is incompatible with enable_pipelined_write");
245 }
246
247 // TODO remove this restriction
248 if (db_options.atomic_flush && db_options.best_efforts_recovery) {
249 return Status::InvalidArgument(
250 "atomic_flush is currently incompatible with best-efforts recovery");
251 }
252
253 return Status::OK();
254 }
255
NewDB()256 Status DBImpl::NewDB() {
257 VersionEdit new_db;
258 Status s = SetIdentityFile(env_, dbname_);
259 if (!s.ok()) {
260 return s;
261 }
262 if (immutable_db_options_.write_dbid_to_manifest) {
263 std::string temp_db_id;
264 GetDbIdentityFromIdentityFile(&temp_db_id);
265 new_db.SetDBId(temp_db_id);
266 }
267 new_db.SetLogNumber(0);
268 new_db.SetNextFile(2);
269 new_db.SetLastSequence(0);
270
271 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
272 const std::string manifest = DescriptorFileName(dbname_, 1);
273 {
274 std::unique_ptr<FSWritableFile> file;
275 FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
276 s = NewWritableFile(fs_.get(), manifest, &file, file_options);
277 if (!s.ok()) {
278 return s;
279 }
280 file->SetPreallocationBlockSize(
281 immutable_db_options_.manifest_preallocation_size);
282 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
283 std::move(file), manifest, file_options, env_, nullptr /* stats */,
284 immutable_db_options_.listeners));
285 log::Writer log(std::move(file_writer), 0, false);
286 std::string record;
287 new_db.EncodeTo(&record);
288 s = log.AddRecord(record);
289 if (s.ok()) {
290 s = SyncManifest(env_, &immutable_db_options_, log.file());
291 }
292 }
293 if (s.ok()) {
294 // Make "CURRENT" file that points to the new manifest file.
295 s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir());
296 } else {
297 fs_->DeleteFile(manifest, IOOptions(), nullptr);
298 }
299 return s;
300 }
301
CreateAndNewDirectory(FileSystem * fs,const std::string & dirname,std::unique_ptr<FSDirectory> * directory)302 IOStatus DBImpl::CreateAndNewDirectory(
303 FileSystem* fs, const std::string& dirname,
304 std::unique_ptr<FSDirectory>* directory) {
305 // We call CreateDirIfMissing() as the directory may already exist (if we
306 // are reopening a DB), when this happens we don't want creating the
307 // directory to cause an error. However, we need to check if creating the
308 // directory fails or else we may get an obscure message about the lock
309 // file not existing. One real-world example of this occurring is if
310 // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
311 // when dbname_ is "dir/db" but when "dir" doesn't exist.
312 IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
313 if (!io_s.ok()) {
314 return io_s;
315 }
316 return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
317 }
318
SetDirectories(FileSystem * fs,const std::string & dbname,const std::string & wal_dir,const std::vector<DbPath> & data_paths)319 IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
320 const std::string& wal_dir,
321 const std::vector<DbPath>& data_paths) {
322 IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
323 if (!io_s.ok()) {
324 return io_s;
325 }
326 if (!wal_dir.empty() && dbname != wal_dir) {
327 io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
328 if (!io_s.ok()) {
329 return io_s;
330 }
331 }
332
333 data_dirs_.clear();
334 for (auto& p : data_paths) {
335 const std::string db_path = p.path;
336 if (db_path == dbname) {
337 data_dirs_.emplace_back(nullptr);
338 } else {
339 std::unique_ptr<FSDirectory> path_directory;
340 io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
341 if (!io_s.ok()) {
342 return io_s;
343 }
344 data_dirs_.emplace_back(path_directory.release());
345 }
346 }
347 assert(data_dirs_.size() == data_paths.size());
348 return IOStatus::OK();
349 }
350
Recover(const std::vector<ColumnFamilyDescriptor> & column_families,bool read_only,bool error_if_log_file_exist,bool error_if_data_exists_in_logs,uint64_t * recovered_seq)351 Status DBImpl::Recover(
352 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
353 bool error_if_log_file_exist, bool error_if_data_exists_in_logs,
354 uint64_t* recovered_seq) {
355 mutex_.AssertHeld();
356
357 bool is_new_db = false;
358 assert(db_lock_ == nullptr);
359 if (!read_only) {
360 Status s = directories_.SetDirectories(fs_.get(), dbname_,
361 immutable_db_options_.wal_dir,
362 immutable_db_options_.db_paths);
363 if (!s.ok()) {
364 return s;
365 }
366
367 s = env_->LockFile(LockFileName(dbname_), &db_lock_);
368 if (!s.ok()) {
369 return s;
370 }
371
372 std::string current_fname = CurrentFileName(dbname_);
373 s = env_->FileExists(current_fname);
374 if (s.IsNotFound()) {
375 if (immutable_db_options_.create_if_missing) {
376 s = NewDB();
377 is_new_db = true;
378 if (!s.ok()) {
379 return s;
380 }
381 } else {
382 return Status::InvalidArgument(
383 current_fname, "does not exist (create_if_missing is false)");
384 }
385 } else if (s.ok()) {
386 if (immutable_db_options_.error_if_exists) {
387 return Status::InvalidArgument(dbname_,
388 "exists (error_if_exists is true)");
389 }
390 } else {
391 // Unexpected error reading file
392 assert(s.IsIOError());
393 return s;
394 }
395 // Verify compatibility of file_options_ and filesystem
396 {
397 std::unique_ptr<FSRandomAccessFile> idfile;
398 FileOptions customized_fs(file_options_);
399 customized_fs.use_direct_reads |=
400 immutable_db_options_.use_direct_io_for_flush_and_compaction;
401 s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
402 nullptr);
403 if (!s.ok()) {
404 std::string error_str = s.ToString();
405 // Check if unsupported Direct I/O is the root cause
406 customized_fs.use_direct_reads = false;
407 s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
408 nullptr);
409 if (s.ok()) {
410 return Status::InvalidArgument(
411 "Direct I/O is not supported by the specified DB.");
412 } else {
413 return Status::InvalidArgument(
414 "Found options incompatible with filesystem", error_str.c_str());
415 }
416 }
417 }
418 }
419 assert(db_id_.empty());
420 Status s;
421 bool missing_table_file = false;
422 if (!immutable_db_options_.best_efforts_recovery) {
423 s = versions_->Recover(column_families, read_only, &db_id_);
424 } else {
425 s = versions_->TryRecover(column_families, read_only, &db_id_,
426 &missing_table_file);
427 if (s.ok()) {
428 s = CleanupFilesAfterRecovery();
429 }
430 }
431 if (!s.ok()) {
432 return s;
433 }
434 // Happens when immutable_db_options_.write_dbid_to_manifest is set to true
435 // the very first time.
436 if (db_id_.empty()) {
437 // Check for the IDENTITY file and create it if not there.
438 s = fs_->FileExists(IdentityFileName(dbname_), IOOptions(), nullptr);
439 // Typically Identity file is created in NewDB() and for some reason if
440 // it is no longer available then at this point DB ID is not in Identity
441 // file or Manifest.
442 if (s.IsNotFound()) {
443 s = SetIdentityFile(env_, dbname_);
444 if (!s.ok()) {
445 return s;
446 }
447 } else if (!s.ok()) {
448 assert(s.IsIOError());
449 return s;
450 }
451 s = GetDbIdentityFromIdentityFile(&db_id_);
452 if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
453 VersionEdit edit;
454 edit.SetDBId(db_id_);
455 Options options;
456 MutableCFOptions mutable_cf_options(options);
457 versions_->db_id_ = db_id_;
458 s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
459 mutable_cf_options, &edit, &mutex_, nullptr,
460 false);
461 }
462 } else {
463 s = SetIdentityFile(env_, dbname_, db_id_);
464 }
465
466 if (immutable_db_options_.paranoid_checks && s.ok()) {
467 s = CheckConsistency();
468 }
469 if (s.ok() && !read_only) {
470 std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
471 for (auto cfd : *versions_->GetColumnFamilySet()) {
472 s = cfd->AddDirectories(&created_dirs);
473 if (!s.ok()) {
474 return s;
475 }
476 }
477 }
478 // DB mutex is already held
479 if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
480 s = InitPersistStatsColumnFamily();
481 }
482
483 if (s.ok()) {
484 // Initial max_total_in_memory_state_ before recovery logs. Log recovery
485 // may check this value to decide whether to flush.
486 max_total_in_memory_state_ = 0;
487 for (auto cfd : *versions_->GetColumnFamilySet()) {
488 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
489 max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
490 mutable_cf_options->max_write_buffer_number;
491 }
492
493 SequenceNumber next_sequence(kMaxSequenceNumber);
494 default_cf_handle_ = new ColumnFamilyHandleImpl(
495 versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
496 default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
497 // TODO(Zhongyi): handle single_column_family_mode_ when
498 // persistent_stats is enabled
499 single_column_family_mode_ =
500 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
501
502 // Recover from all newer log files than the ones named in the
503 // descriptor (new log files may have been added by the previous
504 // incarnation without registering them in the descriptor).
505 //
506 // Note that prev_log_number() is no longer used, but we pay
507 // attention to it in case we are recovering a database
508 // produced by an older version of rocksdb.
509 std::vector<std::string> filenames;
510 if (!immutable_db_options_.best_efforts_recovery) {
511 s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
512 }
513 if (s.IsNotFound()) {
514 return Status::InvalidArgument("wal_dir not found",
515 immutable_db_options_.wal_dir);
516 } else if (!s.ok()) {
517 return s;
518 }
519
520 std::vector<uint64_t> logs;
521 for (size_t i = 0; i < filenames.size(); i++) {
522 uint64_t number;
523 FileType type;
524 if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
525 if (is_new_db) {
526 return Status::Corruption(
527 "While creating a new Db, wal_dir contains "
528 "existing log file: ",
529 filenames[i]);
530 } else {
531 logs.push_back(number);
532 }
533 }
534 }
535
536 if (logs.size() > 0) {
537 if (error_if_log_file_exist) {
538 return Status::Corruption(
539 "The db was opened in readonly mode with error_if_log_file_exist"
540 "flag but a log file already exists");
541 } else if (error_if_data_exists_in_logs) {
542 for (auto& log : logs) {
543 std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
544 uint64_t bytes;
545 s = env_->GetFileSize(fname, &bytes);
546 if (s.ok()) {
547 if (bytes > 0) {
548 return Status::Corruption(
549 "error_if_data_exists_in_logs is set but there are data "
550 " in log files.");
551 }
552 }
553 }
554 }
555 }
556
557 if (!logs.empty()) {
558 // Recover in the order in which the logs were generated
559 std::sort(logs.begin(), logs.end());
560 bool corrupted_log_found = false;
561 s = RecoverLogFiles(logs, &next_sequence, read_only,
562 &corrupted_log_found);
563 if (corrupted_log_found && recovered_seq != nullptr) {
564 *recovered_seq = next_sequence;
565 }
566 if (!s.ok()) {
567 // Clear memtables if recovery failed
568 for (auto cfd : *versions_->GetColumnFamilySet()) {
569 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
570 kMaxSequenceNumber);
571 }
572 }
573 }
574 }
575
576 if (read_only) {
577 // If we are opening as read-only, we need to update options_file_number_
578 // to reflect the most recent OPTIONS file. It does not matter for regular
579 // read-write db instance because options_file_number_ will later be
580 // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
581 std::vector<std::string> file_names;
582 if (s.ok()) {
583 s = env_->GetChildren(GetName(), &file_names);
584 }
585 if (s.ok()) {
586 uint64_t number = 0;
587 uint64_t options_file_number = 0;
588 FileType type;
589 for (const auto& fname : file_names) {
590 if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
591 options_file_number = std::max(number, options_file_number);
592 }
593 }
594 versions_->options_file_number_ = options_file_number;
595 }
596 }
597
598 return s;
599 }
600
PersistentStatsProcessFormatVersion()601 Status DBImpl::PersistentStatsProcessFormatVersion() {
602 mutex_.AssertHeld();
603 Status s;
604 // persist version when stats CF doesn't exist
605 bool should_persist_format_version = !persistent_stats_cfd_exists_;
606 mutex_.Unlock();
607 if (persistent_stats_cfd_exists_) {
608 // Check persistent stats format version compatibility. Drop and recreate
609 // persistent stats CF if format version is incompatible
610 uint64_t format_version_recovered = 0;
611 Status s_format = DecodePersistentStatsVersionNumber(
612 this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
613 uint64_t compatible_version_recovered = 0;
614 Status s_compatible = DecodePersistentStatsVersionNumber(
615 this, StatsVersionKeyType::kCompatibleVersion,
616 &compatible_version_recovered);
617 // abort reading from existing stats CF if any of following is true:
618 // 1. failed to read format version or compatible version from disk
619 // 2. sst's format version is greater than current format version, meaning
620 // this sst is encoded with a newer RocksDB release, and current compatible
621 // version is below the sst's compatible version
622 if (!s_format.ok() || !s_compatible.ok() ||
623 (kStatsCFCurrentFormatVersion < format_version_recovered &&
624 kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
625 if (!s_format.ok() || !s_compatible.ok()) {
626 ROCKS_LOG_INFO(
627 immutable_db_options_.info_log,
628 "Reading persistent stats version key failed. Format key: %s, "
629 "compatible key: %s",
630 s_format.ToString().c_str(), s_compatible.ToString().c_str());
631 } else {
632 ROCKS_LOG_INFO(
633 immutable_db_options_.info_log,
634 "Disable persistent stats due to corrupted or incompatible format "
635 "version\n");
636 }
637 DropColumnFamily(persist_stats_cf_handle_);
638 DestroyColumnFamilyHandle(persist_stats_cf_handle_);
639 ColumnFamilyHandle* handle = nullptr;
640 ColumnFamilyOptions cfo;
641 OptimizeForPersistentStats(&cfo);
642 s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
643 persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
644 // should also persist version here because old stats CF is discarded
645 should_persist_format_version = true;
646 }
647 }
648 if (s.ok() && should_persist_format_version) {
649 // Persistent stats CF being created for the first time, need to write
650 // format version key
651 WriteBatch batch;
652 batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
653 ToString(kStatsCFCurrentFormatVersion));
654 batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
655 ToString(kStatsCFCompatibleFormatVersion));
656 WriteOptions wo;
657 wo.low_pri = true;
658 wo.no_slowdown = true;
659 wo.sync = false;
660 s = Write(wo, &batch);
661 }
662 mutex_.Lock();
663 return s;
664 }
665
InitPersistStatsColumnFamily()666 Status DBImpl::InitPersistStatsColumnFamily() {
667 mutex_.AssertHeld();
668 assert(!persist_stats_cf_handle_);
669 ColumnFamilyData* persistent_stats_cfd =
670 versions_->GetColumnFamilySet()->GetColumnFamily(
671 kPersistentStatsColumnFamilyName);
672 persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;
673
674 Status s;
675 if (persistent_stats_cfd != nullptr) {
676 // We are recovering from a DB which already contains persistent stats CF,
677 // the CF is already created in VersionSet::ApplyOneVersionEdit, but
678 // column family handle was not. Need to explicitly create handle here.
679 persist_stats_cf_handle_ =
680 new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
681 } else {
682 mutex_.Unlock();
683 ColumnFamilyHandle* handle = nullptr;
684 ColumnFamilyOptions cfo;
685 OptimizeForPersistentStats(&cfo);
686 s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
687 persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
688 mutex_.Lock();
689 }
690 return s;
691 }
692
693 // REQUIRES: log_numbers are sorted in ascending order
RecoverLogFiles(const std::vector<uint64_t> & log_numbers,SequenceNumber * next_sequence,bool read_only,bool * corrupted_log_found)694 Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
695 SequenceNumber* next_sequence, bool read_only,
696 bool* corrupted_log_found) {
697 struct LogReporter : public log::Reader::Reporter {
698 Env* env;
699 Logger* info_log;
700 const char* fname;
701 Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
702 void Corruption(size_t bytes, const Status& s) override {
703 ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
704 (this->status == nullptr ? "(ignoring error) " : ""),
705 fname, static_cast<int>(bytes), s.ToString().c_str());
706 if (this->status != nullptr && this->status->ok()) {
707 *this->status = s;
708 }
709 }
710 };
711
712 mutex_.AssertHeld();
713 Status status;
714 std::unordered_map<int, VersionEdit> version_edits;
715 // no need to refcount because iteration is under mutex
716 for (auto cfd : *versions_->GetColumnFamilySet()) {
717 VersionEdit edit;
718 edit.SetColumnFamily(cfd->GetID());
719 version_edits.insert({cfd->GetID(), edit});
720 }
721 int job_id = next_job_id_.fetch_add(1);
722 {
723 auto stream = event_logger_.Log();
724 stream << "job" << job_id << "event"
725 << "recovery_started";
726 stream << "log_files";
727 stream.StartArray();
728 for (auto log_number : log_numbers) {
729 stream << log_number;
730 }
731 stream.EndArray();
732 }
733
734 #ifndef ROCKSDB_LITE
735 if (immutable_db_options_.wal_filter != nullptr) {
736 std::map<std::string, uint32_t> cf_name_id_map;
737 std::map<uint32_t, uint64_t> cf_lognumber_map;
738 for (auto cfd : *versions_->GetColumnFamilySet()) {
739 cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
740 cf_lognumber_map.insert(
741 std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
742 }
743
744 immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
745 cf_name_id_map);
746 }
747 #endif
748
749 bool stop_replay_by_wal_filter = false;
750 bool stop_replay_for_corruption = false;
751 bool flushed = false;
752 uint64_t corrupted_log_number = kMaxSequenceNumber;
753 uint64_t min_log_number = MinLogNumberToKeep();
754 for (auto log_number : log_numbers) {
755 if (log_number < min_log_number) {
756 ROCKS_LOG_INFO(immutable_db_options_.info_log,
757 "Skipping log #%" PRIu64
758 " since it is older than min log to keep #%" PRIu64,
759 log_number, min_log_number);
760 continue;
761 }
762 // The previous incarnation may not have written any MANIFEST
763 // records after allocating this log number. So we manually
764 // update the file number allocation counter in VersionSet.
765 versions_->MarkFileNumberUsed(log_number);
766 // Open the log file
767 std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
768
769 ROCKS_LOG_INFO(immutable_db_options_.info_log,
770 "Recovering log #%" PRIu64 " mode %d", log_number,
771 static_cast<int>(immutable_db_options_.wal_recovery_mode));
772 auto logFileDropped = [this, &fname]() {
773 uint64_t bytes;
774 if (env_->GetFileSize(fname, &bytes).ok()) {
775 auto info_log = immutable_db_options_.info_log.get();
776 ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
777 static_cast<int>(bytes));
778 }
779 };
780 if (stop_replay_by_wal_filter) {
781 logFileDropped();
782 continue;
783 }
784
785 std::unique_ptr<SequentialFileReader> file_reader;
786 {
787 std::unique_ptr<FSSequentialFile> file;
788 status = fs_->NewSequentialFile(fname,
789 fs_->OptimizeForLogRead(file_options_),
790 &file, nullptr);
791 if (!status.ok()) {
792 MaybeIgnoreError(&status);
793 if (!status.ok()) {
794 return status;
795 } else {
796 // Fail with one log file, but that's ok.
797 // Try next one.
798 continue;
799 }
800 }
801 file_reader.reset(new SequentialFileReader(
802 std::move(file), fname, immutable_db_options_.log_readahead_size));
803 }
804
805 // Create the log reader.
806 LogReporter reporter;
807 reporter.env = env_;
808 reporter.info_log = immutable_db_options_.info_log.get();
809 reporter.fname = fname.c_str();
810 if (!immutable_db_options_.paranoid_checks ||
811 immutable_db_options_.wal_recovery_mode ==
812 WALRecoveryMode::kSkipAnyCorruptedRecords) {
813 reporter.status = nullptr;
814 } else {
815 reporter.status = &status;
816 }
817 // We intentially make log::Reader do checksumming even if
818 // paranoid_checks==false so that corruptions cause entire commits
819 // to be skipped instead of propagating bad information (like overly
820 // large sequence numbers).
821 log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
822 &reporter, true /*checksum*/, log_number);
823
824 // Determine if we should tolerate incomplete records at the tail end of the
825 // Read all the records and add to a memtable
826 std::string scratch;
827 Slice record;
828 WriteBatch batch;
829
830 while (!stop_replay_by_wal_filter &&
831 reader.ReadRecord(&record, &scratch,
832 immutable_db_options_.wal_recovery_mode) &&
833 status.ok()) {
834 if (record.size() < WriteBatchInternal::kHeader) {
835 reporter.Corruption(record.size(),
836 Status::Corruption("log record too small"));
837 continue;
838 }
839 WriteBatchInternal::SetContents(&batch, record);
840 SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
841
842 if (immutable_db_options_.wal_recovery_mode ==
843 WALRecoveryMode::kPointInTimeRecovery) {
844 // In point-in-time recovery mode, if sequence id of log files are
845 // consecutive, we continue recovery despite corruption. This could
846 // happen when we open and write to a corrupted DB, where sequence id
847 // will start from the last sequence id we recovered.
848 if (sequence == *next_sequence) {
849 stop_replay_for_corruption = false;
850 }
851 if (stop_replay_for_corruption) {
852 logFileDropped();
853 break;
854 }
855 }
856
857 #ifndef ROCKSDB_LITE
858 if (immutable_db_options_.wal_filter != nullptr) {
859 WriteBatch new_batch;
860 bool batch_changed = false;
861
862 WalFilter::WalProcessingOption wal_processing_option =
863 immutable_db_options_.wal_filter->LogRecordFound(
864 log_number, fname, batch, &new_batch, &batch_changed);
865
866 switch (wal_processing_option) {
867 case WalFilter::WalProcessingOption::kContinueProcessing:
868 // do nothing, proceeed normally
869 break;
870 case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
871 // skip current record
872 continue;
873 case WalFilter::WalProcessingOption::kStopReplay:
874 // skip current record and stop replay
875 stop_replay_by_wal_filter = true;
876 continue;
877 case WalFilter::WalProcessingOption::kCorruptedRecord: {
878 status =
879 Status::Corruption("Corruption reported by Wal Filter ",
880 immutable_db_options_.wal_filter->Name());
881 MaybeIgnoreError(&status);
882 if (!status.ok()) {
883 reporter.Corruption(record.size(), status);
884 continue;
885 }
886 break;
887 }
888 default: {
889 assert(false); // unhandled case
890 status = Status::NotSupported(
891 "Unknown WalProcessingOption returned"
892 " by Wal Filter ",
893 immutable_db_options_.wal_filter->Name());
894 MaybeIgnoreError(&status);
895 if (!status.ok()) {
896 return status;
897 } else {
898 // Ignore the error with current record processing.
899 continue;
900 }
901 }
902 }
903
904 if (batch_changed) {
905 // Make sure that the count in the new batch is
906 // within the orignal count.
907 int new_count = WriteBatchInternal::Count(&new_batch);
908 int original_count = WriteBatchInternal::Count(&batch);
909 if (new_count > original_count) {
910 ROCKS_LOG_FATAL(
911 immutable_db_options_.info_log,
912 "Recovering log #%" PRIu64
913 " mode %d log filter %s returned "
914 "more records (%d) than original (%d) which is not allowed. "
915 "Aborting recovery.",
916 log_number,
917 static_cast<int>(immutable_db_options_.wal_recovery_mode),
918 immutable_db_options_.wal_filter->Name(), new_count,
919 original_count);
920 status = Status::NotSupported(
921 "More than original # of records "
922 "returned by Wal Filter ",
923 immutable_db_options_.wal_filter->Name());
924 return status;
925 }
926 // Set the same sequence number in the new_batch
927 // as the original batch.
928 WriteBatchInternal::SetSequence(&new_batch,
929 WriteBatchInternal::Sequence(&batch));
930 batch = new_batch;
931 }
932 }
933 #endif // ROCKSDB_LITE
934
935 // If column family was not found, it might mean that the WAL write
936 // batch references to the column family that was dropped after the
937 // insert. We don't want to fail the whole write batch in that case --
938 // we just ignore the update.
939 // That's why we set ignore missing column families to true
940 bool has_valid_writes = false;
941 status = WriteBatchInternal::InsertInto(
942 &batch, column_family_memtables_.get(), &flush_scheduler_,
943 &trim_history_scheduler_, true, log_number, this,
944 false /* concurrent_memtable_writes */, next_sequence,
945 &has_valid_writes, seq_per_batch_, batch_per_txn_);
946 MaybeIgnoreError(&status);
947 if (!status.ok()) {
948 // We are treating this as a failure while reading since we read valid
949 // blocks that do not form coherent data
950 reporter.Corruption(record.size(), status);
951 continue;
952 }
953
954 if (has_valid_writes && !read_only) {
955 // we can do this because this is called before client has access to the
956 // DB and there is only a single thread operating on DB
957 ColumnFamilyData* cfd;
958
959 while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
960 cfd->UnrefAndTryDelete();
961 // If this asserts, it means that InsertInto failed in
962 // filtering updates to already-flushed column families
963 assert(cfd->GetLogNumber() <= log_number);
964 auto iter = version_edits.find(cfd->GetID());
965 assert(iter != version_edits.end());
966 VersionEdit* edit = &iter->second;
967 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
968 if (!status.ok()) {
969 // Reflect errors immediately so that conditions like full
970 // file-systems cause the DB::Open() to fail.
971 return status;
972 }
973 flushed = true;
974
975 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
976 *next_sequence);
977 }
978 }
979 }
980
981 if (!status.ok()) {
982 if (status.IsNotSupported()) {
983 // We should not treat NotSupported as corruption. It is rather a clear
984 // sign that we are processing a WAL that is produced by an incompatible
985 // version of the code.
986 return status;
987 }
988 if (immutable_db_options_.wal_recovery_mode ==
989 WALRecoveryMode::kSkipAnyCorruptedRecords) {
990 // We should ignore all errors unconditionally
991 status = Status::OK();
992 } else if (immutable_db_options_.wal_recovery_mode ==
993 WALRecoveryMode::kPointInTimeRecovery) {
994 // We should ignore the error but not continue replaying
995 status = Status::OK();
996 stop_replay_for_corruption = true;
997 corrupted_log_number = log_number;
998 if (corrupted_log_found != nullptr) {
999 *corrupted_log_found = true;
1000 }
1001 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1002 "Point in time recovered to log #%" PRIu64
1003 " seq #%" PRIu64,
1004 log_number, *next_sequence);
1005 } else {
1006 assert(immutable_db_options_.wal_recovery_mode ==
1007 WALRecoveryMode::kTolerateCorruptedTailRecords ||
1008 immutable_db_options_.wal_recovery_mode ==
1009 WALRecoveryMode::kAbsoluteConsistency);
1010 return status;
1011 }
1012 }
1013
1014 flush_scheduler_.Clear();
1015 trim_history_scheduler_.Clear();
1016 auto last_sequence = *next_sequence - 1;
1017 if ((*next_sequence != kMaxSequenceNumber) &&
1018 (versions_->LastSequence() <= last_sequence)) {
1019 versions_->SetLastAllocatedSequence(last_sequence);
1020 versions_->SetLastPublishedSequence(last_sequence);
1021 versions_->SetLastSequence(last_sequence);
1022 }
1023 }
1024 // Compare the corrupted log number to all columnfamily's current log number.
1025 // Abort Open() if any column family's log number is greater than
1026 // the corrupted log number, which means CF contains data beyond the point of
1027 // corruption. This could during PIT recovery when the WAL is corrupted and
1028 // some (but not all) CFs are flushed
1029 // Exclude the PIT case where no log is dropped after the corruption point.
1030 // This is to cover the case for empty logs after corrupted log, in which we
1031 // don't reset stop_replay_for_corruption.
1032 if (stop_replay_for_corruption == true &&
1033 (immutable_db_options_.wal_recovery_mode ==
1034 WALRecoveryMode::kPointInTimeRecovery ||
1035 immutable_db_options_.wal_recovery_mode ==
1036 WALRecoveryMode::kTolerateCorruptedTailRecords)) {
1037 for (auto cfd : *versions_->GetColumnFamilySet()) {
1038 if (cfd->GetLogNumber() > corrupted_log_number) {
1039 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1040 "Column family inconsistency: SST file contains data"
1041 " beyond the point of corruption.");
1042 return Status::Corruption("SST file is ahead of WALs");
1043 }
1044 }
1045 }
1046
1047 // True if there's any data in the WALs; if not, we can skip re-processing
1048 // them later
1049 bool data_seen = false;
1050 if (!read_only) {
1051 // no need to refcount since client still doesn't have access
1052 // to the DB and can not drop column families while we iterate
1053 auto max_log_number = log_numbers.back();
1054 for (auto cfd : *versions_->GetColumnFamilySet()) {
1055 auto iter = version_edits.find(cfd->GetID());
1056 assert(iter != version_edits.end());
1057 VersionEdit* edit = &iter->second;
1058
1059 if (cfd->GetLogNumber() > max_log_number) {
1060 // Column family cfd has already flushed the data
1061 // from all logs. Memtable has to be empty because
1062 // we filter the updates based on log_number
1063 // (in WriteBatch::InsertInto)
1064 assert(cfd->mem()->GetFirstSequenceNumber() == 0);
1065 assert(edit->NumEntries() == 0);
1066 continue;
1067 }
1068
1069 TEST_SYNC_POINT_CALLBACK(
1070 "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1071
1072 // flush the final memtable (if non-empty)
1073 if (cfd->mem()->GetFirstSequenceNumber() != 0) {
1074 // If flush happened in the middle of recovery (e.g. due to memtable
1075 // being full), we flush at the end. Otherwise we'll need to record
1076 // where we were on last flush, which make the logic complicated.
1077 if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
1078 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1079 if (!status.ok()) {
1080 // Recovery failed
1081 break;
1082 }
1083 flushed = true;
1084
1085 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1086 versions_->LastSequence());
1087 }
1088 data_seen = true;
1089 }
1090
1091 // Update the log number info in the version edit corresponding to this
1092 // column family. Note that the version edits will be written to MANIFEST
1093 // together later.
1094 // writing log_number in the manifest means that any log file
1095 // with number strongly less than (log_number + 1) is already
1096 // recovered and should be ignored on next reincarnation.
1097 // Since we already recovered max_log_number, we want all logs
1098 // with numbers `<= max_log_number` (includes this one) to be ignored
1099 if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
1100 edit->SetLogNumber(max_log_number + 1);
1101 }
1102 }
1103 if (status.ok()) {
1104 // we must mark the next log number as used, even though it's
1105 // not actually used. that is because VersionSet assumes
1106 // VersionSet::next_file_number_ always to be strictly greater than any
1107 // log number
1108 versions_->MarkFileNumberUsed(max_log_number + 1);
1109
1110 autovector<ColumnFamilyData*> cfds;
1111 autovector<const MutableCFOptions*> cf_opts;
1112 autovector<autovector<VersionEdit*>> edit_lists;
1113 for (auto* cfd : *versions_->GetColumnFamilySet()) {
1114 cfds.push_back(cfd);
1115 cf_opts.push_back(cfd->GetLatestMutableCFOptions());
1116 auto iter = version_edits.find(cfd->GetID());
1117 assert(iter != version_edits.end());
1118 edit_lists.push_back({&iter->second});
1119 }
1120 // write MANIFEST with update
1121 status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
1122 directories_.GetDbDir(),
1123 /*new_descriptor_log=*/true);
1124 }
1125 }
1126
1127 if (status.ok() && data_seen && !flushed) {
1128 status = RestoreAliveLogFiles(log_numbers);
1129 }
1130
1131 event_logger_.Log() << "job" << job_id << "event"
1132 << "recovery_finished";
1133
1134 return status;
1135 }
1136
RestoreAliveLogFiles(const std::vector<uint64_t> & log_numbers)1137 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
1138 if (log_numbers.empty()) {
1139 return Status::OK();
1140 }
1141 Status s;
1142 mutex_.AssertHeld();
1143 assert(immutable_db_options_.avoid_flush_during_recovery);
1144 if (two_write_queues_) {
1145 log_write_mutex_.Lock();
1146 }
1147 // Mark these as alive so they'll be considered for deletion later by
1148 // FindObsoleteFiles()
1149 total_log_size_ = 0;
1150 log_empty_ = false;
1151 for (auto log_number : log_numbers) {
1152 LogFileNumberSize log(log_number);
1153 std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
1154 // This gets the appear size of the logs, not including preallocated space.
1155 s = env_->GetFileSize(fname, &log.size);
1156 if (!s.ok()) {
1157 break;
1158 }
1159 total_log_size_ += log.size;
1160 alive_log_files_.push_back(log);
1161 // We preallocate space for logs, but then after a crash and restart, those
1162 // preallocated space are not needed anymore. It is likely only the last
1163 // log has such preallocated space, so we only truncate for the last log.
1164 if (log_number == log_numbers.back()) {
1165 std::unique_ptr<FSWritableFile> last_log;
1166 Status truncate_status = fs_->ReopenWritableFile(
1167 fname,
1168 fs_->OptimizeForLogWrite(
1169 file_options_,
1170 BuildDBOptions(immutable_db_options_, mutable_db_options_)),
1171 &last_log, nullptr);
1172 if (truncate_status.ok()) {
1173 truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
1174 }
1175 if (truncate_status.ok()) {
1176 truncate_status = last_log->Close(IOOptions(), nullptr);
1177 }
1178 // Not a critical error if fail to truncate.
1179 if (!truncate_status.ok()) {
1180 ROCKS_LOG_WARN(immutable_db_options_.info_log,
1181 "Failed to truncate log #%" PRIu64 ": %s", log_number,
1182 truncate_status.ToString().c_str());
1183 }
1184 }
1185 }
1186 if (two_write_queues_) {
1187 log_write_mutex_.Unlock();
1188 }
1189 return s;
1190 }
1191
WriteLevel0TableForRecovery(int job_id,ColumnFamilyData * cfd,MemTable * mem,VersionEdit * edit)1192 Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
1193 MemTable* mem, VersionEdit* edit) {
1194 mutex_.AssertHeld();
1195 const uint64_t start_micros = env_->NowMicros();
1196 FileMetaData meta;
1197 std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
1198 new std::list<uint64_t>::iterator(
1199 CaptureCurrentFileNumberInPendingOutputs()));
1200 meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
1201 ReadOptions ro;
1202 ro.total_order_seek = true;
1203 Arena arena;
1204 Status s;
1205 TableProperties table_properties;
1206 {
1207 ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
1208 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1209 "[%s] [WriteLevel0TableForRecovery]"
1210 " Level-0 table #%" PRIu64 ": started",
1211 cfd->GetName().c_str(), meta.fd.GetNumber());
1212
1213 // Get the latest mutable cf options while the mutex is still locked
1214 const MutableCFOptions mutable_cf_options =
1215 *cfd->GetLatestMutableCFOptions();
1216 bool paranoid_file_checks =
1217 cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
1218
1219 int64_t _current_time = 0;
1220 env_->GetCurrentTime(&_current_time); // ignore error
1221 const uint64_t current_time = static_cast<uint64_t>(_current_time);
1222 meta.oldest_ancester_time = current_time;
1223
1224 {
1225 auto write_hint = cfd->CalculateSSTWriteHint(0);
1226 mutex_.Unlock();
1227
1228 SequenceNumber earliest_write_conflict_snapshot;
1229 std::vector<SequenceNumber> snapshot_seqs =
1230 snapshots_.GetAll(&earliest_write_conflict_snapshot);
1231 auto snapshot_checker = snapshot_checker_.get();
1232 if (use_custom_gc_ && snapshot_checker == nullptr) {
1233 snapshot_checker = DisableGCSnapshotChecker::Instance();
1234 }
1235 std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
1236 range_del_iters;
1237 auto range_del_iter =
1238 mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
1239 if (range_del_iter != nullptr) {
1240 range_del_iters.emplace_back(range_del_iter);
1241 }
1242 IOStatus io_s;
1243 s = BuildTable(
1244 dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options,
1245 file_options_for_compaction_, cfd->table_cache(), iter.get(),
1246 std::move(range_del_iters), &meta, cfd->internal_comparator(),
1247 cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
1248 snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
1249 GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
1250 mutable_cf_options.sample_for_compression,
1251 cfd->ioptions()->compression_opts, paranoid_file_checks,
1252 cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
1253 &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
1254 -1 /* level */, current_time, write_hint);
1255 LogFlush(immutable_db_options_.info_log);
1256 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1257 "[%s] [WriteLevel0TableForRecovery]"
1258 " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
1259 cfd->GetName().c_str(), meta.fd.GetNumber(),
1260 meta.fd.GetFileSize(), s.ToString().c_str());
1261 mutex_.Lock();
1262 }
1263 }
1264 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1265
1266 // Note that if file_size is zero, the file has been deleted and
1267 // should not be added to the manifest.
1268 int level = 0;
1269 if (s.ok() && meta.fd.GetFileSize() > 0) {
1270 edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
1271 meta.fd.GetFileSize(), meta.smallest, meta.largest,
1272 meta.fd.smallest_seqno, meta.fd.largest_seqno,
1273 meta.marked_for_compaction, meta.oldest_blob_file_number,
1274 meta.oldest_ancester_time, meta.file_creation_time,
1275 meta.file_checksum, meta.file_checksum_func_name);
1276 }
1277
1278 InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
1279 stats.micros = env_->NowMicros() - start_micros;
1280 stats.bytes_written = meta.fd.GetFileSize();
1281 stats.num_output_files = 1;
1282 cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
1283 cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
1284 meta.fd.GetFileSize());
1285 RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
1286 return s;
1287 }
1288
Open(const Options & options,const std::string & dbname,DB ** dbptr)1289 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1290 DBOptions db_options(options);
1291 ColumnFamilyOptions cf_options(options);
1292 std::vector<ColumnFamilyDescriptor> column_families;
1293 column_families.push_back(
1294 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
1295 if (db_options.persist_stats_to_disk) {
1296 column_families.push_back(
1297 ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
1298 }
1299 std::vector<ColumnFamilyHandle*> handles;
1300 Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
1301 if (s.ok()) {
1302 if (db_options.persist_stats_to_disk) {
1303 assert(handles.size() == 2);
1304 } else {
1305 assert(handles.size() == 1);
1306 }
1307 // i can delete the handle since DBImpl is always holding a reference to
1308 // default column family
1309 if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
1310 delete handles[1];
1311 }
1312 delete handles[0];
1313 }
1314 return s;
1315 }
1316
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr)1317 Status DB::Open(const DBOptions& db_options, const std::string& dbname,
1318 const std::vector<ColumnFamilyDescriptor>& column_families,
1319 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
1320 const bool kSeqPerBatch = true;
1321 const bool kBatchPerTxn = true;
1322 return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
1323 !kSeqPerBatch, kBatchPerTxn);
1324 }
1325
CreateWAL(uint64_t log_file_num,uint64_t recycle_log_number,size_t preallocate_block_size,log::Writer ** new_log)1326 Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
1327 size_t preallocate_block_size, log::Writer** new_log) {
1328 Status s;
1329 std::unique_ptr<FSWritableFile> lfile;
1330
1331 DBOptions db_options =
1332 BuildDBOptions(immutable_db_options_, mutable_db_options_);
1333 FileOptions opt_file_options =
1334 fs_->OptimizeForLogWrite(file_options_, db_options);
1335 std::string log_fname =
1336 LogFileName(immutable_db_options_.wal_dir, log_file_num);
1337
1338 if (recycle_log_number) {
1339 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1340 "reusing log %" PRIu64 " from recycle list\n",
1341 recycle_log_number);
1342 std::string old_log_fname =
1343 LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
1344 TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
1345 TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
1346 s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
1347 &lfile, /*dbg=*/nullptr);
1348 } else {
1349 s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
1350 }
1351
1352 if (s.ok()) {
1353 lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
1354 lfile->SetPreallocationBlockSize(preallocate_block_size);
1355
1356 const auto& listeners = immutable_db_options_.listeners;
1357 std::unique_ptr<WritableFileWriter> file_writer(
1358 new WritableFileWriter(std::move(lfile), log_fname, opt_file_options,
1359 env_, nullptr /* stats */, listeners));
1360 *new_log = new log::Writer(std::move(file_writer), log_file_num,
1361 immutable_db_options_.recycle_log_file_num > 0,
1362 immutable_db_options_.manual_wal_flush);
1363 }
1364 return s;
1365 }
1366
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,const bool seq_per_batch,const bool batch_per_txn)1367 Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
1368 const std::vector<ColumnFamilyDescriptor>& column_families,
1369 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
1370 const bool seq_per_batch, const bool batch_per_txn) {
1371 Status s = SanitizeOptionsByTable(db_options, column_families);
1372 if (!s.ok()) {
1373 return s;
1374 }
1375
1376 s = ValidateOptions(db_options, column_families);
1377 if (!s.ok()) {
1378 return s;
1379 }
1380
1381 *dbptr = nullptr;
1382 handles->clear();
1383
1384 size_t max_write_buffer_size = 0;
1385 for (auto cf : column_families) {
1386 max_write_buffer_size =
1387 std::max(max_write_buffer_size, cf.options.write_buffer_size);
1388 }
1389
1390 DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
1391 s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
1392 if (s.ok()) {
1393 std::vector<std::string> paths;
1394 for (auto& db_path : impl->immutable_db_options_.db_paths) {
1395 paths.emplace_back(db_path.path);
1396 }
1397 for (auto& cf : column_families) {
1398 for (auto& cf_path : cf.options.cf_paths) {
1399 paths.emplace_back(cf_path.path);
1400 }
1401 }
1402 for (auto& path : paths) {
1403 s = impl->env_->CreateDirIfMissing(path);
1404 if (!s.ok()) {
1405 break;
1406 }
1407 }
1408
1409 // For recovery from NoSpace() error, we can only handle
1410 // the case where the database is stored in a single path
1411 if (paths.size() <= 1) {
1412 impl->error_handler_.EnableAutoRecovery();
1413 }
1414 }
1415 if (s.ok()) {
1416 s = impl->CreateArchivalDirectory();
1417 }
1418 if (!s.ok()) {
1419 delete impl;
1420 return s;
1421 }
1422
1423 impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
1424
1425 impl->mutex_.Lock();
1426 // Handles create_if_missing, error_if_exists
1427 uint64_t recovered_seq(kMaxSequenceNumber);
1428 s = impl->Recover(column_families, false, false, false, &recovered_seq);
1429 if (s.ok()) {
1430 uint64_t new_log_number = impl->versions_->NewFileNumber();
1431 log::Writer* new_log = nullptr;
1432 const size_t preallocate_block_size =
1433 impl->GetWalPreallocateBlockSize(max_write_buffer_size);
1434 s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
1435 preallocate_block_size, &new_log);
1436 if (s.ok()) {
1437 InstrumentedMutexLock wl(&impl->log_write_mutex_);
1438 impl->logfile_number_ = new_log_number;
1439 assert(new_log != nullptr);
1440 impl->logs_.emplace_back(new_log_number, new_log);
1441 }
1442
1443 if (s.ok()) {
1444 // set column family handles
1445 for (auto cf : column_families) {
1446 auto cfd =
1447 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
1448 if (cfd != nullptr) {
1449 handles->push_back(
1450 new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
1451 impl->NewThreadStatusCfInfo(cfd);
1452 } else {
1453 if (db_options.create_missing_column_families) {
1454 // missing column family, create it
1455 ColumnFamilyHandle* handle;
1456 impl->mutex_.Unlock();
1457 s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
1458 impl->mutex_.Lock();
1459 if (s.ok()) {
1460 handles->push_back(handle);
1461 } else {
1462 break;
1463 }
1464 } else {
1465 s = Status::InvalidArgument("Column family not found: ", cf.name);
1466 break;
1467 }
1468 }
1469 }
1470 }
1471 if (s.ok()) {
1472 SuperVersionContext sv_context(/* create_superversion */ true);
1473 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1474 impl->InstallSuperVersionAndScheduleWork(
1475 cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
1476 }
1477 sv_context.Clean();
1478 if (impl->two_write_queues_) {
1479 impl->log_write_mutex_.Lock();
1480 }
1481 impl->alive_log_files_.push_back(
1482 DBImpl::LogFileNumberSize(impl->logfile_number_));
1483 if (impl->two_write_queues_) {
1484 impl->log_write_mutex_.Unlock();
1485 }
1486
1487 impl->DeleteObsoleteFiles();
1488 s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
1489 }
1490 if (s.ok()) {
1491 // In WritePrepared there could be gap in sequence numbers. This breaks
1492 // the trick we use in kPointInTimeRecovery which assumes the first seq in
1493 // the log right after the corrupted log is one larger than the last seq
1494 // we read from the logs. To let this trick keep working, we add a dummy
1495 // entry with the expected sequence to the first log right after recovery.
1496 // In non-WritePrepared case also the new log after recovery could be
1497 // empty, and thus missing the consecutive seq hint to distinguish
1498 // middle-log corruption to corrupted-log-remained-after-recovery. This
1499 // case also will be addressed by a dummy write.
1500 if (recovered_seq != kMaxSequenceNumber) {
1501 WriteBatch empty_batch;
1502 WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
1503 WriteOptions write_options;
1504 uint64_t log_used, log_size;
1505 log::Writer* log_writer = impl->logs_.back().writer;
1506 s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
1507 if (s.ok()) {
1508 // Need to fsync, otherwise it might get lost after a power reset.
1509 s = impl->FlushWAL(false);
1510 if (s.ok()) {
1511 s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
1512 }
1513 }
1514 }
1515 }
1516 }
1517 if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
1518 // try to read format version but no need to fail Open() even if it fails
1519 s = impl->PersistentStatsProcessFormatVersion();
1520 }
1521
1522 if (s.ok()) {
1523 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1524 if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1525 auto* vstorage = cfd->current()->storage_info();
1526 for (int i = 1; i < vstorage->num_levels(); ++i) {
1527 int num_files = vstorage->NumLevelFiles(i);
1528 if (num_files > 0) {
1529 s = Status::InvalidArgument(
1530 "Not all files are at level 0. Cannot "
1531 "open with FIFO compaction style.");
1532 break;
1533 }
1534 }
1535 }
1536 if (!cfd->mem()->IsSnapshotSupported()) {
1537 impl->is_snapshot_supported_ = false;
1538 }
1539 if (cfd->ioptions()->merge_operator != nullptr &&
1540 !cfd->mem()->IsMergeOperatorSupported()) {
1541 s = Status::InvalidArgument(
1542 "The memtable of column family %s does not support merge operator "
1543 "its options.merge_operator is non-null",
1544 cfd->GetName().c_str());
1545 }
1546 if (!s.ok()) {
1547 break;
1548 }
1549 }
1550 }
1551 TEST_SYNC_POINT("DBImpl::Open:Opened");
1552 Status persist_options_status;
1553 if (s.ok()) {
1554 // Persist RocksDB Options before scheduling the compaction.
1555 // The WriteOptionsFile() will release and lock the mutex internally.
1556 persist_options_status = impl->WriteOptionsFile(
1557 false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
1558
1559 *dbptr = impl;
1560 impl->opened_successfully_ = true;
1561 impl->MaybeScheduleFlushOrCompaction();
1562 }
1563 impl->mutex_.Unlock();
1564
1565 #ifndef ROCKSDB_LITE
1566 auto sfm = static_cast<SstFileManagerImpl*>(
1567 impl->immutable_db_options_.sst_file_manager.get());
1568 if (s.ok() && sfm) {
1569 // Notify SstFileManager about all sst files that already exist in
1570 // db_paths[0] and cf_paths[0] when the DB is opened.
1571
1572 // SstFileManagerImpl needs to know sizes of the files. For files whose size
1573 // we already know (sst files that appear in manifest - typically that's the
1574 // vast majority of all files), we'll pass the size to SstFileManager.
1575 // For all other files SstFileManager will query the size from filesystem.
1576
1577 std::vector<LiveFileMetaData> metadata;
1578
1579 impl->mutex_.Lock();
1580 impl->versions_->GetLiveFilesMetaData(&metadata);
1581 impl->mutex_.Unlock();
1582
1583 std::unordered_map<std::string, uint64_t> known_file_sizes;
1584 for (const auto& md : metadata) {
1585 std::string name = md.name;
1586 if (!name.empty() && name[0] == '/') {
1587 name = name.substr(1);
1588 }
1589 known_file_sizes[name] = md.size;
1590 }
1591
1592 std::vector<std::string> paths;
1593 paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
1594 for (auto& cf : column_families) {
1595 if (!cf.options.cf_paths.empty()) {
1596 paths.emplace_back(cf.options.cf_paths[0].path);
1597 }
1598 }
1599 // Remove duplicate paths.
1600 std::sort(paths.begin(), paths.end());
1601 paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
1602 for (auto& path : paths) {
1603 std::vector<std::string> existing_files;
1604 impl->immutable_db_options_.env->GetChildren(path, &existing_files);
1605 for (auto& file_name : existing_files) {
1606 uint64_t file_number;
1607 FileType file_type;
1608 std::string file_path = path + "/" + file_name;
1609 if (ParseFileName(file_name, &file_number, &file_type) &&
1610 file_type == kTableFile) {
1611 if (known_file_sizes.count(file_name)) {
1612 // We're assuming that each sst file name exists in at most one of
1613 // the paths.
1614 sfm->OnAddFile(file_path, known_file_sizes.at(file_name),
1615 /* compaction */ false);
1616 } else {
1617 sfm->OnAddFile(file_path);
1618 }
1619 }
1620 }
1621 }
1622
1623 // Reserve some disk buffer space. This is a heuristic - when we run out
1624 // of disk space, this ensures that there is atleast write_buffer_size
1625 // amount of free space before we resume DB writes. In low disk space
1626 // conditions, we want to avoid a lot of small L0 files due to frequent
1627 // WAL write failures and resultant forced flushes
1628 sfm->ReserveDiskBuffer(max_write_buffer_size,
1629 impl->immutable_db_options_.db_paths[0].path);
1630 }
1631 #endif // !ROCKSDB_LITE
1632
1633 if (s.ok()) {
1634 ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
1635 impl);
1636 LogFlush(impl->immutable_db_options_.info_log);
1637 assert(impl->TEST_WALBufferIsEmpty());
1638 // If the assert above fails then we need to FlushWAL before returning
1639 // control back to the user.
1640 if (!persist_options_status.ok()) {
1641 s = Status::IOError(
1642 "DB::Open() failed --- Unable to persist Options file",
1643 persist_options_status.ToString());
1644 }
1645 }
1646 if (s.ok()) {
1647 impl->StartTimedTasks();
1648 }
1649 if (!s.ok()) {
1650 for (auto* h : *handles) {
1651 delete h;
1652 }
1653 handles->clear();
1654 delete impl;
1655 *dbptr = nullptr;
1656 }
1657 return s;
1658 }
1659 } // namespace ROCKSDB_NAMESPACE
1660