1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifndef ROCKSDB_LITE
11
12 #include <stdlib.h>
13 #include <algorithm>
14 #include <atomic>
15 #include <cinttypes>
16 #include <functional>
17 #include <future>
18 #include <limits>
19 #include <map>
20 #include <mutex>
21 #include <sstream>
22 #include <string>
23 #include <thread>
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <vector>
27
28 #include "env/composite_env_wrapper.h"
29 #include "file/filename.h"
30 #include "file/sequence_file_reader.h"
31 #include "file/writable_file_writer.h"
32 #include "logging/logging.h"
33 #include "port/port.h"
34 #include "rocksdb/rate_limiter.h"
35 #include "rocksdb/transaction_log.h"
36 #include "rocksdb/utilities/backupable_db.h"
37 #include "test_util/sync_point.h"
38 #include "util/channel.h"
39 #include "util/coding.h"
40 #include "util/crc32c.h"
41 #include "util/string_util.h"
42 #include "utilities/checkpoint/checkpoint_impl.h"
43
44 namespace ROCKSDB_NAMESPACE {
45
IncrementNumberSuccessBackup()46 void BackupStatistics::IncrementNumberSuccessBackup() {
47 number_success_backup++;
48 }
IncrementNumberFailBackup()49 void BackupStatistics::IncrementNumberFailBackup() {
50 number_fail_backup++;
51 }
52
GetNumberSuccessBackup() const53 uint32_t BackupStatistics::GetNumberSuccessBackup() const {
54 return number_success_backup;
55 }
GetNumberFailBackup() const56 uint32_t BackupStatistics::GetNumberFailBackup() const {
57 return number_fail_backup;
58 }
59
ToString() const60 std::string BackupStatistics::ToString() const {
61 char result[50];
62 snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
63 GetNumberSuccessBackup(), GetNumberFailBackup());
64 return result;
65 }
66
Dump(Logger * logger) const67 void BackupableDBOptions::Dump(Logger* logger) const {
68 ROCKS_LOG_INFO(logger, " Options.backup_dir: %s",
69 backup_dir.c_str());
70 ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env);
71 ROCKS_LOG_INFO(logger, " Options.share_table_files: %d",
72 static_cast<int>(share_table_files));
73 ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log);
74 ROCKS_LOG_INFO(logger, " Options.sync: %d",
75 static_cast<int>(sync));
76 ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d",
77 static_cast<int>(destroy_old_data));
78 ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d",
79 static_cast<int>(backup_log_files));
80 ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64,
81 backup_rate_limit);
82 ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64,
83 restore_rate_limit);
84 ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
85 max_background_operations);
86 }
87
88 // -------- BackupEngineImpl class ---------
89 class BackupEngineImpl : public BackupEngine {
90 public:
91 BackupEngineImpl(const BackupableDBOptions& options, Env* db_env,
92 bool read_only = false);
93 ~BackupEngineImpl() override;
94
95 using BackupEngine::CreateNewBackupWithMetadata;
96 Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
97 const std::string& app_metadata) override;
98
99 Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
100
101 Status DeleteBackup(BackupID backup_id) override;
102
StopBackup()103 void StopBackup() override {
104 stop_backup_.store(true, std::memory_order_release);
105 }
106
107 Status GarbageCollect() override;
108
109 // The returned BackupInfos are in chronological order, which means the
110 // latest backup comes last.
111 void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
112
113 void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
114
115 using BackupEngine::RestoreDBFromBackup;
116 Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
117 const std::string& db_dir,
118 const std::string& wal_dir) override;
119
120 using BackupEngine::RestoreDBFromLatestBackup;
RestoreDBFromLatestBackup(const RestoreOptions & options,const std::string & db_dir,const std::string & wal_dir)121 Status RestoreDBFromLatestBackup(const RestoreOptions& options,
122 const std::string& db_dir,
123 const std::string& wal_dir) override {
124 return RestoreDBFromBackup(options, latest_valid_backup_id_, db_dir,
125 wal_dir);
126 }
127
128 Status VerifyBackup(BackupID backup_id) override;
129
130 Status Initialize();
131
132 private:
133 void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
134 Status DeleteBackupInternal(BackupID backup_id);
135
136 // Extends the "result" map with pathname->size mappings for the contents of
137 // "dir" in "env". Pathnames are prefixed with "dir".
138 Status InsertPathnameToSizeBytes(
139 const std::string& dir, Env* env,
140 std::unordered_map<std::string, uint64_t>* result);
141
142 struct FileInfo {
FileInfoROCKSDB_NAMESPACE::BackupEngineImpl::FileInfo143 FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
144 : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
145
146 FileInfo(const FileInfo&) = delete;
147 FileInfo& operator=(const FileInfo&) = delete;
148
149 int refs;
150 const std::string filename;
151 const uint64_t size;
152 const uint32_t checksum_value;
153 };
154
155 class BackupMeta {
156 public:
BackupMeta(const std::string & meta_filename,const std::string & meta_tmp_filename,std::unordered_map<std::string,std::shared_ptr<FileInfo>> * file_infos,Env * env)157 BackupMeta(
158 const std::string& meta_filename, const std::string& meta_tmp_filename,
159 std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
160 Env* env)
161 : timestamp_(0),
162 sequence_number_(0),
163 size_(0),
164 meta_filename_(meta_filename),
165 meta_tmp_filename_(meta_tmp_filename),
166 file_infos_(file_infos),
167 env_(env) {}
168
169 BackupMeta(const BackupMeta&) = delete;
170 BackupMeta& operator=(const BackupMeta&) = delete;
171
~BackupMeta()172 ~BackupMeta() {}
173
RecordTimestamp()174 void RecordTimestamp() {
175 env_->GetCurrentTime(×tamp_);
176 }
GetTimestamp() const177 int64_t GetTimestamp() const {
178 return timestamp_;
179 }
GetSize() const180 uint64_t GetSize() const {
181 return size_;
182 }
GetNumberFiles()183 uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
SetSequenceNumber(uint64_t sequence_number)184 void SetSequenceNumber(uint64_t sequence_number) {
185 sequence_number_ = sequence_number;
186 }
GetSequenceNumber()187 uint64_t GetSequenceNumber() {
188 return sequence_number_;
189 }
190
GetAppMetadata() const191 const std::string& GetAppMetadata() const { return app_metadata_; }
192
SetAppMetadata(const std::string & app_metadata)193 void SetAppMetadata(const std::string& app_metadata) {
194 app_metadata_ = app_metadata;
195 }
196
197 Status AddFile(std::shared_ptr<FileInfo> file_info);
198
199 Status Delete(bool delete_meta = true);
200
Empty()201 bool Empty() {
202 return files_.empty();
203 }
204
GetFile(const std::string & filename) const205 std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
206 auto it = file_infos_->find(filename);
207 if (it == file_infos_->end())
208 return nullptr;
209 return it->second;
210 }
211
GetFiles()212 const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
213 return files_;
214 }
215
216 // @param abs_path_to_size Pre-fetched file sizes (bytes).
217 Status LoadFromFile(
218 const std::string& backup_dir,
219 const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
220 Status StoreToFile(bool sync);
221
GetInfoString()222 std::string GetInfoString() {
223 std::ostringstream ss;
224 ss << "Timestamp: " << timestamp_ << std::endl;
225 char human_size[16];
226 AppendHumanBytes(size_, human_size, sizeof(human_size));
227 ss << "Size: " << human_size << std::endl;
228 ss << "Files:" << std::endl;
229 for (const auto& file : files_) {
230 AppendHumanBytes(file->size, human_size, sizeof(human_size));
231 ss << file->filename << ", size " << human_size << ", refs "
232 << file->refs << std::endl;
233 }
234 return ss.str();
235 }
236
237 private:
238 int64_t timestamp_;
239 // sequence number is only approximate, should not be used
240 // by clients
241 uint64_t sequence_number_;
242 uint64_t size_;
243 std::string app_metadata_;
244 std::string const meta_filename_;
245 std::string const meta_tmp_filename_;
246 // files with relative paths (without "/" prefix!!)
247 std::vector<std::shared_ptr<FileInfo>> files_;
248 std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
249 Env* env_;
250
251 static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
252 }; // BackupMeta
253
GetAbsolutePath(const std::string & relative_path="") const254 inline std::string GetAbsolutePath(
255 const std::string &relative_path = "") const {
256 assert(relative_path.size() == 0 || relative_path[0] != '/');
257 return options_.backup_dir + "/" + relative_path;
258 }
GetPrivateDirRel() const259 inline std::string GetPrivateDirRel() const {
260 return "private";
261 }
GetSharedChecksumDirRel() const262 inline std::string GetSharedChecksumDirRel() const {
263 return "shared_checksum";
264 }
GetPrivateFileRel(BackupID backup_id,bool tmp=false,const std::string & file="") const265 inline std::string GetPrivateFileRel(BackupID backup_id,
266 bool tmp = false,
267 const std::string& file = "") const {
268 assert(file.size() == 0 || file[0] != '/');
269 return GetPrivateDirRel() + "/" + ROCKSDB_NAMESPACE::ToString(backup_id) +
270 (tmp ? ".tmp" : "") + "/" + file;
271 }
GetSharedFileRel(const std::string & file="",bool tmp=false) const272 inline std::string GetSharedFileRel(const std::string& file = "",
273 bool tmp = false) const {
274 assert(file.size() == 0 || file[0] != '/');
275 return std::string("shared/") + (tmp ? "." : "") + file +
276 (tmp ? ".tmp" : "");
277 }
GetSharedFileWithChecksumRel(const std::string & file="",bool tmp=false) const278 inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
279 bool tmp = false) const {
280 assert(file.size() == 0 || file[0] != '/');
281 return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
282 (tmp ? ".tmp" : "");
283 }
GetSharedFileWithChecksum(const std::string & file,const uint32_t checksum_value,const uint64_t file_size) const284 inline std::string GetSharedFileWithChecksum(const std::string& file,
285 const uint32_t checksum_value,
286 const uint64_t file_size) const {
287 assert(file.size() == 0 || file[0] != '/');
288 std::string file_copy = file;
289 return file_copy.insert(file_copy.find_last_of('.'),
290 "_" + ROCKSDB_NAMESPACE::ToString(checksum_value) +
291 "_" + ROCKSDB_NAMESPACE::ToString(file_size));
292 }
GetFileFromChecksumFile(const std::string & file) const293 inline std::string GetFileFromChecksumFile(const std::string& file) const {
294 assert(file.size() == 0 || file[0] != '/');
295 std::string file_copy = file;
296 size_t first_underscore = file_copy.find_first_of('_');
297 return file_copy.erase(first_underscore,
298 file_copy.find_last_of('.') - first_underscore);
299 }
GetBackupMetaDir() const300 inline std::string GetBackupMetaDir() const {
301 return GetAbsolutePath("meta");
302 }
GetBackupMetaFile(BackupID backup_id,bool tmp) const303 inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
304 return GetBackupMetaDir() + "/" + (tmp ? "." : "") +
305 ROCKSDB_NAMESPACE::ToString(backup_id) + (tmp ? ".tmp" : "");
306 }
307
308 // If size_limit == 0, there is no size limit, copy everything.
309 //
310 // Exactly one of src and contents must be non-empty.
311 //
312 // @param src If non-empty, the file is copied from this pathname.
313 // @param contents If non-empty, the file will be created with these contents.
314 Status CopyOrCreateFile(const std::string& src, const std::string& dst,
315 const std::string& contents, Env* src_env,
316 Env* dst_env, const EnvOptions& src_env_options,
317 bool sync, RateLimiter* rate_limiter,
318 uint64_t* size = nullptr,
319 uint32_t* checksum_value = nullptr,
320 uint64_t size_limit = 0,
__anon2d133cc30102() 321 std::function<void()> progress_callback = []() {});
322
323 Status CalculateChecksum(const std::string& src, Env* src_env,
324 const EnvOptions& src_env_options,
325 uint64_t size_limit, uint32_t* checksum_value);
326
327 struct CopyOrCreateResult {
328 uint64_t size;
329 uint32_t checksum_value;
330 Status status;
331 };
332
333 // Exactly one of src_path and contents must be non-empty. If src_path is
334 // non-empty, the file is copied from this pathname. Otherwise, if contents is
335 // non-empty, the file will be created at dst_path with these contents.
336 struct CopyOrCreateWorkItem {
337 std::string src_path;
338 std::string dst_path;
339 std::string contents;
340 Env* src_env;
341 Env* dst_env;
342 EnvOptions src_env_options;
343 bool sync;
344 RateLimiter* rate_limiter;
345 uint64_t size_limit;
346 std::promise<CopyOrCreateResult> result;
347 std::function<void()> progress_callback;
348
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem349 CopyOrCreateWorkItem()
350 : src_path(""),
351 dst_path(""),
352 contents(""),
353 src_env(nullptr),
354 dst_env(nullptr),
355 src_env_options(),
356 sync(false),
357 rate_limiter(nullptr),
358 size_limit(0) {}
359
360 CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
361 CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
362
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem363 CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
364 *this = std::move(o);
365 }
366
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem367 CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
368 src_path = std::move(o.src_path);
369 dst_path = std::move(o.dst_path);
370 contents = std::move(o.contents);
371 src_env = o.src_env;
372 dst_env = o.dst_env;
373 src_env_options = std::move(o.src_env_options);
374 sync = o.sync;
375 rate_limiter = o.rate_limiter;
376 size_limit = o.size_limit;
377 result = std::move(o.result);
378 progress_callback = std::move(o.progress_callback);
379 return *this;
380 }
381
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem382 CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
383 std::string _contents, Env* _src_env, Env* _dst_env,
384 EnvOptions _src_env_options, bool _sync,
385 RateLimiter* _rate_limiter, uint64_t _size_limit,
386 std::function<void()> _progress_callback = []() {})
387 : src_path(std::move(_src_path)),
388 dst_path(std::move(_dst_path)),
389 contents(std::move(_contents)),
390 src_env(_src_env),
391 dst_env(_dst_env),
392 src_env_options(std::move(_src_env_options)),
393 sync(_sync),
394 rate_limiter(_rate_limiter),
395 size_limit(_size_limit),
396 progress_callback(_progress_callback) {}
397 };
398
399 struct BackupAfterCopyOrCreateWorkItem {
400 std::future<CopyOrCreateResult> result;
401 bool shared;
402 bool needed_to_copy;
403 Env* backup_env;
404 std::string dst_path_tmp;
405 std::string dst_path;
406 std::string dst_relative;
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem407 BackupAfterCopyOrCreateWorkItem()
408 : shared(false),
409 needed_to_copy(false),
410 backup_env(nullptr),
411 dst_path_tmp(""),
412 dst_path(""),
413 dst_relative("") {}
414
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem415 BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
416 ROCKSDB_NOEXCEPT {
417 *this = std::move(o);
418 }
419
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem420 BackupAfterCopyOrCreateWorkItem& operator=(
421 BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
422 result = std::move(o.result);
423 shared = o.shared;
424 needed_to_copy = o.needed_to_copy;
425 backup_env = o.backup_env;
426 dst_path_tmp = std::move(o.dst_path_tmp);
427 dst_path = std::move(o.dst_path);
428 dst_relative = std::move(o.dst_relative);
429 return *this;
430 }
431
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem432 BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
433 bool _shared, bool _needed_to_copy,
434 Env* _backup_env, std::string _dst_path_tmp,
435 std::string _dst_path,
436 std::string _dst_relative)
437 : result(std::move(_result)),
438 shared(_shared),
439 needed_to_copy(_needed_to_copy),
440 backup_env(_backup_env),
441 dst_path_tmp(std::move(_dst_path_tmp)),
442 dst_path(std::move(_dst_path)),
443 dst_relative(std::move(_dst_relative)) {}
444 };
445
446 struct RestoreAfterCopyOrCreateWorkItem {
447 std::future<CopyOrCreateResult> result;
448 uint32_t checksum_value;
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem449 RestoreAfterCopyOrCreateWorkItem()
450 : checksum_value(0) {}
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem451 RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
452 uint32_t _checksum_value)
453 : result(std::move(_result)), checksum_value(_checksum_value) {}
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem454 RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
455 ROCKSDB_NOEXCEPT {
456 *this = std::move(o);
457 }
458
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem459 RestoreAfterCopyOrCreateWorkItem& operator=(
460 RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
461 result = std::move(o.result);
462 checksum_value = o.checksum_value;
463 return *this;
464 }
465 };
466
467 bool initialized_;
468 std::mutex byte_report_mutex_;
469 channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
470 std::vector<port::Thread> threads_;
471 std::atomic<CpuPriority> threads_cpu_priority_;
472 // Certain operations like PurgeOldBackups and DeleteBackup will trigger
473 // automatic GarbageCollect (true) unless we've already done one in this
474 // session and have not failed to delete backup files since then (false).
475 bool might_need_garbage_collect_ = true;
476
477 // Adds a file to the backup work queue to be copied or created if it doesn't
478 // already exist.
479 //
480 // Exactly one of src_dir and contents must be non-empty.
481 //
482 // @param src_dir If non-empty, the file in this directory named fname will be
483 // copied.
484 // @param fname Name of destination file and, in case of copy, source file.
485 // @param contents If non-empty, the file will be created with these contents.
486 Status AddBackupFileWorkItem(
487 std::unordered_set<std::string>& live_dst_paths,
488 std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
489 BackupID backup_id, bool shared, const std::string& src_dir,
490 const std::string& fname, // starts with "/"
491 const EnvOptions& src_env_options, RateLimiter* rate_limiter,
492 uint64_t size_bytes, uint64_t size_limit = 0,
493 bool shared_checksum = false,
__anon2d133cc30302() 494 std::function<void()> progress_callback = []() {},
495 const std::string& contents = std::string());
496
497 // backup state data
498 BackupID latest_backup_id_;
499 BackupID latest_valid_backup_id_;
500 std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
501 std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
502 corrupt_backups_;
503 std::unordered_map<std::string,
504 std::shared_ptr<FileInfo>> backuped_file_infos_;
505 std::atomic<bool> stop_backup_;
506
507 // options data
508 BackupableDBOptions options_;
509 Env* db_env_;
510 Env* backup_env_;
511
512 // directories
513 std::unique_ptr<Directory> backup_directory_;
514 std::unique_ptr<Directory> shared_directory_;
515 std::unique_ptr<Directory> meta_directory_;
516 std::unique_ptr<Directory> private_directory_;
517
518 static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
519 size_t copy_file_buffer_size_;
520 bool read_only_;
521 BackupStatistics backup_statistics_;
522 static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB
523 };
524
Open(const BackupableDBOptions & options,Env * env,BackupEngine ** backup_engine_ptr)525 Status BackupEngine::Open(const BackupableDBOptions& options, Env* env,
526 BackupEngine** backup_engine_ptr) {
527 std::unique_ptr<BackupEngineImpl> backup_engine(
528 new BackupEngineImpl(options, env));
529 auto s = backup_engine->Initialize();
530 if (!s.ok()) {
531 *backup_engine_ptr = nullptr;
532 return s;
533 }
534 *backup_engine_ptr = backup_engine.release();
535 return Status::OK();
536 }
537
BackupEngineImpl(const BackupableDBOptions & options,Env * db_env,bool read_only)538 BackupEngineImpl::BackupEngineImpl(const BackupableDBOptions& options,
539 Env* db_env, bool read_only)
540 : initialized_(false),
541 latest_backup_id_(0),
542 latest_valid_backup_id_(0),
543 stop_backup_(false),
544 options_(options),
545 db_env_(db_env),
546 backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
547 copy_file_buffer_size_(kDefaultCopyFileBufferSize),
548 read_only_(read_only) {
549 if (options_.backup_rate_limiter == nullptr &&
550 options_.backup_rate_limit > 0) {
551 options_.backup_rate_limiter.reset(
552 NewGenericRateLimiter(options_.backup_rate_limit));
553 }
554 if (options_.restore_rate_limiter == nullptr &&
555 options_.restore_rate_limit > 0) {
556 options_.restore_rate_limiter.reset(
557 NewGenericRateLimiter(options_.restore_rate_limit));
558 }
559 }
560
~BackupEngineImpl()561 BackupEngineImpl::~BackupEngineImpl() {
562 files_to_copy_or_create_.sendEof();
563 for (auto& t : threads_) {
564 t.join();
565 }
566 LogFlush(options_.info_log);
567 }
568
// One-time setup, invoked by BackupEngine::Open() right after construction.
//
// Steps:
//  1. Logs the options. In writable mode, also forces
//     max_valid_backups_to_open back to unlimited and creates/opens the
//     backup root, shared (or shared_checksum), private, and meta directories.
//  2. Lists the meta directory and registers one (not yet loaded) BackupMeta
//     per well-formed backup id; malformed names are deleted when writable.
//  3. Either purges all backups (destroy_old_data) or loads metadata from the
//     newest backup id downward, moving corrupt backups to corrupt_backups_.
//  4. Starts max_background_operations worker threads that service
//     files_to_copy_or_create_.
//
// Returns a non-OK Status if a directory cannot be created/opened, the meta
// directory cannot be listed, purge/GC fails, or a backup fails to load for a
// reason other than corruption.
Status BackupEngineImpl::Initialize() {
  assert(!initialized_);
  initialized_ = true;
  if (read_only_) {
    ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
  }
  options_.Dump(options_.info_log);

  if (!read_only_) {
    // we might need to clean up from previous crash or I/O errors
    might_need_garbage_collect_ = true;

    // A writable engine always opens every backup, so any user-supplied cap
    // is overridden (with a warning).
    if (options_.max_valid_backups_to_open != port::kMaxInt32) {
      options_.max_valid_backups_to_open = port::kMaxInt32;
      ROCKS_LOG_WARN(
          options_.info_log,
          "`max_valid_backups_to_open` is not set to the default value. Ignoring "
          "its value since BackupEngine is not read-only.");
    }

    // gather the list of directories that we need to create
    std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
        directories;
    directories.emplace_back(GetAbsolutePath(), &backup_directory_);
    if (options_.share_table_files) {
      if (options_.share_files_with_checksum) {
        directories.emplace_back(
            GetAbsolutePath(GetSharedFileWithChecksumRel()),
            &shared_directory_);
      } else {
        directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
                                 &shared_directory_);
      }
    }
    directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
                             &private_directory_);
    directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
    // create all the dirs we need
    for (const auto& d : directories) {
      auto s = backup_env_->CreateDirIfMissing(d.first);
      if (s.ok()) {
        s = backup_env_->NewDirectory(d.first, d.second);
      }
      if (!s.ok()) {
        return s;
      }
    }
  }

  std::vector<std::string> backup_meta_files;
  {
    auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
    if (s.IsNotFound()) {
      return Status::NotFound(GetBackupMetaDir() + " is missing");
    } else if (!s.ok()) {
      return s;
    }
  }
  // create backups_ structure
  for (auto& file : backup_meta_files) {
    if (file == "." || file == "..") {
      continue;
    }
    ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
    BackupID backup_id = 0;
    // Meta files are named by their decimal backup id. Any name that does not
    // round-trip exactly through ToString (including the ".<id>.tmp" names
    // produced by GetBackupMetaFile(id, true)) is treated as unrecognized.
    sscanf(file.c_str(), "%u", &backup_id);
    if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
      if (!read_only_) {
        // invalid file name, delete that
        auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
        ROCKS_LOG_INFO(options_.info_log,
                       "Unrecognized meta file %s, deleting -- %s",
                       file.c_str(), s.ToString().c_str());
      }
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
    // Only registered here; metadata contents are loaded further below.
    backups_.insert(std::make_pair(
        backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
                       GetBackupMetaFile(backup_id, false /* tmp */),
                       GetBackupMetaFile(backup_id, true /* tmp */),
                       &backuped_file_infos_, backup_env_))));
  }

  latest_backup_id_ = 0;
  latest_valid_backup_id_ = 0;
  if (options_.destroy_old_data) {  // Destroy old data
    assert(!read_only_);
    ROCKS_LOG_INFO(
        options_.info_log,
        "Backup Engine started with destroy_old_data == true, deleting all "
        "backups");
    auto s = PurgeOldBackups(0);
    if (s.ok()) {
      s = GarbageCollect();
    }
    if (!s.ok()) {
      return s;
    }
  } else {  // Load data from storage
    // Pre-fetch file sizes for both shared layouts. The Status is ignored;
    // presumably because usually only one of the two directories exists
    // (NOTE(review): confirm this is intentional).
    std::unordered_map<std::string, uint64_t> abs_path_to_size;
    for (const auto& rel_dir :
         {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
      const auto abs_dir = GetAbsolutePath(rel_dir);
      InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
    }
    // load the backups if any, until valid_backups_to_open of the latest
    // non-corrupted backups have been successfully opened.
    int valid_backups_to_open = options_.max_valid_backups_to_open;
    // backups_ is a std::map keyed by id, so rbegin() visits newest first.
    for (auto backup_iter = backups_.rbegin();
         backup_iter != backups_.rend();
         ++backup_iter) {
      assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
      // The newest id is recorded even if it later turns out to be corrupt
      // or is never loaded because the open limit was reached.
      if (latest_backup_id_ == 0) {
        latest_backup_id_ = backup_iter->first;
      }
      if (valid_backups_to_open == 0) {
        break;
      }

      InsertPathnameToSizeBytes(
          GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
          &abs_path_to_size);
      Status s = backup_iter->second->LoadFromFile(options_.backup_dir,
                                                   abs_path_to_size);
      if (s.IsCorruption()) {
        ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
                       backup_iter->first, s.ToString().c_str());
        // The BackupMeta is moved out; its (now null) entry in backups_ is
        // erased after this loop so the reverse iteration stays valid.
        corrupt_backups_.insert(
            std::make_pair(backup_iter->first,
                           std::make_pair(s, std::move(backup_iter->second))));
      } else if (!s.ok()) {
        // Distinguish corruption errors from errors in the backup Env.
        // Errors in the backup Env (i.e., this code path) will cause Open() to
        // fail, whereas corruption errors would not cause Open() failures.
        return s;
      } else {
        ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
                       backup_iter->first,
                       backup_iter->second->GetInfoString().c_str());
        assert(latest_valid_backup_id_ == 0 ||
               latest_valid_backup_id_ > backup_iter->first);
        if (latest_valid_backup_id_ == 0) {
          latest_valid_backup_id_ = backup_iter->first;
        }
        --valid_backups_to_open;
      }
    }

    for (const auto& corrupt : corrupt_backups_) {
      backups_.erase(backups_.find(corrupt.first));
    }
    // erase the backups before max_valid_backups_to_open
    // (the oldest entries, at backups_.begin(), that were never loaded).
    int num_unopened_backups;
    if (options_.max_valid_backups_to_open == 0) {
      num_unopened_backups = 0;
    } else {
      num_unopened_backups =
          std::max(0, static_cast<int>(backups_.size()) -
                          options_.max_valid_backups_to_open);
    }
    for (int i = 0; i < num_unopened_backups; ++i) {
      assert(backups_.begin()->second->Empty());
      backups_.erase(backups_.begin());
    }
  }

  ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
  ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
                 latest_valid_backup_id_);

  // set up threads perform copies from files_to_copy_or_create_ in the
  // background
  threads_cpu_priority_ = CpuPriority::kNormal;
  threads_.reserve(options_.max_background_operations);
  for (int t = 0; t < options_.max_background_operations; t++) {
    threads_.emplace_back([this]() {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
      pthread_setname_np(pthread_self(), "backup_engine");
#endif
#endif
      CpuPriority current_priority = CpuPriority::kNormal;
      CopyOrCreateWorkItem work_item;
      // Runs until the channel reports no more items (the destructor sends
      // EOF). Each item's result is published through its promise.
      while (files_to_copy_or_create_.read(work_item)) {
        // Apply a changed target priority (e.g. lowered by
        // CreateNewBackupWithMetadata) lazily, only when it differs.
        CpuPriority priority = threads_cpu_priority_;
        if (current_priority != priority) {
          TEST_SYNC_POINT_CALLBACK(
              "BackupEngineImpl::Initialize:SetCpuPriority", &priority);
          port::SetCpuPriority(0, priority);
          current_priority = priority;
        }
        CopyOrCreateResult result;
        result.status = CopyOrCreateFile(
            work_item.src_path, work_item.dst_path, work_item.contents,
            work_item.src_env, work_item.dst_env, work_item.src_env_options,
            work_item.sync, work_item.rate_limiter, &result.size,
            &result.checksum_value, work_item.size_limit,
            work_item.progress_callback);
        work_item.result.set_value(std::move(result));
      }
    });
  }
  ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");

  return Status::OK();
}
776
CreateNewBackupWithMetadata(const CreateBackupOptions & options,DB * db,const std::string & app_metadata)777 Status BackupEngineImpl::CreateNewBackupWithMetadata(
778 const CreateBackupOptions& options, DB* db,
779 const std::string& app_metadata) {
780 assert(initialized_);
781 assert(!read_only_);
782 if (app_metadata.size() > kMaxAppMetaSize) {
783 return Status::InvalidArgument("App metadata too large");
784 }
785
786 if (options.decrease_background_thread_cpu_priority) {
787 if (options.background_thread_cpu_priority < threads_cpu_priority_) {
788 threads_cpu_priority_.store(options.background_thread_cpu_priority);
789 }
790 }
791
792 BackupID new_backup_id = latest_backup_id_ + 1;
793
794 assert(backups_.find(new_backup_id) == backups_.end());
795
796 auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
797 Status s = backup_env_->FileExists(private_dir);
798 if (s.ok()) {
799 // maybe last backup failed and left partial state behind, clean it up.
800 // need to do this before updating backups_ such that a private dir
801 // named after new_backup_id will be cleaned up.
802 // (If an incomplete new backup is followed by an incomplete delete
803 // of the latest full backup, then there could be more than one next
804 // id with a private dir, the last thing to be deleted in delete
805 // backup, but all will be cleaned up with a GarbageCollect.)
806 s = GarbageCollect();
807 } else if (s.IsNotFound()) {
808 // normal case, the new backup's private dir doesn't exist yet
809 s = Status::OK();
810 }
811
812 auto ret = backups_.insert(std::make_pair(
813 new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
814 GetBackupMetaFile(new_backup_id, false /* tmp */),
815 GetBackupMetaFile(new_backup_id, true /* tmp */),
816 &backuped_file_infos_, backup_env_))));
817 assert(ret.second == true);
818 auto& new_backup = ret.first->second;
819 new_backup->RecordTimestamp();
820 new_backup->SetAppMetadata(app_metadata);
821
822 auto start_backup = backup_env_->NowMicros();
823
824 ROCKS_LOG_INFO(options_.info_log,
825 "Started the backup process -- creating backup %u",
826 new_backup_id);
827 if (s.ok()) {
828 s = backup_env_->CreateDir(private_dir);
829 }
830
831 RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
832 if (rate_limiter) {
833 copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
834 }
835
836 // A set into which we will insert the dst_paths that are calculated for live
837 // files and live WAL files.
838 // This is used to check whether a live files shares a dst_path with another
839 // live file.
840 std::unordered_set<std::string> live_dst_paths;
841
842 std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
843 // Add a CopyOrCreateWorkItem to the channel for each live file
844 db->DisableFileDeletions();
845 if (s.ok()) {
846 CheckpointImpl checkpoint(db);
847 uint64_t sequence_number = 0;
848 DBOptions db_options = db->GetDBOptions();
849 EnvOptions src_raw_env_options(db_options);
850 s = checkpoint.CreateCustomCheckpoint(
851 db_options,
852 [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
853 FileType) {
854 // custom checkpoint will switch to calling copy_file_cb after it sees
855 // NotSupported returned from link_file_cb.
856 return Status::NotSupported();
857 } /* link_file_cb */,
858 [&](const std::string& src_dirname, const std::string& fname,
859 uint64_t size_limit_bytes, FileType type) {
860 if (type == kLogFile && !options_.backup_log_files) {
861 return Status::OK();
862 }
863 Log(options_.info_log, "add file for backup %s", fname.c_str());
864 uint64_t size_bytes = 0;
865 Status st;
866 if (type == kTableFile) {
867 st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
868 }
869 EnvOptions src_env_options;
870 switch (type) {
871 case kLogFile:
872 src_env_options =
873 db_env_->OptimizeForLogRead(src_raw_env_options);
874 break;
875 case kTableFile:
876 src_env_options = db_env_->OptimizeForCompactionTableRead(
877 src_raw_env_options, ImmutableDBOptions(db_options));
878 break;
879 case kDescriptorFile:
880 src_env_options =
881 db_env_->OptimizeForManifestRead(src_raw_env_options);
882 break;
883 default:
884 // Other backed up files (like options file) are not read by live
885 // DB, so don't need to worry about avoiding mixing buffered and
886 // direct I/O. Just use plain defaults.
887 src_env_options = src_raw_env_options;
888 break;
889 }
890 if (st.ok()) {
891 st = AddBackupFileWorkItem(
892 live_dst_paths, backup_items_to_finish, new_backup_id,
893 options_.share_table_files && type == kTableFile, src_dirname,
894 fname, src_env_options, rate_limiter, size_bytes,
895 size_limit_bytes,
896 options_.share_files_with_checksum && type == kTableFile,
897 options.progress_callback);
898 }
899 return st;
900 } /* copy_file_cb */,
901 [&](const std::string& fname, const std::string& contents, FileType) {
902 Log(options_.info_log, "add file for backup %s", fname.c_str());
903 return AddBackupFileWorkItem(
904 live_dst_paths, backup_items_to_finish, new_backup_id,
905 false /* shared */, "" /* src_dir */, fname,
906 EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
907 0 /* size_limit */, false /* shared_checksum */,
908 options.progress_callback, contents);
909 } /* create_file_cb */,
910 &sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64);
911 if (s.ok()) {
912 new_backup->SetSequenceNumber(sequence_number);
913 }
914 }
915 ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
916 Status item_status;
917 for (auto& item : backup_items_to_finish) {
918 item.result.wait();
919 auto result = item.result.get();
920 item_status = result.status;
921 if (item_status.ok() && item.shared && item.needed_to_copy) {
922 item_status = item.backup_env->RenameFile(item.dst_path_tmp,
923 item.dst_path);
924 }
925 if (item_status.ok()) {
926 item_status = new_backup.get()->AddFile(
927 std::make_shared<FileInfo>(item.dst_relative,
928 result.size,
929 result.checksum_value));
930 }
931 if (!item_status.ok()) {
932 s = item_status;
933 }
934 }
935
936 // we copied all the files, enable file deletions
937 db->EnableFileDeletions(false);
938
939 auto backup_time = backup_env_->NowMicros() - start_backup;
940
941 if (s.ok()) {
942 // persist the backup metadata on the disk
943 s = new_backup->StoreToFile(options_.sync);
944 }
945 if (s.ok() && options_.sync) {
946 std::unique_ptr<Directory> backup_private_directory;
947 backup_env_->NewDirectory(
948 GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
949 &backup_private_directory);
950 if (backup_private_directory != nullptr) {
951 s = backup_private_directory->Fsync();
952 }
953 if (s.ok() && private_directory_ != nullptr) {
954 s = private_directory_->Fsync();
955 }
956 if (s.ok() && meta_directory_ != nullptr) {
957 s = meta_directory_->Fsync();
958 }
959 if (s.ok() && shared_directory_ != nullptr) {
960 s = shared_directory_->Fsync();
961 }
962 if (s.ok() && backup_directory_ != nullptr) {
963 s = backup_directory_->Fsync();
964 }
965 }
966
967 if (s.ok()) {
968 backup_statistics_.IncrementNumberSuccessBackup();
969 }
970 if (!s.ok()) {
971 backup_statistics_.IncrementNumberFailBackup();
972 // clean all the files we might have created
973 ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
974 s.ToString().c_str());
975 ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
976 backup_statistics_.ToString().c_str());
977 // delete files that we might have already written
978 might_need_garbage_collect_ = true;
979 DeleteBackup(new_backup_id);
980 return s;
981 }
982
983 // here we know that we succeeded and installed the new backup
984 // in the LATEST_BACKUP file
985 latest_backup_id_ = new_backup_id;
986 latest_valid_backup_id_ = new_backup_id;
987 ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
988
989 // backup_speed is in byte/second
990 double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
991 ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
992 new_backup->GetNumberFiles());
993 char human_size[16];
994 AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
995 ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
996 ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
997 backup_time);
998 ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
999 ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
1000 backup_statistics_.ToString().c_str());
1001 return s;
1002 }
1003
PurgeOldBackups(uint32_t num_backups_to_keep)1004 Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
1005 assert(initialized_);
1006 assert(!read_only_);
1007
1008 // Best effort deletion even with errors
1009 Status overall_status = Status::OK();
1010
1011 ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
1012 num_backups_to_keep);
1013 std::vector<BackupID> to_delete;
1014 auto itr = backups_.begin();
1015 while ((backups_.size() - to_delete.size()) > num_backups_to_keep) {
1016 to_delete.push_back(itr->first);
1017 itr++;
1018 }
1019 for (auto backup_id : to_delete) {
1020 auto s = DeleteBackupInternal(backup_id);
1021 if (!s.ok()) {
1022 overall_status = s;
1023 }
1024 }
1025 // Clean up after any incomplete backup deletion, potentially from
1026 // earlier session.
1027 if (might_need_garbage_collect_) {
1028 auto s = GarbageCollect();
1029 if (!s.ok() && overall_status.ok()) {
1030 overall_status = s;
1031 }
1032 }
1033 return overall_status;
1034 }
1035
DeleteBackup(BackupID backup_id)1036 Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
1037 auto s1 = DeleteBackupInternal(backup_id);
1038 auto s2 = Status::OK();
1039
1040 // Clean up after any incomplete backup deletion, potentially from
1041 // earlier session.
1042 if (might_need_garbage_collect_) {
1043 s2 = GarbageCollect();
1044 }
1045
1046 if (!s1.ok()) {
1047 return s1;
1048 } else {
1049 return s2;
1050 }
1051 }
1052
1053 // Does not auto-GarbageCollect
DeleteBackupInternal(BackupID backup_id)1054 Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
1055 assert(initialized_);
1056 assert(!read_only_);
1057
1058 ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
1059 auto backup = backups_.find(backup_id);
1060 if (backup != backups_.end()) {
1061 auto s = backup->second->Delete();
1062 if (!s.ok()) {
1063 return s;
1064 }
1065 backups_.erase(backup);
1066 } else {
1067 auto corrupt = corrupt_backups_.find(backup_id);
1068 if (corrupt == corrupt_backups_.end()) {
1069 return Status::NotFound("Backup not found");
1070 }
1071 auto s = corrupt->second.second->Delete();
1072 if (!s.ok()) {
1073 return s;
1074 }
1075 corrupt_backups_.erase(corrupt);
1076 }
1077
1078 // After removing meta file, best effort deletion even with errors.
1079 // (Don't delete other files if we can't delete the meta file right
1080 // now.)
1081 std::vector<std::string> to_delete;
1082 for (auto& itr : backuped_file_infos_) {
1083 if (itr.second->refs == 0) {
1084 Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
1085 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
1086 s.ToString().c_str());
1087 to_delete.push_back(itr.first);
1088 if (!s.ok()) {
1089 // Trying again later might work
1090 might_need_garbage_collect_ = true;
1091 }
1092 }
1093 }
1094 for (auto& td : to_delete) {
1095 backuped_file_infos_.erase(td);
1096 }
1097
1098 // take care of private dirs -- GarbageCollect() will take care of them
1099 // if they are not empty
1100 std::string private_dir = GetPrivateFileRel(backup_id);
1101 Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
1102 ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
1103 private_dir.c_str(), s.ToString().c_str());
1104 if (!s.ok()) {
1105 // Full gc or trying again later might work
1106 might_need_garbage_collect_ = true;
1107 }
1108 return Status::OK();
1109 }
1110
GetBackupInfo(std::vector<BackupInfo> * backup_info)1111 void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1112 assert(initialized_);
1113 backup_info->reserve(backups_.size());
1114 for (auto& backup : backups_) {
1115 if (!backup.second->Empty()) {
1116 backup_info->push_back(BackupInfo(
1117 backup.first, backup.second->GetTimestamp(), backup.second->GetSize(),
1118 backup.second->GetNumberFiles(), backup.second->GetAppMetadata()));
1119 }
1120 }
1121 }
1122
1123 void
GetCorruptedBackups(std::vector<BackupID> * corrupt_backup_ids)1124 BackupEngineImpl::GetCorruptedBackups(
1125 std::vector<BackupID>* corrupt_backup_ids) {
1126 assert(initialized_);
1127 corrupt_backup_ids->reserve(corrupt_backups_.size());
1128 for (auto& backup : corrupt_backups_) {
1129 corrupt_backup_ids->push_back(backup.first);
1130 }
1131 }
1132
RestoreDBFromBackup(const RestoreOptions & options,BackupID backup_id,const std::string & db_dir,const std::string & wal_dir)1133 Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
1134 BackupID backup_id,
1135 const std::string& db_dir,
1136 const std::string& wal_dir) {
1137 assert(initialized_);
1138 auto corrupt_itr = corrupt_backups_.find(backup_id);
1139 if (corrupt_itr != corrupt_backups_.end()) {
1140 return corrupt_itr->second.first;
1141 }
1142 auto backup_itr = backups_.find(backup_id);
1143 if (backup_itr == backups_.end()) {
1144 return Status::NotFound("Backup not found");
1145 }
1146 auto& backup = backup_itr->second;
1147 if (backup->Empty()) {
1148 return Status::NotFound("Backup not found");
1149 }
1150
1151 ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
1152 ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
1153 static_cast<int>(options.keep_log_files));
1154
1155 // just in case. Ignore errors
1156 db_env_->CreateDirIfMissing(db_dir);
1157 db_env_->CreateDirIfMissing(wal_dir);
1158
1159 if (options.keep_log_files) {
1160 // delete files in db_dir, but keep all the log files
1161 DeleteChildren(db_dir, 1 << kLogFile);
1162 // move all the files from archive dir to wal_dir
1163 std::string archive_dir = ArchivalDirectory(wal_dir);
1164 std::vector<std::string> archive_files;
1165 db_env_->GetChildren(archive_dir, &archive_files); // ignore errors
1166 for (const auto& f : archive_files) {
1167 uint64_t number;
1168 FileType type;
1169 bool ok = ParseFileName(f, &number, &type);
1170 if (ok && type == kLogFile) {
1171 ROCKS_LOG_INFO(options_.info_log,
1172 "Moving log file from archive/ to wal_dir: %s",
1173 f.c_str());
1174 Status s =
1175 db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
1176 if (!s.ok()) {
1177 // if we can't move log file from archive_dir to wal_dir,
1178 // we should fail, since it might mean data loss
1179 return s;
1180 }
1181 }
1182 }
1183 } else {
1184 DeleteChildren(wal_dir);
1185 DeleteChildren(ArchivalDirectory(wal_dir));
1186 DeleteChildren(db_dir);
1187 }
1188
1189 RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
1190 if (rate_limiter) {
1191 copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
1192 }
1193 Status s;
1194 std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
1195 for (const auto& file_info : backup->GetFiles()) {
1196 const std::string &file = file_info->filename;
1197 std::string dst;
1198 // 1. extract the filename
1199 size_t slash = file.find_last_of('/');
1200 // file will either be shared/<file>, shared_checksum/<file_crc32_size>
1201 // or private/<number>/<file>
1202 assert(slash != std::string::npos);
1203 dst = file.substr(slash + 1);
1204
1205 // if the file was in shared_checksum, extract the real file name
1206 // in this case the file is <number>_<checksum>_<size>.<type>
1207 if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
1208 dst = GetFileFromChecksumFile(dst);
1209 }
1210
1211 // 2. find the filetype
1212 uint64_t number;
1213 FileType type;
1214 bool ok = ParseFileName(dst, &number, &type);
1215 if (!ok) {
1216 return Status::Corruption("Backup corrupted");
1217 }
1218 // 3. Construct the final path
1219 // kLogFile lives in wal_dir and all the rest live in db_dir
1220 dst = ((type == kLogFile) ? wal_dir : db_dir) +
1221 "/" + dst;
1222
1223 ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
1224 dst.c_str());
1225 CopyOrCreateWorkItem copy_or_create_work_item(
1226 GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
1227 EnvOptions() /* src_env_options */, false, rate_limiter,
1228 0 /* size_limit */);
1229 RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1230 copy_or_create_work_item.result.get_future(),
1231 file_info->checksum_value);
1232 files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1233 restore_items_to_finish.push_back(
1234 std::move(after_copy_or_create_work_item));
1235 }
1236 Status item_status;
1237 for (auto& item : restore_items_to_finish) {
1238 item.result.wait();
1239 auto result = item.result.get();
1240 item_status = result.status;
1241 // Note: It is possible that both of the following bad-status cases occur
1242 // during copying. But, we only return one status.
1243 if (!item_status.ok()) {
1244 s = item_status;
1245 break;
1246 } else if (item.checksum_value != result.checksum_value) {
1247 s = Status::Corruption("Checksum check failed");
1248 break;
1249 }
1250 }
1251
1252 ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
1253 s.ToString().c_str());
1254 return s;
1255 }
1256
VerifyBackup(BackupID backup_id)1257 Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
1258 assert(initialized_);
1259 auto corrupt_itr = corrupt_backups_.find(backup_id);
1260 if (corrupt_itr != corrupt_backups_.end()) {
1261 return corrupt_itr->second.first;
1262 }
1263
1264 auto backup_itr = backups_.find(backup_id);
1265 if (backup_itr == backups_.end()) {
1266 return Status::NotFound();
1267 }
1268
1269 auto& backup = backup_itr->second;
1270 if (backup->Empty()) {
1271 return Status::NotFound();
1272 }
1273
1274 ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
1275
1276 std::unordered_map<std::string, uint64_t> curr_abs_path_to_size;
1277 for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
1278 GetSharedFileWithChecksumRel()}) {
1279 const auto abs_dir = GetAbsolutePath(rel_dir);
1280 InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
1281 }
1282
1283 for (const auto& file_info : backup->GetFiles()) {
1284 const auto abs_path = GetAbsolutePath(file_info->filename);
1285 if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
1286 return Status::NotFound("File missing: " + abs_path);
1287 }
1288 if (file_info->size != curr_abs_path_to_size[abs_path]) {
1289 return Status::Corruption("File corrupted: " + abs_path);
1290 }
1291 }
1292 return Status::OK();
1293 }
1294
CopyOrCreateFile(const std::string & src,const std::string & dst,const std::string & contents,Env * src_env,Env * dst_env,const EnvOptions & src_env_options,bool sync,RateLimiter * rate_limiter,uint64_t * size,uint32_t * checksum_value,uint64_t size_limit,std::function<void ()> progress_callback)1295 Status BackupEngineImpl::CopyOrCreateFile(
1296 const std::string& src, const std::string& dst, const std::string& contents,
1297 Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
1298 RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value,
1299 uint64_t size_limit, std::function<void()> progress_callback) {
1300 assert(src.empty() != contents.empty());
1301 Status s;
1302 std::unique_ptr<WritableFile> dst_file;
1303 std::unique_ptr<SequentialFile> src_file;
1304 EnvOptions dst_env_options;
1305 dst_env_options.use_mmap_writes = false;
1306 // TODO:(gzh) maybe use direct reads/writes here if possible
1307 if (size != nullptr) {
1308 *size = 0;
1309 }
1310 if (checksum_value != nullptr) {
1311 *checksum_value = 0;
1312 }
1313
1314 // Check if size limit is set. if not, set it to very big number
1315 if (size_limit == 0) {
1316 size_limit = std::numeric_limits<uint64_t>::max();
1317 }
1318
1319 s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
1320 if (s.ok() && !src.empty()) {
1321 s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1322 }
1323 if (!s.ok()) {
1324 return s;
1325 }
1326
1327 std::unique_ptr<WritableFileWriter> dest_writer(new WritableFileWriter(
1328 NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options));
1329 std::unique_ptr<SequentialFileReader> src_reader;
1330 std::unique_ptr<char[]> buf;
1331 if (!src.empty()) {
1332 src_reader.reset(new SequentialFileReader(
1333 NewLegacySequentialFileWrapper(src_file), src));
1334 buf.reset(new char[copy_file_buffer_size_]);
1335 }
1336
1337 Slice data;
1338 uint64_t processed_buffer_size = 0;
1339 do {
1340 if (stop_backup_.load(std::memory_order_acquire)) {
1341 return Status::Incomplete("Backup stopped");
1342 }
1343 if (!src.empty()) {
1344 size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
1345 ? copy_file_buffer_size_
1346 : static_cast<size_t>(size_limit);
1347 s = src_reader->Read(buffer_to_read, &data, buf.get());
1348 processed_buffer_size += buffer_to_read;
1349 } else {
1350 data = contents;
1351 }
1352 size_limit -= data.size();
1353
1354 if (!s.ok()) {
1355 return s;
1356 }
1357
1358 if (size != nullptr) {
1359 *size += data.size();
1360 }
1361 if (checksum_value != nullptr) {
1362 *checksum_value =
1363 crc32c::Extend(*checksum_value, data.data(), data.size());
1364 }
1365 s = dest_writer->Append(data);
1366 if (rate_limiter != nullptr) {
1367 rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
1368 RateLimiter::OpType::kWrite);
1369 }
1370 if (processed_buffer_size > options_.callback_trigger_interval_size) {
1371 processed_buffer_size -= options_.callback_trigger_interval_size;
1372 std::lock_guard<std::mutex> lock(byte_report_mutex_);
1373 progress_callback();
1374 }
1375 } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
1376
1377 if (s.ok() && sync) {
1378 s = dest_writer->Sync(false);
1379 }
1380 if (s.ok()) {
1381 s = dest_writer->Close();
1382 }
1383 return s;
1384 }
1385
1386 // fname will always start with "/"
AddBackupFileWorkItem(std::unordered_set<std::string> & live_dst_paths,std::vector<BackupAfterCopyOrCreateWorkItem> & backup_items_to_finish,BackupID backup_id,bool shared,const std::string & src_dir,const std::string & fname,const EnvOptions & src_env_options,RateLimiter * rate_limiter,uint64_t size_bytes,uint64_t size_limit,bool shared_checksum,std::function<void ()> progress_callback,const std::string & contents)1387 Status BackupEngineImpl::AddBackupFileWorkItem(
1388 std::unordered_set<std::string>& live_dst_paths,
1389 std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
1390 BackupID backup_id, bool shared, const std::string& src_dir,
1391 const std::string& fname, const EnvOptions& src_env_options,
1392 RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
1393 bool shared_checksum, std::function<void()> progress_callback,
1394 const std::string& contents) {
1395 assert(!fname.empty() && fname[0] == '/');
1396 assert(contents.empty() != src_dir.empty());
1397
1398 std::string dst_relative = fname.substr(1);
1399 std::string dst_relative_tmp;
1400 Status s;
1401 uint32_t checksum_value = 0;
1402
1403 if (shared && shared_checksum) {
1404 // add checksum and file length to the file name
1405 s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit,
1406 &checksum_value);
1407 if (!s.ok()) {
1408 return s;
1409 }
1410 if (size_bytes == port::kMaxUint64) {
1411 return Status::NotFound("File missing: " + src_dir + fname);
1412 }
1413 dst_relative =
1414 GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes);
1415 dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
1416 dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
1417 } else if (shared) {
1418 dst_relative_tmp = GetSharedFileRel(dst_relative, true);
1419 dst_relative = GetSharedFileRel(dst_relative, false);
1420 } else {
1421 dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
1422 }
1423
1424 // We copy into `temp_dest_path` and, once finished, rename it to
1425 // `final_dest_path`. This allows files to atomically appear at
1426 // `final_dest_path`. We can copy directly to the final path when atomicity
1427 // is unnecessary, like for files in private backup directories.
1428 const std::string* copy_dest_path;
1429 std::string temp_dest_path;
1430 std::string final_dest_path = GetAbsolutePath(dst_relative);
1431 if (!dst_relative_tmp.empty()) {
1432 temp_dest_path = GetAbsolutePath(dst_relative_tmp);
1433 copy_dest_path = &temp_dest_path;
1434 } else {
1435 copy_dest_path = &final_dest_path;
1436 }
1437
1438 // if it's shared, we also need to check if it exists -- if it does, no need
1439 // to copy it again.
1440 bool need_to_copy = true;
1441 // true if final_dest_path is the same path as another live file
1442 const bool same_path =
1443 live_dst_paths.find(final_dest_path) != live_dst_paths.end();
1444
1445 bool file_exists = false;
1446 if (shared && !same_path) {
1447 Status exist = backup_env_->FileExists(final_dest_path);
1448 if (exist.ok()) {
1449 file_exists = true;
1450 } else if (exist.IsNotFound()) {
1451 file_exists = false;
1452 } else {
1453 assert(s.IsIOError());
1454 return exist;
1455 }
1456 }
1457
1458 if (!contents.empty()) {
1459 need_to_copy = false;
1460 } else if (shared && (same_path || file_exists)) {
1461 need_to_copy = false;
1462 if (shared_checksum) {
1463 ROCKS_LOG_INFO(options_.info_log,
1464 "%s already present, with checksum %u and size %" PRIu64,
1465 fname.c_str(), checksum_value, size_bytes);
1466 } else if (backuped_file_infos_.find(dst_relative) ==
1467 backuped_file_infos_.end() && !same_path) {
1468 // file already exists, but it's not referenced by any backup. overwrite
1469 // the file
1470 ROCKS_LOG_INFO(
1471 options_.info_log,
1472 "%s already present, but not referenced by any backup. We will "
1473 "overwrite the file.",
1474 fname.c_str());
1475 need_to_copy = true;
1476 backup_env_->DeleteFile(final_dest_path);
1477 } else {
1478 // the file is present and referenced by a backup
1479 ROCKS_LOG_INFO(options_.info_log,
1480 "%s already present, calculate checksum", fname.c_str());
1481 s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
1482 size_limit, &checksum_value);
1483 }
1484 }
1485 live_dst_paths.insert(final_dest_path);
1486
1487 if (!contents.empty() || need_to_copy) {
1488 ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
1489 copy_dest_path->c_str());
1490 CopyOrCreateWorkItem copy_or_create_work_item(
1491 src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
1492 db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
1493 size_limit, progress_callback);
1494 BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1495 copy_or_create_work_item.result.get_future(), shared, need_to_copy,
1496 backup_env_, temp_dest_path, final_dest_path, dst_relative);
1497 files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1498 backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1499 } else {
1500 std::promise<CopyOrCreateResult> promise_result;
1501 BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1502 promise_result.get_future(), shared, need_to_copy, backup_env_,
1503 temp_dest_path, final_dest_path, dst_relative);
1504 backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1505 CopyOrCreateResult result;
1506 result.status = s;
1507 result.size = size_bytes;
1508 result.checksum_value = checksum_value;
1509 promise_result.set_value(std::move(result));
1510 }
1511 return s;
1512 }
1513
CalculateChecksum(const std::string & src,Env * src_env,const EnvOptions & src_env_options,uint64_t size_limit,uint32_t * checksum_value)1514 Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
1515 const EnvOptions& src_env_options,
1516 uint64_t size_limit,
1517 uint32_t* checksum_value) {
1518 *checksum_value = 0;
1519 if (size_limit == 0) {
1520 size_limit = std::numeric_limits<uint64_t>::max();
1521 }
1522
1523 std::unique_ptr<SequentialFile> src_file;
1524 Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1525 if (!s.ok()) {
1526 return s;
1527 }
1528
1529 std::unique_ptr<SequentialFileReader> src_reader(
1530 new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src));
1531 std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
1532 Slice data;
1533
1534 do {
1535 if (stop_backup_.load(std::memory_order_acquire)) {
1536 return Status::Incomplete("Backup stopped");
1537 }
1538 size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
1539 copy_file_buffer_size_ : static_cast<size_t>(size_limit);
1540 s = src_reader->Read(buffer_to_read, &data, buf.get());
1541
1542 if (!s.ok()) {
1543 return s;
1544 }
1545
1546 size_limit -= data.size();
1547 *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
1548 } while (data.size() > 0 && size_limit > 0);
1549
1550 return s;
1551 }
1552
DeleteChildren(const std::string & dir,uint32_t file_type_filter)1553 void BackupEngineImpl::DeleteChildren(const std::string& dir,
1554 uint32_t file_type_filter) {
1555 std::vector<std::string> children;
1556 db_env_->GetChildren(dir, &children); // ignore errors
1557
1558 for (const auto& f : children) {
1559 uint64_t number;
1560 FileType type;
1561 bool ok = ParseFileName(f, &number, &type);
1562 if (ok && (file_type_filter & (1 << type))) {
1563 // don't delete this file
1564 continue;
1565 }
1566 db_env_->DeleteFile(dir + "/" + f); // ignore errors
1567 }
1568 }
1569
InsertPathnameToSizeBytes(const std::string & dir,Env * env,std::unordered_map<std::string,uint64_t> * result)1570 Status BackupEngineImpl::InsertPathnameToSizeBytes(
1571 const std::string& dir, Env* env,
1572 std::unordered_map<std::string, uint64_t>* result) {
1573 assert(result != nullptr);
1574 std::vector<Env::FileAttributes> files_attrs;
1575 Status status = env->FileExists(dir);
1576 if (status.ok()) {
1577 status = env->GetChildrenFileAttributes(dir, &files_attrs);
1578 } else if (status.IsNotFound()) {
1579 // Insert no entries can be considered success
1580 status = Status::OK();
1581 }
1582 const bool slash_needed = dir.empty() || dir.back() != '/';
1583 for (const auto& file_attrs : files_attrs) {
1584 result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
1585 file_attrs.size_bytes);
1586 }
1587 return status;
1588 }
1589
GarbageCollect()1590 Status BackupEngineImpl::GarbageCollect() {
1591 assert(!read_only_);
1592
1593 // We will make a best effort to remove all garbage even in the presence
1594 // of inconsistencies or I/O failures that inhibit finding garbage.
1595 Status overall_status = Status::OK();
1596 // If all goes well, we don't need another auto-GC this session
1597 might_need_garbage_collect_ = false;
1598
1599 ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
1600
1601 // delete obsolete shared files
1602 for (bool with_checksum : {false, true}) {
1603 std::vector<std::string> shared_children;
1604 {
1605 std::string shared_path;
1606 if (with_checksum) {
1607 shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
1608 } else {
1609 shared_path = GetAbsolutePath(GetSharedFileRel());
1610 }
1611 auto s = backup_env_->FileExists(shared_path);
1612 if (s.ok()) {
1613 s = backup_env_->GetChildren(shared_path, &shared_children);
1614 } else if (s.IsNotFound()) {
1615 s = Status::OK();
1616 }
1617 if (!s.ok()) {
1618 overall_status = s;
1619 // Trying again later might work
1620 might_need_garbage_collect_ = true;
1621 }
1622 }
1623 for (auto& child : shared_children) {
1624 if (child == "." || child == "..") {
1625 continue;
1626 }
1627 std::string rel_fname;
1628 if (with_checksum) {
1629 rel_fname = GetSharedFileWithChecksumRel(child);
1630 } else {
1631 rel_fname = GetSharedFileRel(child);
1632 }
1633 auto child_itr = backuped_file_infos_.find(rel_fname);
1634 // if it's not refcounted, delete it
1635 if (child_itr == backuped_file_infos_.end() ||
1636 child_itr->second->refs == 0) {
1637 // this might be a directory, but DeleteFile will just fail in that
1638 // case, so we're good
1639 Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
1640 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1641 rel_fname.c_str(), s.ToString().c_str());
1642 backuped_file_infos_.erase(rel_fname);
1643 if (!s.ok()) {
1644 // Trying again later might work
1645 might_need_garbage_collect_ = true;
1646 }
1647 }
1648 }
1649 }
1650
1651 // delete obsolete private files
1652 std::vector<std::string> private_children;
1653 {
1654 auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
1655 &private_children);
1656 if (!s.ok()) {
1657 overall_status = s;
1658 // Trying again later might work
1659 might_need_garbage_collect_ = true;
1660 }
1661 }
1662 for (auto& child : private_children) {
1663 if (child == "." || child == "..") {
1664 continue;
1665 }
1666
1667 BackupID backup_id = 0;
1668 bool tmp_dir = child.find(".tmp") != std::string::npos;
1669 sscanf(child.c_str(), "%u", &backup_id);
1670 if (!tmp_dir && // if it's tmp_dir, delete it
1671 (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
1672 // it's either not a number or it's still alive. continue
1673 continue;
1674 }
1675 // here we have to delete the dir and all its children
1676 std::string full_private_path =
1677 GetAbsolutePath(GetPrivateFileRel(backup_id));
1678 std::vector<std::string> subchildren;
1679 backup_env_->GetChildren(full_private_path, &subchildren);
1680 for (auto& subchild : subchildren) {
1681 if (subchild == "." || subchild == "..") {
1682 continue;
1683 }
1684 Status s = backup_env_->DeleteFile(full_private_path + subchild);
1685 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1686 (full_private_path + subchild).c_str(),
1687 s.ToString().c_str());
1688 if (!s.ok()) {
1689 // Trying again later might work
1690 might_need_garbage_collect_ = true;
1691 }
1692 }
1693 // finally delete the private dir
1694 Status s = backup_env_->DeleteDir(full_private_path);
1695 ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
1696 full_private_path.c_str(), s.ToString().c_str());
1697 if (!s.ok()) {
1698 // Trying again later might work
1699 might_need_garbage_collect_ = true;
1700 }
1701 }
1702
1703 assert(overall_status.ok() || might_need_garbage_collect_);
1704 return overall_status;
1705 }
1706
1707 // ------- BackupMeta class --------
1708
AddFile(std::shared_ptr<FileInfo> file_info)1709 Status BackupEngineImpl::BackupMeta::AddFile(
1710 std::shared_ptr<FileInfo> file_info) {
1711 auto itr = file_infos_->find(file_info->filename);
1712 if (itr == file_infos_->end()) {
1713 auto ret = file_infos_->insert({file_info->filename, file_info});
1714 if (ret.second) {
1715 itr = ret.first;
1716 itr->second->refs = 1;
1717 } else {
1718 // if this happens, something is seriously wrong
1719 return Status::Corruption("In memory metadata insertion error");
1720 }
1721 } else {
1722 if (itr->second->checksum_value != file_info->checksum_value) {
1723 return Status::Corruption(
1724 "Checksum mismatch for existing backup file. Delete old backups and "
1725 "try again.");
1726 }
1727 ++itr->second->refs; // increase refcount if already present
1728 }
1729
1730 size_ += file_info->size;
1731 files_.push_back(itr->second);
1732
1733 return Status::OK();
1734 }
1735
Delete(bool delete_meta)1736 Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
1737 Status s;
1738 for (const auto& file : files_) {
1739 --file->refs; // decrease refcount
1740 }
1741 files_.clear();
1742 // delete meta file
1743 if (delete_meta) {
1744 s = env_->FileExists(meta_filename_);
1745 if (s.ok()) {
1746 s = env_->DeleteFile(meta_filename_);
1747 } else if (s.IsNotFound()) {
1748 s = Status::OK(); // nothing to delete
1749 }
1750 }
1751 timestamp_ = 0;
1752 return s;
1753 }
1754
// Prefix of the optional meta-file line that carries the hex-encoded
// application metadata. StoreToFile() writes "<prefix><hex>\n";
// LoadFromFile() strips this prefix before hex-decoding the rest of the line.
Slice kMetaDataPrefix("metadata ");
1756
1757 // each backup meta file is of the format:
1758 // <timestamp>
1759 // <seq number>
1760 // <metadata(literal string)> <metadata> (optional)
1761 // <number of files>
1762 // <file1> <crc32(literal string)> <crc32_value>
1763 // <file2> <crc32(literal string)> <crc32_value>
1764 // ...
LoadFromFile(const std::string & backup_dir,const std::unordered_map<std::string,uint64_t> & abs_path_to_size)1765 Status BackupEngineImpl::BackupMeta::LoadFromFile(
1766 const std::string& backup_dir,
1767 const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
1768 assert(Empty());
1769 Status s;
1770 std::unique_ptr<SequentialFile> backup_meta_file;
1771 s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
1772 if (!s.ok()) {
1773 return s;
1774 }
1775
1776 std::unique_ptr<SequentialFileReader> backup_meta_reader(
1777 new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file),
1778 meta_filename_));
1779 std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
1780 Slice data;
1781 s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
1782
1783 if (!s.ok() || data.size() == max_backup_meta_file_size_) {
1784 return s.ok() ? Status::Corruption("File size too big") : s;
1785 }
1786 buf[data.size()] = 0;
1787
1788 uint32_t num_files = 0;
1789 char *next;
1790 timestamp_ = strtoull(data.data(), &next, 10);
1791 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1792 sequence_number_ = strtoull(data.data(), &next, 10);
1793 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1794
1795 if (data.starts_with(kMetaDataPrefix)) {
1796 // app metadata present
1797 data.remove_prefix(kMetaDataPrefix.size());
1798 Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
1799 bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
1800 if (!decode_success) {
1801 return Status::Corruption(
1802 "Failed to decode stored hex encoded app metadata");
1803 }
1804 }
1805
1806 num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
1807 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1808
1809 std::vector<std::shared_ptr<FileInfo>> files;
1810
1811 Slice checksum_prefix("crc32 ");
1812
1813 for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
1814 auto line = GetSliceUntil(&data, '\n');
1815 std::string filename = GetSliceUntil(&line, ' ').ToString();
1816
1817 uint64_t size;
1818 const std::shared_ptr<FileInfo> file_info = GetFile(filename);
1819 if (file_info) {
1820 size = file_info->size;
1821 } else {
1822 std::string abs_path = backup_dir + "/" + filename;
1823 try {
1824 size = abs_path_to_size.at(abs_path);
1825 } catch (std::out_of_range&) {
1826 return Status::Corruption("Size missing for pathname: " + abs_path);
1827 }
1828 }
1829
1830 if (line.empty()) {
1831 return Status::Corruption("File checksum is missing for " + filename +
1832 " in " + meta_filename_);
1833 }
1834
1835 uint32_t checksum_value = 0;
1836 if (line.starts_with(checksum_prefix)) {
1837 line.remove_prefix(checksum_prefix.size());
1838 checksum_value = static_cast<uint32_t>(
1839 strtoul(line.data(), nullptr, 10));
1840 if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
1841 return Status::Corruption("Invalid checksum value for " + filename +
1842 " in " + meta_filename_);
1843 }
1844 } else {
1845 return Status::Corruption("Unknown checksum type for " + filename +
1846 " in " + meta_filename_);
1847 }
1848
1849 files.emplace_back(new FileInfo(filename, size, checksum_value));
1850 }
1851
1852 if (s.ok() && data.size() > 0) {
1853 // file has to be read completely. if not, we count it as corruption
1854 s = Status::Corruption("Tailing data in backup meta file in " +
1855 meta_filename_);
1856 }
1857
1858 if (s.ok()) {
1859 files_.reserve(files.size());
1860 for (const auto& file_info : files) {
1861 s = AddFile(file_info);
1862 if (!s.ok()) {
1863 break;
1864 }
1865 }
1866 }
1867
1868 return s;
1869 }
1870
StoreToFile(bool sync)1871 Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
1872 Status s;
1873 std::unique_ptr<WritableFile> backup_meta_file;
1874 EnvOptions env_options;
1875 env_options.use_mmap_writes = false;
1876 env_options.use_direct_writes = false;
1877 s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
1878 if (!s.ok()) {
1879 return s;
1880 }
1881
1882 std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
1883 size_t len = 0, buf_size = max_backup_meta_file_size_;
1884 len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
1885 len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
1886 sequence_number_);
1887 if (!app_metadata_.empty()) {
1888 std::string hex_encoded_metadata =
1889 Slice(app_metadata_).ToString(/* hex */ true);
1890
1891 // +1 to accommodate newline character
1892 size_t hex_meta_strlen = kMetaDataPrefix.ToString().length() + hex_encoded_metadata.length() + 1;
1893 if (hex_meta_strlen >= buf_size) {
1894 return Status::Corruption("Buffer too small to fit backup metadata");
1895 }
1896 else if (len + hex_meta_strlen >= buf_size) {
1897 backup_meta_file->Append(Slice(buf.get(), len));
1898 buf.reset();
1899 std::unique_ptr<char[]> new_reset_buf(
1900 new char[max_backup_meta_file_size_]);
1901 buf.swap(new_reset_buf);
1902 len = 0;
1903 }
1904 len += snprintf(buf.get() + len, buf_size - len, "%s%s\n",
1905 kMetaDataPrefix.ToString().c_str(),
1906 hex_encoded_metadata.c_str());
1907 }
1908
1909 char writelen_temp[19];
1910 if (len + snprintf(writelen_temp, sizeof(writelen_temp),
1911 "%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) {
1912 backup_meta_file->Append(Slice(buf.get(), len));
1913 buf.reset();
1914 std::unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
1915 buf.swap(new_reset_buf);
1916 len = 0;
1917 }
1918 {
1919 const char *const_write = writelen_temp;
1920 len += snprintf(buf.get() + len, buf_size - len, "%s", const_write);
1921 }
1922
1923 for (const auto& file : files_) {
1924 // use crc32 for now, switch to something else if needed
1925
1926 size_t newlen = len + file->filename.length() + snprintf(writelen_temp,
1927 sizeof(writelen_temp), " crc32 %u\n", file->checksum_value);
1928 const char *const_write = writelen_temp;
1929 if (newlen >= buf_size) {
1930 backup_meta_file->Append(Slice(buf.get(), len));
1931 buf.reset();
1932 std::unique_ptr<char[]> new_reset_buf(
1933 new char[max_backup_meta_file_size_]);
1934 buf.swap(new_reset_buf);
1935 len = 0;
1936 }
1937 len += snprintf(buf.get() + len, buf_size - len, "%s%s",
1938 file->filename.c_str(), const_write);
1939 }
1940
1941 s = backup_meta_file->Append(Slice(buf.get(), len));
1942 if (s.ok() && sync) {
1943 s = backup_meta_file->Sync();
1944 }
1945 if (s.ok()) {
1946 s = backup_meta_file->Close();
1947 }
1948 if (s.ok()) {
1949 s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
1950 }
1951 return s;
1952 }
1953
1954 // -------- BackupEngineReadOnlyImpl ---------
1955 class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
1956 public:
BackupEngineReadOnlyImpl(const BackupableDBOptions & options,Env * db_env)1957 BackupEngineReadOnlyImpl(const BackupableDBOptions& options, Env* db_env)
1958 : backup_engine_(new BackupEngineImpl(options, db_env, true)) {}
1959
~BackupEngineReadOnlyImpl()1960 ~BackupEngineReadOnlyImpl() override {}
1961
1962 // The returned BackupInfos are in chronological order, which means the
1963 // latest backup comes last.
GetBackupInfo(std::vector<BackupInfo> * backup_info)1964 void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
1965 backup_engine_->GetBackupInfo(backup_info);
1966 }
1967
GetCorruptedBackups(std::vector<BackupID> * corrupt_backup_ids)1968 void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override {
1969 backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
1970 }
1971
1972 using BackupEngineReadOnly::RestoreDBFromBackup;
RestoreDBFromBackup(const RestoreOptions & options,BackupID backup_id,const std::string & db_dir,const std::string & wal_dir)1973 Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
1974 const std::string& db_dir,
1975 const std::string& wal_dir) override {
1976 return backup_engine_->RestoreDBFromBackup(options, backup_id, db_dir,
1977 wal_dir);
1978 }
1979
1980 using BackupEngineReadOnly::RestoreDBFromLatestBackup;
RestoreDBFromLatestBackup(const RestoreOptions & options,const std::string & db_dir,const std::string & wal_dir)1981 Status RestoreDBFromLatestBackup(const RestoreOptions& options,
1982 const std::string& db_dir,
1983 const std::string& wal_dir) override {
1984 return backup_engine_->RestoreDBFromLatestBackup(options, db_dir, wal_dir);
1985 }
1986
VerifyBackup(BackupID backup_id)1987 Status VerifyBackup(BackupID backup_id) override {
1988 return backup_engine_->VerifyBackup(backup_id);
1989 }
1990
Initialize()1991 Status Initialize() { return backup_engine_->Initialize(); }
1992
1993 private:
1994 std::unique_ptr<BackupEngineImpl> backup_engine_;
1995 };
1996
Open(const BackupableDBOptions & options,Env * env,BackupEngineReadOnly ** backup_engine_ptr)1997 Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env,
1998 BackupEngineReadOnly** backup_engine_ptr) {
1999 if (options.destroy_old_data) {
2000 return Status::InvalidArgument(
2001 "Can't destroy old data with ReadOnly BackupEngine");
2002 }
2003 std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
2004 new BackupEngineReadOnlyImpl(options, env));
2005 auto s = backup_engine->Initialize();
2006 if (!s.ok()) {
2007 *backup_engine_ptr = nullptr;
2008 return s;
2009 }
2010 *backup_engine_ptr = backup_engine.release();
2011 return Status::OK();
2012 }
2013
2014 } // namespace ROCKSDB_NAMESPACE
2015
2016 #endif // ROCKSDB_LITE
2017