1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifndef ROCKSDB_LITE
11 
12 #include <stdlib.h>
13 #include <algorithm>
14 #include <atomic>
15 #include <cinttypes>
16 #include <functional>
17 #include <future>
18 #include <limits>
19 #include <map>
20 #include <mutex>
21 #include <sstream>
22 #include <string>
23 #include <thread>
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <vector>
27 
28 #include "env/composite_env_wrapper.h"
29 #include "file/filename.h"
30 #include "file/sequence_file_reader.h"
31 #include "file/writable_file_writer.h"
32 #include "logging/logging.h"
33 #include "port/port.h"
34 #include "rocksdb/rate_limiter.h"
35 #include "rocksdb/transaction_log.h"
36 #include "rocksdb/utilities/backupable_db.h"
37 #include "test_util/sync_point.h"
38 #include "util/channel.h"
39 #include "util/coding.h"
40 #include "util/crc32c.h"
41 #include "util/string_util.h"
42 #include "utilities/checkpoint/checkpoint_impl.h"
43 
44 namespace ROCKSDB_NAMESPACE {
45 
IncrementNumberSuccessBackup()46 void BackupStatistics::IncrementNumberSuccessBackup() {
47   number_success_backup++;
48 }
IncrementNumberFailBackup()49 void BackupStatistics::IncrementNumberFailBackup() {
50   number_fail_backup++;
51 }
52 
GetNumberSuccessBackup() const53 uint32_t BackupStatistics::GetNumberSuccessBackup() const {
54   return number_success_backup;
55 }
GetNumberFailBackup() const56 uint32_t BackupStatistics::GetNumberFailBackup() const {
57   return number_fail_backup;
58 }
59 
ToString() const60 std::string BackupStatistics::ToString() const {
61   char result[50];
62   snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
63            GetNumberSuccessBackup(), GetNumberFailBackup());
64   return result;
65 }
66 
Dump(Logger * logger) const67 void BackupableDBOptions::Dump(Logger* logger) const {
68   ROCKS_LOG_INFO(logger, "               Options.backup_dir: %s",
69                  backup_dir.c_str());
70   ROCKS_LOG_INFO(logger, "               Options.backup_env: %p", backup_env);
71   ROCKS_LOG_INFO(logger, "        Options.share_table_files: %d",
72                  static_cast<int>(share_table_files));
73   ROCKS_LOG_INFO(logger, "                 Options.info_log: %p", info_log);
74   ROCKS_LOG_INFO(logger, "                     Options.sync: %d",
75                  static_cast<int>(sync));
76   ROCKS_LOG_INFO(logger, "         Options.destroy_old_data: %d",
77                  static_cast<int>(destroy_old_data));
78   ROCKS_LOG_INFO(logger, "         Options.backup_log_files: %d",
79                  static_cast<int>(backup_log_files));
80   ROCKS_LOG_INFO(logger, "        Options.backup_rate_limit: %" PRIu64,
81                  backup_rate_limit);
82   ROCKS_LOG_INFO(logger, "       Options.restore_rate_limit: %" PRIu64,
83                  restore_rate_limit);
84   ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
85                  max_background_operations);
86 }
87 
88 // -------- BackupEngineImpl class ---------
89 class BackupEngineImpl : public BackupEngine {
90  public:
91   BackupEngineImpl(const BackupableDBOptions& options, Env* db_env,
92                    bool read_only = false);
93   ~BackupEngineImpl() override;
94 
95   using BackupEngine::CreateNewBackupWithMetadata;
96   Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
97                                      const std::string& app_metadata) override;
98 
99   Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
100 
101   Status DeleteBackup(BackupID backup_id) override;
102 
StopBackup()103   void StopBackup() override {
104     stop_backup_.store(true, std::memory_order_release);
105   }
106 
107   Status GarbageCollect() override;
108 
109   // The returned BackupInfos are in chronological order, which means the
110   // latest backup comes last.
111   void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
112 
113   void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
114 
115   using BackupEngine::RestoreDBFromBackup;
116   Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
117                              const std::string& db_dir,
118                              const std::string& wal_dir) override;
119 
120   using BackupEngine::RestoreDBFromLatestBackup;
RestoreDBFromLatestBackup(const RestoreOptions & options,const std::string & db_dir,const std::string & wal_dir)121   Status RestoreDBFromLatestBackup(const RestoreOptions& options,
122                                    const std::string& db_dir,
123                                    const std::string& wal_dir) override {
124     return RestoreDBFromBackup(options, latest_valid_backup_id_, db_dir,
125                                wal_dir);
126   }
127 
128   Status VerifyBackup(BackupID backup_id) override;
129 
130   Status Initialize();
131 
132  private:
133   void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
134   Status DeleteBackupInternal(BackupID backup_id);
135 
136   // Extends the "result" map with pathname->size mappings for the contents of
137   // "dir" in "env". Pathnames are prefixed with "dir".
138   Status InsertPathnameToSizeBytes(
139       const std::string& dir, Env* env,
140       std::unordered_map<std::string, uint64_t>* result);
141 
142   struct FileInfo {
FileInfoROCKSDB_NAMESPACE::BackupEngineImpl::FileInfo143     FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
144       : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
145 
146     FileInfo(const FileInfo&) = delete;
147     FileInfo& operator=(const FileInfo&) = delete;
148 
149     int refs;
150     const std::string filename;
151     const uint64_t size;
152     const uint32_t checksum_value;
153   };
154 
155   class BackupMeta {
156    public:
BackupMeta(const std::string & meta_filename,const std::string & meta_tmp_filename,std::unordered_map<std::string,std::shared_ptr<FileInfo>> * file_infos,Env * env)157     BackupMeta(
158         const std::string& meta_filename, const std::string& meta_tmp_filename,
159         std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
160         Env* env)
161         : timestamp_(0),
162           sequence_number_(0),
163           size_(0),
164           meta_filename_(meta_filename),
165           meta_tmp_filename_(meta_tmp_filename),
166           file_infos_(file_infos),
167           env_(env) {}
168 
169     BackupMeta(const BackupMeta&) = delete;
170     BackupMeta& operator=(const BackupMeta&) = delete;
171 
~BackupMeta()172     ~BackupMeta() {}
173 
RecordTimestamp()174     void RecordTimestamp() {
175       env_->GetCurrentTime(&timestamp_);
176     }
GetTimestamp() const177     int64_t GetTimestamp() const {
178       return timestamp_;
179     }
GetSize() const180     uint64_t GetSize() const {
181       return size_;
182     }
GetNumberFiles()183     uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
SetSequenceNumber(uint64_t sequence_number)184     void SetSequenceNumber(uint64_t sequence_number) {
185       sequence_number_ = sequence_number;
186     }
GetSequenceNumber()187     uint64_t GetSequenceNumber() {
188       return sequence_number_;
189     }
190 
GetAppMetadata() const191     const std::string& GetAppMetadata() const { return app_metadata_; }
192 
SetAppMetadata(const std::string & app_metadata)193     void SetAppMetadata(const std::string& app_metadata) {
194       app_metadata_ = app_metadata;
195     }
196 
197     Status AddFile(std::shared_ptr<FileInfo> file_info);
198 
199     Status Delete(bool delete_meta = true);
200 
Empty()201     bool Empty() {
202       return files_.empty();
203     }
204 
GetFile(const std::string & filename) const205     std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
206       auto it = file_infos_->find(filename);
207       if (it == file_infos_->end())
208         return nullptr;
209       return it->second;
210     }
211 
GetFiles()212     const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
213       return files_;
214     }
215 
216     // @param abs_path_to_size Pre-fetched file sizes (bytes).
217     Status LoadFromFile(
218         const std::string& backup_dir,
219         const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
220     Status StoreToFile(bool sync);
221 
GetInfoString()222     std::string GetInfoString() {
223       std::ostringstream ss;
224       ss << "Timestamp: " << timestamp_ << std::endl;
225       char human_size[16];
226       AppendHumanBytes(size_, human_size, sizeof(human_size));
227       ss << "Size: " << human_size << std::endl;
228       ss << "Files:" << std::endl;
229       for (const auto& file : files_) {
230         AppendHumanBytes(file->size, human_size, sizeof(human_size));
231         ss << file->filename << ", size " << human_size << ", refs "
232            << file->refs << std::endl;
233       }
234       return ss.str();
235     }
236 
237    private:
238     int64_t timestamp_;
239     // sequence number is only approximate, should not be used
240     // by clients
241     uint64_t sequence_number_;
242     uint64_t size_;
243     std::string app_metadata_;
244     std::string const meta_filename_;
245     std::string const meta_tmp_filename_;
246     // files with relative paths (without "/" prefix!!)
247     std::vector<std::shared_ptr<FileInfo>> files_;
248     std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
249     Env* env_;
250 
251     static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
252   };  // BackupMeta
253 
GetAbsolutePath(const std::string & relative_path="") const254   inline std::string GetAbsolutePath(
255       const std::string &relative_path = "") const {
256     assert(relative_path.size() == 0 || relative_path[0] != '/');
257     return options_.backup_dir + "/" + relative_path;
258   }
GetPrivateDirRel() const259   inline std::string GetPrivateDirRel() const {
260     return "private";
261   }
GetSharedChecksumDirRel() const262   inline std::string GetSharedChecksumDirRel() const {
263     return "shared_checksum";
264   }
GetPrivateFileRel(BackupID backup_id,bool tmp=false,const std::string & file="") const265   inline std::string GetPrivateFileRel(BackupID backup_id,
266                                        bool tmp = false,
267                                        const std::string& file = "") const {
268     assert(file.size() == 0 || file[0] != '/');
269     return GetPrivateDirRel() + "/" + ROCKSDB_NAMESPACE::ToString(backup_id) +
270            (tmp ? ".tmp" : "") + "/" + file;
271   }
GetSharedFileRel(const std::string & file="",bool tmp=false) const272   inline std::string GetSharedFileRel(const std::string& file = "",
273                                       bool tmp = false) const {
274     assert(file.size() == 0 || file[0] != '/');
275     return std::string("shared/") + (tmp ? "." : "") + file +
276            (tmp ? ".tmp" : "");
277   }
GetSharedFileWithChecksumRel(const std::string & file="",bool tmp=false) const278   inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
279                                                   bool tmp = false) const {
280     assert(file.size() == 0 || file[0] != '/');
281     return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
282            (tmp ? ".tmp" : "");
283   }
GetSharedFileWithChecksum(const std::string & file,const uint32_t checksum_value,const uint64_t file_size) const284   inline std::string GetSharedFileWithChecksum(const std::string& file,
285                                                const uint32_t checksum_value,
286                                                const uint64_t file_size) const {
287     assert(file.size() == 0 || file[0] != '/');
288     std::string file_copy = file;
289     return file_copy.insert(file_copy.find_last_of('.'),
290                             "_" + ROCKSDB_NAMESPACE::ToString(checksum_value) +
291                                 "_" + ROCKSDB_NAMESPACE::ToString(file_size));
292   }
GetFileFromChecksumFile(const std::string & file) const293   inline std::string GetFileFromChecksumFile(const std::string& file) const {
294     assert(file.size() == 0 || file[0] != '/');
295     std::string file_copy = file;
296     size_t first_underscore = file_copy.find_first_of('_');
297     return file_copy.erase(first_underscore,
298                            file_copy.find_last_of('.') - first_underscore);
299   }
GetBackupMetaDir() const300   inline std::string GetBackupMetaDir() const {
301     return GetAbsolutePath("meta");
302   }
GetBackupMetaFile(BackupID backup_id,bool tmp) const303   inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
304     return GetBackupMetaDir() + "/" + (tmp ? "." : "") +
305            ROCKSDB_NAMESPACE::ToString(backup_id) + (tmp ? ".tmp" : "");
306   }
307 
308   // If size_limit == 0, there is no size limit, copy everything.
309   //
310   // Exactly one of src and contents must be non-empty.
311   //
312   // @param src If non-empty, the file is copied from this pathname.
313   // @param contents If non-empty, the file will be created with these contents.
314   Status CopyOrCreateFile(const std::string& src, const std::string& dst,
315                           const std::string& contents, Env* src_env,
316                           Env* dst_env, const EnvOptions& src_env_options,
317                           bool sync, RateLimiter* rate_limiter,
318                           uint64_t* size = nullptr,
319                           uint32_t* checksum_value = nullptr,
320                           uint64_t size_limit = 0,
__anon2d133cc30102() 321                           std::function<void()> progress_callback = []() {});
322 
323   Status CalculateChecksum(const std::string& src, Env* src_env,
324                            const EnvOptions& src_env_options,
325                            uint64_t size_limit, uint32_t* checksum_value);
326 
327   struct CopyOrCreateResult {
328     uint64_t size;
329     uint32_t checksum_value;
330     Status status;
331   };
332 
333   // Exactly one of src_path and contents must be non-empty. If src_path is
334   // non-empty, the file is copied from this pathname. Otherwise, if contents is
335   // non-empty, the file will be created at dst_path with these contents.
336   struct CopyOrCreateWorkItem {
337     std::string src_path;
338     std::string dst_path;
339     std::string contents;
340     Env* src_env;
341     Env* dst_env;
342     EnvOptions src_env_options;
343     bool sync;
344     RateLimiter* rate_limiter;
345     uint64_t size_limit;
346     std::promise<CopyOrCreateResult> result;
347     std::function<void()> progress_callback;
348 
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem349     CopyOrCreateWorkItem()
350         : src_path(""),
351           dst_path(""),
352           contents(""),
353           src_env(nullptr),
354           dst_env(nullptr),
355           src_env_options(),
356           sync(false),
357           rate_limiter(nullptr),
358           size_limit(0) {}
359 
360     CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
361     CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
362 
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem363     CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
364       *this = std::move(o);
365     }
366 
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem367     CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
368       src_path = std::move(o.src_path);
369       dst_path = std::move(o.dst_path);
370       contents = std::move(o.contents);
371       src_env = o.src_env;
372       dst_env = o.dst_env;
373       src_env_options = std::move(o.src_env_options);
374       sync = o.sync;
375       rate_limiter = o.rate_limiter;
376       size_limit = o.size_limit;
377       result = std::move(o.result);
378       progress_callback = std::move(o.progress_callback);
379       return *this;
380     }
381 
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem382     CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
383                          std::string _contents, Env* _src_env, Env* _dst_env,
384                          EnvOptions _src_env_options, bool _sync,
385                          RateLimiter* _rate_limiter, uint64_t _size_limit,
386                          std::function<void()> _progress_callback = []() {})
387         : src_path(std::move(_src_path)),
388           dst_path(std::move(_dst_path)),
389           contents(std::move(_contents)),
390           src_env(_src_env),
391           dst_env(_dst_env),
392           src_env_options(std::move(_src_env_options)),
393           sync(_sync),
394           rate_limiter(_rate_limiter),
395           size_limit(_size_limit),
396           progress_callback(_progress_callback) {}
397   };
398 
399   struct BackupAfterCopyOrCreateWorkItem {
400     std::future<CopyOrCreateResult> result;
401     bool shared;
402     bool needed_to_copy;
403     Env* backup_env;
404     std::string dst_path_tmp;
405     std::string dst_path;
406     std::string dst_relative;
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem407     BackupAfterCopyOrCreateWorkItem()
408       : shared(false),
409         needed_to_copy(false),
410         backup_env(nullptr),
411         dst_path_tmp(""),
412         dst_path(""),
413         dst_relative("") {}
414 
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem415     BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
416         ROCKSDB_NOEXCEPT {
417       *this = std::move(o);
418     }
419 
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem420     BackupAfterCopyOrCreateWorkItem& operator=(
421         BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
422       result = std::move(o.result);
423       shared = o.shared;
424       needed_to_copy = o.needed_to_copy;
425       backup_env = o.backup_env;
426       dst_path_tmp = std::move(o.dst_path_tmp);
427       dst_path = std::move(o.dst_path);
428       dst_relative = std::move(o.dst_relative);
429       return *this;
430     }
431 
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem432     BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
433                                     bool _shared, bool _needed_to_copy,
434                                     Env* _backup_env, std::string _dst_path_tmp,
435                                     std::string _dst_path,
436                                     std::string _dst_relative)
437         : result(std::move(_result)),
438           shared(_shared),
439           needed_to_copy(_needed_to_copy),
440           backup_env(_backup_env),
441           dst_path_tmp(std::move(_dst_path_tmp)),
442           dst_path(std::move(_dst_path)),
443           dst_relative(std::move(_dst_relative)) {}
444   };
445 
446   struct RestoreAfterCopyOrCreateWorkItem {
447     std::future<CopyOrCreateResult> result;
448     uint32_t checksum_value;
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem449     RestoreAfterCopyOrCreateWorkItem()
450       : checksum_value(0) {}
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem451     RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
452                                      uint32_t _checksum_value)
453         : result(std::move(_result)), checksum_value(_checksum_value) {}
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem454     RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
455         ROCKSDB_NOEXCEPT {
456       *this = std::move(o);
457     }
458 
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem459     RestoreAfterCopyOrCreateWorkItem& operator=(
460         RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
461       result = std::move(o.result);
462       checksum_value = o.checksum_value;
463       return *this;
464     }
465   };
466 
467   bool initialized_;
468   std::mutex byte_report_mutex_;
469   channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
470   std::vector<port::Thread> threads_;
471   std::atomic<CpuPriority> threads_cpu_priority_;
472   // Certain operations like PurgeOldBackups and DeleteBackup will trigger
473   // automatic GarbageCollect (true) unless we've already done one in this
474   // session and have not failed to delete backup files since then (false).
475   bool might_need_garbage_collect_ = true;
476 
477   // Adds a file to the backup work queue to be copied or created if it doesn't
478   // already exist.
479   //
480   // Exactly one of src_dir and contents must be non-empty.
481   //
482   // @param src_dir If non-empty, the file in this directory named fname will be
483   //    copied.
484   // @param fname Name of destination file and, in case of copy, source file.
485   // @param contents If non-empty, the file will be created with these contents.
486   Status AddBackupFileWorkItem(
487       std::unordered_set<std::string>& live_dst_paths,
488       std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
489       BackupID backup_id, bool shared, const std::string& src_dir,
490       const std::string& fname,  // starts with "/"
491       const EnvOptions& src_env_options, RateLimiter* rate_limiter,
492       uint64_t size_bytes, uint64_t size_limit = 0,
493       bool shared_checksum = false,
__anon2d133cc30302() 494       std::function<void()> progress_callback = []() {},
495       const std::string& contents = std::string());
496 
497   // backup state data
498   BackupID latest_backup_id_;
499   BackupID latest_valid_backup_id_;
500   std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
501   std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
502       corrupt_backups_;
503   std::unordered_map<std::string,
504                      std::shared_ptr<FileInfo>> backuped_file_infos_;
505   std::atomic<bool> stop_backup_;
506 
507   // options data
508   BackupableDBOptions options_;
509   Env* db_env_;
510   Env* backup_env_;
511 
512   // directories
513   std::unique_ptr<Directory> backup_directory_;
514   std::unique_ptr<Directory> shared_directory_;
515   std::unique_ptr<Directory> meta_directory_;
516   std::unique_ptr<Directory> private_directory_;
517 
518   static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
519   size_t copy_file_buffer_size_;
520   bool read_only_;
521   BackupStatistics backup_statistics_;
522   static const size_t kMaxAppMetaSize = 1024 * 1024;  // 1MB
523 };
524 
Open(const BackupableDBOptions & options,Env * env,BackupEngine ** backup_engine_ptr)525 Status BackupEngine::Open(const BackupableDBOptions& options, Env* env,
526                           BackupEngine** backup_engine_ptr) {
527   std::unique_ptr<BackupEngineImpl> backup_engine(
528       new BackupEngineImpl(options, env));
529   auto s = backup_engine->Initialize();
530   if (!s.ok()) {
531     *backup_engine_ptr = nullptr;
532     return s;
533   }
534   *backup_engine_ptr = backup_engine.release();
535   return Status::OK();
536 }
537 
BackupEngineImpl(const BackupableDBOptions & options,Env * db_env,bool read_only)538 BackupEngineImpl::BackupEngineImpl(const BackupableDBOptions& options,
539                                    Env* db_env, bool read_only)
540     : initialized_(false),
541       latest_backup_id_(0),
542       latest_valid_backup_id_(0),
543       stop_backup_(false),
544       options_(options),
545       db_env_(db_env),
546       backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
547       copy_file_buffer_size_(kDefaultCopyFileBufferSize),
548       read_only_(read_only) {
549   if (options_.backup_rate_limiter == nullptr &&
550       options_.backup_rate_limit > 0) {
551     options_.backup_rate_limiter.reset(
552         NewGenericRateLimiter(options_.backup_rate_limit));
553   }
554   if (options_.restore_rate_limiter == nullptr &&
555       options_.restore_rate_limit > 0) {
556     options_.restore_rate_limiter.reset(
557         NewGenericRateLimiter(options_.restore_rate_limit));
558   }
559 }
560 
~BackupEngineImpl()561 BackupEngineImpl::~BackupEngineImpl() {
562   files_to_copy_or_create_.sendEof();
563   for (auto& t : threads_) {
564     t.join();
565   }
566   LogFlush(options_.info_log);
567 }
568 
Initialize()569 Status BackupEngineImpl::Initialize() {
570   assert(!initialized_);
571   initialized_ = true;
572   if (read_only_) {
573     ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
574   }
575   options_.Dump(options_.info_log);
576 
577   if (!read_only_) {
578     // we might need to clean up from previous crash or I/O errors
579     might_need_garbage_collect_ = true;
580 
581     if (options_.max_valid_backups_to_open != port::kMaxInt32) {
582       options_.max_valid_backups_to_open = port::kMaxInt32;
583       ROCKS_LOG_WARN(
584           options_.info_log,
585           "`max_valid_backups_to_open` is not set to the default value. Ignoring "
586           "its value since BackupEngine is not read-only.");
587     }
588 
589     // gather the list of directories that we need to create
590     std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
591         directories;
592     directories.emplace_back(GetAbsolutePath(), &backup_directory_);
593     if (options_.share_table_files) {
594       if (options_.share_files_with_checksum) {
595         directories.emplace_back(
596             GetAbsolutePath(GetSharedFileWithChecksumRel()),
597             &shared_directory_);
598       } else {
599         directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
600                                  &shared_directory_);
601       }
602     }
603     directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
604                              &private_directory_);
605     directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
606     // create all the dirs we need
607     for (const auto& d : directories) {
608       auto s = backup_env_->CreateDirIfMissing(d.first);
609       if (s.ok()) {
610         s = backup_env_->NewDirectory(d.first, d.second);
611       }
612       if (!s.ok()) {
613         return s;
614       }
615     }
616   }
617 
618   std::vector<std::string> backup_meta_files;
619   {
620     auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
621     if (s.IsNotFound()) {
622       return Status::NotFound(GetBackupMetaDir() + " is missing");
623     } else if (!s.ok()) {
624       return s;
625     }
626   }
627   // create backups_ structure
628   for (auto& file : backup_meta_files) {
629     if (file == "." || file == "..") {
630       continue;
631     }
632     ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
633     BackupID backup_id = 0;
634     sscanf(file.c_str(), "%u", &backup_id);
635     if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
636       if (!read_only_) {
637         // invalid file name, delete that
638         auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
639         ROCKS_LOG_INFO(options_.info_log,
640                        "Unrecognized meta file %s, deleting -- %s",
641                        file.c_str(), s.ToString().c_str());
642       }
643       continue;
644     }
645     assert(backups_.find(backup_id) == backups_.end());
646     backups_.insert(std::make_pair(
647         backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
648                        GetBackupMetaFile(backup_id, false /* tmp */),
649                        GetBackupMetaFile(backup_id, true /* tmp */),
650                        &backuped_file_infos_, backup_env_))));
651   }
652 
653   latest_backup_id_ = 0;
654   latest_valid_backup_id_ = 0;
655   if (options_.destroy_old_data) {  // Destroy old data
656     assert(!read_only_);
657     ROCKS_LOG_INFO(
658         options_.info_log,
659         "Backup Engine started with destroy_old_data == true, deleting all "
660         "backups");
661     auto s = PurgeOldBackups(0);
662     if (s.ok()) {
663       s = GarbageCollect();
664     }
665     if (!s.ok()) {
666       return s;
667     }
668   } else {  // Load data from storage
669     std::unordered_map<std::string, uint64_t> abs_path_to_size;
670     for (const auto& rel_dir :
671          {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
672       const auto abs_dir = GetAbsolutePath(rel_dir);
673       InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
674     }
675     // load the backups if any, until valid_backups_to_open of the latest
676     // non-corrupted backups have been successfully opened.
677     int valid_backups_to_open = options_.max_valid_backups_to_open;
678     for (auto backup_iter = backups_.rbegin();
679          backup_iter != backups_.rend();
680          ++backup_iter) {
681       assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
682       if (latest_backup_id_ == 0) {
683         latest_backup_id_ = backup_iter->first;
684       }
685       if (valid_backups_to_open == 0) {
686         break;
687       }
688 
689       InsertPathnameToSizeBytes(
690           GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
691           &abs_path_to_size);
692       Status s = backup_iter->second->LoadFromFile(options_.backup_dir,
693                                                    abs_path_to_size);
694       if (s.IsCorruption()) {
695         ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
696                        backup_iter->first, s.ToString().c_str());
697         corrupt_backups_.insert(
698             std::make_pair(backup_iter->first,
699                            std::make_pair(s, std::move(backup_iter->second))));
700       } else if (!s.ok()) {
701         // Distinguish corruption errors from errors in the backup Env.
702         // Errors in the backup Env (i.e., this code path) will cause Open() to
703         // fail, whereas corruption errors would not cause Open() failures.
704         return s;
705       } else {
706         ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
707                        backup_iter->first,
708                        backup_iter->second->GetInfoString().c_str());
709         assert(latest_valid_backup_id_ == 0 ||
710                latest_valid_backup_id_ > backup_iter->first);
711         if (latest_valid_backup_id_ == 0) {
712           latest_valid_backup_id_ = backup_iter->first;
713         }
714         --valid_backups_to_open;
715       }
716     }
717 
718     for (const auto& corrupt : corrupt_backups_) {
719       backups_.erase(backups_.find(corrupt.first));
720     }
721     // erase the backups before max_valid_backups_to_open
722     int num_unopened_backups;
723     if (options_.max_valid_backups_to_open == 0) {
724       num_unopened_backups = 0;
725     } else {
726       num_unopened_backups =
727           std::max(0, static_cast<int>(backups_.size()) -
728                           options_.max_valid_backups_to_open);
729     }
730     for (int i = 0; i < num_unopened_backups; ++i) {
731       assert(backups_.begin()->second->Empty());
732       backups_.erase(backups_.begin());
733     }
734   }
735 
736   ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
737   ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
738                  latest_valid_backup_id_);
739 
740   // set up threads perform copies from files_to_copy_or_create_ in the
741   // background
742   threads_cpu_priority_ = CpuPriority::kNormal;
743   threads_.reserve(options_.max_background_operations);
744   for (int t = 0; t < options_.max_background_operations; t++) {
745     threads_.emplace_back([this]() {
746 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
747 #if __GLIBC_PREREQ(2, 12)
748       pthread_setname_np(pthread_self(), "backup_engine");
749 #endif
750 #endif
751       CpuPriority current_priority = CpuPriority::kNormal;
752       CopyOrCreateWorkItem work_item;
753       while (files_to_copy_or_create_.read(work_item)) {
754         CpuPriority priority = threads_cpu_priority_;
755         if (current_priority != priority) {
756           TEST_SYNC_POINT_CALLBACK(
757               "BackupEngineImpl::Initialize:SetCpuPriority", &priority);
758           port::SetCpuPriority(0, priority);
759           current_priority = priority;
760         }
761         CopyOrCreateResult result;
762         result.status = CopyOrCreateFile(
763             work_item.src_path, work_item.dst_path, work_item.contents,
764             work_item.src_env, work_item.dst_env, work_item.src_env_options,
765             work_item.sync, work_item.rate_limiter, &result.size,
766             &result.checksum_value, work_item.size_limit,
767             work_item.progress_callback);
768         work_item.result.set_value(std::move(result));
769       }
770     });
771   }
772   ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
773 
774   return Status::OK();
775 }
776 
// Creates a new backup of `db` under the id `latest_backup_id_ + 1` and
// attaches `app_metadata` to it.
//
// Outline of what the code below does:
//  1. Rejects `app_metadata` larger than kMaxAppMetaSize (InvalidArgument).
//  2. Optionally updates threads_cpu_priority_ from `options`.
//  3. Runs GarbageCollect() if a private dir for the new id already exists.
//  4. Registers a new BackupMeta in backups_ and creates the private dir.
//  5. With DB file deletions disabled, lets CheckpointImpl enumerate the files
//     and queues one work item per file through AddBackupFileWorkItem().
//  6. Waits for every queued item, renames shared temp files to their final
//     path and records each file in the new BackupMeta.
//  7. Stores the meta file and, if options_.sync, fsyncs the directories.
//
// Returns OK on success. On any failure the fail counter is incremented,
// might_need_garbage_collect_ is set, DeleteBackup(new_backup_id) is called
// and the failing status is returned.
Status BackupEngineImpl::CreateNewBackupWithMetadata(
    const CreateBackupOptions& options, DB* db,
    const std::string& app_metadata) {
  assert(initialized_);
  assert(!read_only_);
  if (app_metadata.size() > kMaxAppMetaSize) {
    return Status::InvalidArgument("App metadata too large");
  }

  // The shared priority is only replaced when the requested value compares
  // less than the current one; this function never restores it.
  if (options.decrease_background_thread_cpu_priority) {
    if (options.background_thread_cpu_priority < threads_cpu_priority_) {
      threads_cpu_priority_.store(options.background_thread_cpu_priority);
    }
  }

  BackupID new_backup_id = latest_backup_id_ + 1;

  assert(backups_.find(new_backup_id) == backups_.end());

  auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
  Status s = backup_env_->FileExists(private_dir);
  if (s.ok()) {
    // maybe last backup failed and left partial state behind, clean it up.
    // need to do this before updating backups_ such that a private dir
    // named after new_backup_id will be cleaned up.
    // (If an incomplete new backup is followed by an incomplete delete
    // of the latest full backup, then there could be more than one next
    // id with a private dir, the last thing to be deleted in delete
    // backup, but all will be cleaned up with a GarbageCollect.)
    s = GarbageCollect();
  } else if (s.IsNotFound()) {
    // normal case, the new backup's private dir doesn't exist yet
    s = Status::OK();
  }
  // Any other FileExists() status stays in `s`: the steps below guarded by
  // s.ok() are skipped and the failure path at the end of the function runs.

  // The meta object is registered unconditionally (even if `s` is already an
  // error); the failure path removes it again via DeleteBackup().
  auto ret = backups_.insert(std::make_pair(
      new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
                         GetBackupMetaFile(new_backup_id, false /* tmp */),
                         GetBackupMetaFile(new_backup_id, true /* tmp */),
                         &backuped_file_infos_, backup_env_))));
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
  new_backup->RecordTimestamp();
  new_backup->SetAppMetadata(app_metadata);

  auto start_backup = backup_env_->NowMicros();

  ROCKS_LOG_INFO(options_.info_log,
                 "Started the backup process -- creating backup %u",
                 new_backup_id);
  if (s.ok()) {
    s = backup_env_->CreateDir(private_dir);
  }

  // When a backup rate limiter is configured, the copy buffer is sized to the
  // limiter's single-burst size.
  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
  if (rate_limiter) {
    copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
  }

  // A set into which we will insert the dst_paths that are calculated for live
  // files and live WAL files.
  // This is used to check whether a live files shares a dst_path with another
  // live file.
  std::unordered_set<std::string> live_dst_paths;

  std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
  // Add a CopyOrCreateWorkItem to the channel for each live file
  // Note: file deletions are disabled even when `s` is already an error; the
  // matching EnableFileDeletions() call below is likewise unconditional.
  db->DisableFileDeletions();
  if (s.ok()) {
    CheckpointImpl checkpoint(db);
    uint64_t sequence_number = 0;
    DBOptions db_options = db->GetDBOptions();
    EnvOptions src_raw_env_options(db_options);
    s = checkpoint.CreateCustomCheckpoint(
        db_options,
        [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
            FileType) {
          // custom checkpoint will switch to calling copy_file_cb after it sees
          // NotSupported returned from link_file_cb.
          return Status::NotSupported();
        } /* link_file_cb */,
        [&](const std::string& src_dirname, const std::string& fname,
            uint64_t size_limit_bytes, FileType type) {
          // WAL files are skipped entirely unless backup_log_files is set.
          if (type == kLogFile && !options_.backup_log_files) {
            return Status::OK();
          }
          Log(options_.info_log, "add file for backup %s", fname.c_str());
          // The source size is only looked up for table files; for every other
          // type size_bytes stays 0 when passed to AddBackupFileWorkItem().
          uint64_t size_bytes = 0;
          Status st;
          if (type == kTableFile) {
            st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
          }
          // Pick the read options matching how the live DB reads this file
          // type.
          EnvOptions src_env_options;
          switch (type) {
            case kLogFile:
              src_env_options =
                  db_env_->OptimizeForLogRead(src_raw_env_options);
              break;
            case kTableFile:
              src_env_options = db_env_->OptimizeForCompactionTableRead(
                  src_raw_env_options, ImmutableDBOptions(db_options));
              break;
            case kDescriptorFile:
              src_env_options =
                  db_env_->OptimizeForManifestRead(src_raw_env_options);
              break;
            default:
              // Other backed up files (like options file) are not read by live
              // DB, so don't need to worry about avoiding mixing buffered and
              // direct I/O. Just use plain defaults.
              src_env_options = src_raw_env_options;
              break;
          }
          if (st.ok()) {
            // Only table files are ever placed in the shared directories.
            st = AddBackupFileWorkItem(
                live_dst_paths, backup_items_to_finish, new_backup_id,
                options_.share_table_files && type == kTableFile, src_dirname,
                fname, src_env_options, rate_limiter, size_bytes,
                size_limit_bytes,
                options_.share_files_with_checksum && type == kTableFile,
                options.progress_callback);
          }
          return st;
        } /* copy_file_cb */,
        [&](const std::string& fname, const std::string& contents, FileType) {
          // Files created from in-memory `contents` are always private to this
          // backup and have no source directory.
          Log(options_.info_log, "add file for backup %s", fname.c_str());
          return AddBackupFileWorkItem(
              live_dst_paths, backup_items_to_finish, new_backup_id,
              false /* shared */, "" /* src_dir */, fname,
              EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
              0 /* size_limit */, false /* shared_checksum */,
              options.progress_callback, contents);
        } /* create_file_cb */,
        // NOTE(review): the last argument is presumably the WAL-size threshold
        // that controls flushing -- confirm against CreateCustomCheckpoint().
        &sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64);
    if (s.ok()) {
      new_backup->SetSequenceNumber(sequence_number);
    }
  }
  ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
  // Every queued item is waited for, even after a failure. A failing item
  // overwrites `s`, so the status of the last failing item is the one
  // reported.
  Status item_status;
  for (auto& item : backup_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    item_status = result.status;
    // Shared files that were actually copied were written to a temp path;
    // move them to their final path now.
    if (item_status.ok() && item.shared && item.needed_to_copy) {
      item_status = item.backup_env->RenameFile(item.dst_path_tmp,
                                                item.dst_path);
    }
    if (item_status.ok()) {
      item_status = new_backup.get()->AddFile(
              std::make_shared<FileInfo>(item.dst_relative,
                                         result.size,
                                         result.checksum_value));
    }
    if (!item_status.ok()) {
      s = item_status;
    }
  }

  // we copied all the files, enable file deletions
  db->EnableFileDeletions(false);

  auto backup_time = backup_env_->NowMicros() - start_backup;

  if (s.ok()) {
    // persist the backup metadata on the disk
    s = new_backup->StoreToFile(options_.sync);
  }
  if (s.ok() && options_.sync) {
    // The status of NewDirectory() is not checked; if no directory object is
    // produced the private-dir fsync is simply skipped.
    std::unique_ptr<Directory> backup_private_directory;
    backup_env_->NewDirectory(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
        &backup_private_directory);
    if (backup_private_directory != nullptr) {
      s = backup_private_directory->Fsync();
    }
    // Fsync the remaining directories in turn, stopping at the first failure.
    if (s.ok() && private_directory_ != nullptr) {
      s = private_directory_->Fsync();
    }
    if (s.ok() && meta_directory_ != nullptr) {
      s = meta_directory_->Fsync();
    }
    if (s.ok() && shared_directory_ != nullptr) {
      s = shared_directory_->Fsync();
    }
    if (s.ok() && backup_directory_ != nullptr) {
      s = backup_directory_->Fsync();
    }
  }

  if (s.ok()) {
    backup_statistics_.IncrementNumberSuccessBackup();
  }
  if (!s.ok()) {
    backup_statistics_.IncrementNumberFailBackup();
    // clean all the files we might have created
    ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
                   s.ToString().c_str());
    ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
                   backup_statistics_.ToString().c_str());
    // delete files that we might have already written
    // (the status returned by DeleteBackup() is ignored; the original
    // failure in `s` is what the caller sees)
    might_need_garbage_collect_ = true;
    DeleteBackup(new_backup_id);
    return s;
  }

  // here we know that we succeeded and installed the new backup
  // in the LATEST_BACKUP file
  latest_backup_id_ = new_backup_id;
  latest_valid_backup_id_ = new_backup_id;
  ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");

  // backup_speed is in MB/s (not bytes/second): backup_time is in
  // microseconds, so size / (1.048576 * backup_time) equals
  // (size / 2^20) / (backup_time / 10^6), assuming GetSize() is in bytes.
  double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
  ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
                 new_backup->GetNumberFiles());
  char human_size[16];
  AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
  ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
  ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
                 backup_time);
  ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
  ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
                 backup_statistics_.ToString().c_str());
  return s;
}
1003 
PurgeOldBackups(uint32_t num_backups_to_keep)1004 Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
1005   assert(initialized_);
1006   assert(!read_only_);
1007 
1008   // Best effort deletion even with errors
1009   Status overall_status = Status::OK();
1010 
1011   ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
1012                  num_backups_to_keep);
1013   std::vector<BackupID> to_delete;
1014   auto itr = backups_.begin();
1015   while ((backups_.size() - to_delete.size()) > num_backups_to_keep) {
1016     to_delete.push_back(itr->first);
1017     itr++;
1018   }
1019   for (auto backup_id : to_delete) {
1020     auto s = DeleteBackupInternal(backup_id);
1021     if (!s.ok()) {
1022       overall_status = s;
1023     }
1024   }
1025   // Clean up after any incomplete backup deletion, potentially from
1026   // earlier session.
1027   if (might_need_garbage_collect_) {
1028     auto s = GarbageCollect();
1029     if (!s.ok() && overall_status.ok()) {
1030       overall_status = s;
1031     }
1032   }
1033   return overall_status;
1034 }
1035 
DeleteBackup(BackupID backup_id)1036 Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
1037   auto s1 = DeleteBackupInternal(backup_id);
1038   auto s2 = Status::OK();
1039 
1040   // Clean up after any incomplete backup deletion, potentially from
1041   // earlier session.
1042   if (might_need_garbage_collect_) {
1043     s2 = GarbageCollect();
1044   }
1045 
1046   if (!s1.ok()) {
1047     return s1;
1048   } else {
1049     return s2;
1050   }
1051 }
1052 
1053 // Does not auto-GarbageCollect
DeleteBackupInternal(BackupID backup_id)1054 Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
1055   assert(initialized_);
1056   assert(!read_only_);
1057 
1058   ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
1059   auto backup = backups_.find(backup_id);
1060   if (backup != backups_.end()) {
1061     auto s = backup->second->Delete();
1062     if (!s.ok()) {
1063       return s;
1064     }
1065     backups_.erase(backup);
1066   } else {
1067     auto corrupt = corrupt_backups_.find(backup_id);
1068     if (corrupt == corrupt_backups_.end()) {
1069       return Status::NotFound("Backup not found");
1070     }
1071     auto s = corrupt->second.second->Delete();
1072     if (!s.ok()) {
1073       return s;
1074     }
1075     corrupt_backups_.erase(corrupt);
1076   }
1077 
1078   // After removing meta file, best effort deletion even with errors.
1079   // (Don't delete other files if we can't delete the meta file right
1080   // now.)
1081   std::vector<std::string> to_delete;
1082   for (auto& itr : backuped_file_infos_) {
1083     if (itr.second->refs == 0) {
1084       Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
1085       ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
1086                      s.ToString().c_str());
1087       to_delete.push_back(itr.first);
1088       if (!s.ok()) {
1089         // Trying again later might work
1090         might_need_garbage_collect_ = true;
1091       }
1092     }
1093   }
1094   for (auto& td : to_delete) {
1095     backuped_file_infos_.erase(td);
1096   }
1097 
1098   // take care of private dirs -- GarbageCollect() will take care of them
1099   // if they are not empty
1100   std::string private_dir = GetPrivateFileRel(backup_id);
1101   Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
1102   ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
1103                  private_dir.c_str(), s.ToString().c_str());
1104   if (!s.ok()) {
1105     // Full gc or trying again later might work
1106     might_need_garbage_collect_ = true;
1107   }
1108   return Status::OK();
1109 }
1110 
GetBackupInfo(std::vector<BackupInfo> * backup_info)1111 void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1112   assert(initialized_);
1113   backup_info->reserve(backups_.size());
1114   for (auto& backup : backups_) {
1115     if (!backup.second->Empty()) {
1116       backup_info->push_back(BackupInfo(
1117           backup.first, backup.second->GetTimestamp(), backup.second->GetSize(),
1118           backup.second->GetNumberFiles(), backup.second->GetAppMetadata()));
1119     }
1120   }
1121 }
1122 
1123 void
GetCorruptedBackups(std::vector<BackupID> * corrupt_backup_ids)1124 BackupEngineImpl::GetCorruptedBackups(
1125     std::vector<BackupID>* corrupt_backup_ids) {
1126   assert(initialized_);
1127   corrupt_backup_ids->reserve(corrupt_backups_.size());
1128   for (auto& backup : corrupt_backups_) {
1129     corrupt_backup_ids->push_back(backup.first);
1130   }
1131 }
1132 
RestoreDBFromBackup(const RestoreOptions & options,BackupID backup_id,const std::string & db_dir,const std::string & wal_dir)1133 Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
1134                                              BackupID backup_id,
1135                                              const std::string& db_dir,
1136                                              const std::string& wal_dir) {
1137   assert(initialized_);
1138   auto corrupt_itr = corrupt_backups_.find(backup_id);
1139   if (corrupt_itr != corrupt_backups_.end()) {
1140     return corrupt_itr->second.first;
1141   }
1142   auto backup_itr = backups_.find(backup_id);
1143   if (backup_itr == backups_.end()) {
1144     return Status::NotFound("Backup not found");
1145   }
1146   auto& backup = backup_itr->second;
1147   if (backup->Empty()) {
1148     return Status::NotFound("Backup not found");
1149   }
1150 
1151   ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
1152   ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
1153                  static_cast<int>(options.keep_log_files));
1154 
1155   // just in case. Ignore errors
1156   db_env_->CreateDirIfMissing(db_dir);
1157   db_env_->CreateDirIfMissing(wal_dir);
1158 
1159   if (options.keep_log_files) {
1160     // delete files in db_dir, but keep all the log files
1161     DeleteChildren(db_dir, 1 << kLogFile);
1162     // move all the files from archive dir to wal_dir
1163     std::string archive_dir = ArchivalDirectory(wal_dir);
1164     std::vector<std::string> archive_files;
1165     db_env_->GetChildren(archive_dir, &archive_files);  // ignore errors
1166     for (const auto& f : archive_files) {
1167       uint64_t number;
1168       FileType type;
1169       bool ok = ParseFileName(f, &number, &type);
1170       if (ok && type == kLogFile) {
1171         ROCKS_LOG_INFO(options_.info_log,
1172                        "Moving log file from archive/ to wal_dir: %s",
1173                        f.c_str());
1174         Status s =
1175             db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
1176         if (!s.ok()) {
1177           // if we can't move log file from archive_dir to wal_dir,
1178           // we should fail, since it might mean data loss
1179           return s;
1180         }
1181       }
1182     }
1183   } else {
1184     DeleteChildren(wal_dir);
1185     DeleteChildren(ArchivalDirectory(wal_dir));
1186     DeleteChildren(db_dir);
1187   }
1188 
1189   RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
1190   if (rate_limiter) {
1191     copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
1192   }
1193   Status s;
1194   std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
1195   for (const auto& file_info : backup->GetFiles()) {
1196     const std::string &file = file_info->filename;
1197     std::string dst;
1198     // 1. extract the filename
1199     size_t slash = file.find_last_of('/');
1200     // file will either be shared/<file>, shared_checksum/<file_crc32_size>
1201     // or private/<number>/<file>
1202     assert(slash != std::string::npos);
1203     dst = file.substr(slash + 1);
1204 
1205     // if the file was in shared_checksum, extract the real file name
1206     // in this case the file is <number>_<checksum>_<size>.<type>
1207     if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
1208       dst = GetFileFromChecksumFile(dst);
1209     }
1210 
1211     // 2. find the filetype
1212     uint64_t number;
1213     FileType type;
1214     bool ok = ParseFileName(dst, &number, &type);
1215     if (!ok) {
1216       return Status::Corruption("Backup corrupted");
1217     }
1218     // 3. Construct the final path
1219     // kLogFile lives in wal_dir and all the rest live in db_dir
1220     dst = ((type == kLogFile) ? wal_dir : db_dir) +
1221       "/" + dst;
1222 
1223     ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
1224                    dst.c_str());
1225     CopyOrCreateWorkItem copy_or_create_work_item(
1226         GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
1227         EnvOptions() /* src_env_options */, false, rate_limiter,
1228         0 /* size_limit */);
1229     RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1230         copy_or_create_work_item.result.get_future(),
1231         file_info->checksum_value);
1232     files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1233     restore_items_to_finish.push_back(
1234         std::move(after_copy_or_create_work_item));
1235   }
1236   Status item_status;
1237   for (auto& item : restore_items_to_finish) {
1238     item.result.wait();
1239     auto result = item.result.get();
1240     item_status = result.status;
1241     // Note: It is possible that both of the following bad-status cases occur
1242     // during copying. But, we only return one status.
1243     if (!item_status.ok()) {
1244       s = item_status;
1245       break;
1246     } else if (item.checksum_value != result.checksum_value) {
1247       s = Status::Corruption("Checksum check failed");
1248       break;
1249     }
1250   }
1251 
1252   ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
1253                  s.ToString().c_str());
1254   return s;
1255 }
1256 
VerifyBackup(BackupID backup_id)1257 Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
1258   assert(initialized_);
1259   auto corrupt_itr = corrupt_backups_.find(backup_id);
1260   if (corrupt_itr != corrupt_backups_.end()) {
1261     return corrupt_itr->second.first;
1262   }
1263 
1264   auto backup_itr = backups_.find(backup_id);
1265   if (backup_itr == backups_.end()) {
1266     return Status::NotFound();
1267   }
1268 
1269   auto& backup = backup_itr->second;
1270   if (backup->Empty()) {
1271     return Status::NotFound();
1272   }
1273 
1274   ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
1275 
1276   std::unordered_map<std::string, uint64_t> curr_abs_path_to_size;
1277   for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
1278                               GetSharedFileWithChecksumRel()}) {
1279     const auto abs_dir = GetAbsolutePath(rel_dir);
1280     InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
1281   }
1282 
1283   for (const auto& file_info : backup->GetFiles()) {
1284     const auto abs_path = GetAbsolutePath(file_info->filename);
1285     if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
1286       return Status::NotFound("File missing: " + abs_path);
1287     }
1288     if (file_info->size != curr_abs_path_to_size[abs_path]) {
1289       return Status::Corruption("File corrupted: " + abs_path);
1290     }
1291   }
1292   return Status::OK();
1293 }
1294 
CopyOrCreateFile(const std::string & src,const std::string & dst,const std::string & contents,Env * src_env,Env * dst_env,const EnvOptions & src_env_options,bool sync,RateLimiter * rate_limiter,uint64_t * size,uint32_t * checksum_value,uint64_t size_limit,std::function<void ()> progress_callback)1295 Status BackupEngineImpl::CopyOrCreateFile(
1296     const std::string& src, const std::string& dst, const std::string& contents,
1297     Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
1298     RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value,
1299     uint64_t size_limit, std::function<void()> progress_callback) {
1300   assert(src.empty() != contents.empty());
1301   Status s;
1302   std::unique_ptr<WritableFile> dst_file;
1303   std::unique_ptr<SequentialFile> src_file;
1304   EnvOptions dst_env_options;
1305   dst_env_options.use_mmap_writes = false;
1306   // TODO:(gzh) maybe use direct reads/writes here if possible
1307   if (size != nullptr) {
1308     *size = 0;
1309   }
1310   if (checksum_value != nullptr) {
1311     *checksum_value = 0;
1312   }
1313 
1314   // Check if size limit is set. if not, set it to very big number
1315   if (size_limit == 0) {
1316     size_limit = std::numeric_limits<uint64_t>::max();
1317   }
1318 
1319   s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
1320   if (s.ok() && !src.empty()) {
1321     s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1322   }
1323   if (!s.ok()) {
1324     return s;
1325   }
1326 
1327   std::unique_ptr<WritableFileWriter> dest_writer(new WritableFileWriter(
1328       NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options));
1329   std::unique_ptr<SequentialFileReader> src_reader;
1330   std::unique_ptr<char[]> buf;
1331   if (!src.empty()) {
1332     src_reader.reset(new SequentialFileReader(
1333         NewLegacySequentialFileWrapper(src_file), src));
1334     buf.reset(new char[copy_file_buffer_size_]);
1335   }
1336 
1337   Slice data;
1338   uint64_t processed_buffer_size = 0;
1339   do {
1340     if (stop_backup_.load(std::memory_order_acquire)) {
1341       return Status::Incomplete("Backup stopped");
1342     }
1343     if (!src.empty()) {
1344       size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
1345                                   ? copy_file_buffer_size_
1346                                   : static_cast<size_t>(size_limit);
1347       s = src_reader->Read(buffer_to_read, &data, buf.get());
1348       processed_buffer_size += buffer_to_read;
1349     } else {
1350       data = contents;
1351     }
1352     size_limit -= data.size();
1353 
1354     if (!s.ok()) {
1355       return s;
1356     }
1357 
1358     if (size != nullptr) {
1359       *size += data.size();
1360     }
1361     if (checksum_value != nullptr) {
1362       *checksum_value =
1363           crc32c::Extend(*checksum_value, data.data(), data.size());
1364     }
1365     s = dest_writer->Append(data);
1366     if (rate_limiter != nullptr) {
1367       rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
1368                             RateLimiter::OpType::kWrite);
1369     }
1370     if (processed_buffer_size > options_.callback_trigger_interval_size) {
1371       processed_buffer_size -= options_.callback_trigger_interval_size;
1372       std::lock_guard<std::mutex> lock(byte_report_mutex_);
1373       progress_callback();
1374     }
1375   } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
1376 
1377   if (s.ok() && sync) {
1378     s = dest_writer->Sync(false);
1379   }
1380   if (s.ok()) {
1381     s = dest_writer->Close();
1382   }
1383   return s;
1384 }
1385 
1386 // fname will always start with "/"
AddBackupFileWorkItem(std::unordered_set<std::string> & live_dst_paths,std::vector<BackupAfterCopyOrCreateWorkItem> & backup_items_to_finish,BackupID backup_id,bool shared,const std::string & src_dir,const std::string & fname,const EnvOptions & src_env_options,RateLimiter * rate_limiter,uint64_t size_bytes,uint64_t size_limit,bool shared_checksum,std::function<void ()> progress_callback,const std::string & contents)1387 Status BackupEngineImpl::AddBackupFileWorkItem(
1388     std::unordered_set<std::string>& live_dst_paths,
1389     std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
1390     BackupID backup_id, bool shared, const std::string& src_dir,
1391     const std::string& fname, const EnvOptions& src_env_options,
1392     RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
1393     bool shared_checksum, std::function<void()> progress_callback,
1394     const std::string& contents) {
1395   assert(!fname.empty() && fname[0] == '/');
1396   assert(contents.empty() != src_dir.empty());
1397 
1398   std::string dst_relative = fname.substr(1);
1399   std::string dst_relative_tmp;
1400   Status s;
1401   uint32_t checksum_value = 0;
1402 
1403   if (shared && shared_checksum) {
1404     // add checksum and file length to the file name
1405     s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit,
1406                           &checksum_value);
1407     if (!s.ok()) {
1408       return s;
1409     }
1410     if (size_bytes == port::kMaxUint64) {
1411       return Status::NotFound("File missing: " + src_dir + fname);
1412     }
1413     dst_relative =
1414         GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes);
1415     dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
1416     dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
1417   } else if (shared) {
1418     dst_relative_tmp = GetSharedFileRel(dst_relative, true);
1419     dst_relative = GetSharedFileRel(dst_relative, false);
1420   } else {
1421     dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
1422   }
1423 
1424   // We copy into `temp_dest_path` and, once finished, rename it to
1425   // `final_dest_path`. This allows files to atomically appear at
1426   // `final_dest_path`. We can copy directly to the final path when atomicity
1427   // is unnecessary, like for files in private backup directories.
1428   const std::string* copy_dest_path;
1429   std::string temp_dest_path;
1430   std::string final_dest_path = GetAbsolutePath(dst_relative);
1431   if (!dst_relative_tmp.empty()) {
1432     temp_dest_path = GetAbsolutePath(dst_relative_tmp);
1433     copy_dest_path = &temp_dest_path;
1434   } else {
1435     copy_dest_path = &final_dest_path;
1436   }
1437 
1438   // if it's shared, we also need to check if it exists -- if it does, no need
1439   // to copy it again.
1440   bool need_to_copy = true;
1441   // true if final_dest_path is the same path as another live file
1442   const bool same_path =
1443       live_dst_paths.find(final_dest_path) != live_dst_paths.end();
1444 
1445   bool file_exists = false;
1446   if (shared && !same_path) {
1447     Status exist = backup_env_->FileExists(final_dest_path);
1448     if (exist.ok()) {
1449       file_exists = true;
1450     } else if (exist.IsNotFound()) {
1451       file_exists = false;
1452     } else {
1453       assert(s.IsIOError());
1454       return exist;
1455     }
1456   }
1457 
1458   if (!contents.empty()) {
1459     need_to_copy = false;
1460   } else if (shared && (same_path || file_exists)) {
1461     need_to_copy = false;
1462     if (shared_checksum) {
1463       ROCKS_LOG_INFO(options_.info_log,
1464                      "%s already present, with checksum %u and size %" PRIu64,
1465                      fname.c_str(), checksum_value, size_bytes);
1466     } else if (backuped_file_infos_.find(dst_relative) ==
1467                backuped_file_infos_.end() && !same_path) {
1468       // file already exists, but it's not referenced by any backup. overwrite
1469       // the file
1470       ROCKS_LOG_INFO(
1471           options_.info_log,
1472           "%s already present, but not referenced by any backup. We will "
1473           "overwrite the file.",
1474           fname.c_str());
1475       need_to_copy = true;
1476       backup_env_->DeleteFile(final_dest_path);
1477     } else {
1478       // the file is present and referenced by a backup
1479       ROCKS_LOG_INFO(options_.info_log,
1480                      "%s already present, calculate checksum", fname.c_str());
1481       s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
1482                             size_limit, &checksum_value);
1483     }
1484   }
1485   live_dst_paths.insert(final_dest_path);
1486 
1487   if (!contents.empty() || need_to_copy) {
1488     ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
1489                    copy_dest_path->c_str());
1490     CopyOrCreateWorkItem copy_or_create_work_item(
1491         src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
1492         db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
1493         size_limit, progress_callback);
1494     BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1495         copy_or_create_work_item.result.get_future(), shared, need_to_copy,
1496         backup_env_, temp_dest_path, final_dest_path, dst_relative);
1497     files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1498     backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1499   } else {
1500     std::promise<CopyOrCreateResult> promise_result;
1501     BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1502         promise_result.get_future(), shared, need_to_copy, backup_env_,
1503         temp_dest_path, final_dest_path, dst_relative);
1504     backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1505     CopyOrCreateResult result;
1506     result.status = s;
1507     result.size = size_bytes;
1508     result.checksum_value = checksum_value;
1509     promise_result.set_value(std::move(result));
1510   }
1511   return s;
1512 }
1513 
CalculateChecksum(const std::string & src,Env * src_env,const EnvOptions & src_env_options,uint64_t size_limit,uint32_t * checksum_value)1514 Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
1515                                            const EnvOptions& src_env_options,
1516                                            uint64_t size_limit,
1517                                            uint32_t* checksum_value) {
1518   *checksum_value = 0;
1519   if (size_limit == 0) {
1520     size_limit = std::numeric_limits<uint64_t>::max();
1521   }
1522 
1523   std::unique_ptr<SequentialFile> src_file;
1524   Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1525   if (!s.ok()) {
1526     return s;
1527   }
1528 
1529   std::unique_ptr<SequentialFileReader> src_reader(
1530       new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src));
1531   std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
1532   Slice data;
1533 
1534   do {
1535     if (stop_backup_.load(std::memory_order_acquire)) {
1536       return Status::Incomplete("Backup stopped");
1537     }
1538     size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
1539       copy_file_buffer_size_ : static_cast<size_t>(size_limit);
1540     s = src_reader->Read(buffer_to_read, &data, buf.get());
1541 
1542     if (!s.ok()) {
1543       return s;
1544     }
1545 
1546     size_limit -= data.size();
1547     *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
1548   } while (data.size() > 0 && size_limit > 0);
1549 
1550   return s;
1551 }
1552 
DeleteChildren(const std::string & dir,uint32_t file_type_filter)1553 void BackupEngineImpl::DeleteChildren(const std::string& dir,
1554                                       uint32_t file_type_filter) {
1555   std::vector<std::string> children;
1556   db_env_->GetChildren(dir, &children);  // ignore errors
1557 
1558   for (const auto& f : children) {
1559     uint64_t number;
1560     FileType type;
1561     bool ok = ParseFileName(f, &number, &type);
1562     if (ok && (file_type_filter & (1 << type))) {
1563       // don't delete this file
1564       continue;
1565     }
1566     db_env_->DeleteFile(dir + "/" + f);  // ignore errors
1567   }
1568 }
1569 
InsertPathnameToSizeBytes(const std::string & dir,Env * env,std::unordered_map<std::string,uint64_t> * result)1570 Status BackupEngineImpl::InsertPathnameToSizeBytes(
1571     const std::string& dir, Env* env,
1572     std::unordered_map<std::string, uint64_t>* result) {
1573   assert(result != nullptr);
1574   std::vector<Env::FileAttributes> files_attrs;
1575   Status status = env->FileExists(dir);
1576   if (status.ok()) {
1577     status = env->GetChildrenFileAttributes(dir, &files_attrs);
1578   } else if (status.IsNotFound()) {
1579     // Insert no entries can be considered success
1580     status = Status::OK();
1581   }
1582   const bool slash_needed = dir.empty() || dir.back() != '/';
1583   for (const auto& file_attrs : files_attrs) {
1584     result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
1585                     file_attrs.size_bytes);
1586   }
1587   return status;
1588 }
1589 
GarbageCollect()1590 Status BackupEngineImpl::GarbageCollect() {
1591   assert(!read_only_);
1592 
1593   // We will make a best effort to remove all garbage even in the presence
1594   // of inconsistencies or I/O failures that inhibit finding garbage.
1595   Status overall_status = Status::OK();
1596   // If all goes well, we don't need another auto-GC this session
1597   might_need_garbage_collect_ = false;
1598 
1599   ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
1600 
1601   // delete obsolete shared files
1602   for (bool with_checksum : {false, true}) {
1603     std::vector<std::string> shared_children;
1604     {
1605       std::string shared_path;
1606       if (with_checksum) {
1607         shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
1608       } else {
1609         shared_path = GetAbsolutePath(GetSharedFileRel());
1610       }
1611       auto s = backup_env_->FileExists(shared_path);
1612       if (s.ok()) {
1613         s = backup_env_->GetChildren(shared_path, &shared_children);
1614       } else if (s.IsNotFound()) {
1615         s = Status::OK();
1616       }
1617       if (!s.ok()) {
1618         overall_status = s;
1619         // Trying again later might work
1620         might_need_garbage_collect_ = true;
1621       }
1622     }
1623     for (auto& child : shared_children) {
1624       if (child == "." || child == "..") {
1625         continue;
1626       }
1627       std::string rel_fname;
1628       if (with_checksum) {
1629         rel_fname = GetSharedFileWithChecksumRel(child);
1630       } else {
1631         rel_fname = GetSharedFileRel(child);
1632       }
1633       auto child_itr = backuped_file_infos_.find(rel_fname);
1634       // if it's not refcounted, delete it
1635       if (child_itr == backuped_file_infos_.end() ||
1636           child_itr->second->refs == 0) {
1637         // this might be a directory, but DeleteFile will just fail in that
1638         // case, so we're good
1639         Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
1640         ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1641                        rel_fname.c_str(), s.ToString().c_str());
1642         backuped_file_infos_.erase(rel_fname);
1643         if (!s.ok()) {
1644           // Trying again later might work
1645           might_need_garbage_collect_ = true;
1646         }
1647       }
1648     }
1649   }
1650 
1651   // delete obsolete private files
1652   std::vector<std::string> private_children;
1653   {
1654     auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
1655                                       &private_children);
1656     if (!s.ok()) {
1657       overall_status = s;
1658       // Trying again later might work
1659       might_need_garbage_collect_ = true;
1660     }
1661   }
1662   for (auto& child : private_children) {
1663     if (child == "." || child == "..") {
1664       continue;
1665     }
1666 
1667     BackupID backup_id = 0;
1668     bool tmp_dir = child.find(".tmp") != std::string::npos;
1669     sscanf(child.c_str(), "%u", &backup_id);
1670     if (!tmp_dir &&  // if it's tmp_dir, delete it
1671         (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
1672       // it's either not a number or it's still alive. continue
1673       continue;
1674     }
1675     // here we have to delete the dir and all its children
1676     std::string full_private_path =
1677         GetAbsolutePath(GetPrivateFileRel(backup_id));
1678     std::vector<std::string> subchildren;
1679     backup_env_->GetChildren(full_private_path, &subchildren);
1680     for (auto& subchild : subchildren) {
1681       if (subchild == "." || subchild == "..") {
1682         continue;
1683       }
1684       Status s = backup_env_->DeleteFile(full_private_path + subchild);
1685       ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1686                      (full_private_path + subchild).c_str(),
1687                      s.ToString().c_str());
1688       if (!s.ok()) {
1689         // Trying again later might work
1690         might_need_garbage_collect_ = true;
1691       }
1692     }
1693     // finally delete the private dir
1694     Status s = backup_env_->DeleteDir(full_private_path);
1695     ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
1696                    full_private_path.c_str(), s.ToString().c_str());
1697     if (!s.ok()) {
1698       // Trying again later might work
1699       might_need_garbage_collect_ = true;
1700     }
1701   }
1702 
1703   assert(overall_status.ok() || might_need_garbage_collect_);
1704   return overall_status;
1705 }
1706 
1707 // ------- BackupMeta class --------
1708 
AddFile(std::shared_ptr<FileInfo> file_info)1709 Status BackupEngineImpl::BackupMeta::AddFile(
1710     std::shared_ptr<FileInfo> file_info) {
1711   auto itr = file_infos_->find(file_info->filename);
1712   if (itr == file_infos_->end()) {
1713     auto ret = file_infos_->insert({file_info->filename, file_info});
1714     if (ret.second) {
1715       itr = ret.first;
1716       itr->second->refs = 1;
1717     } else {
1718       // if this happens, something is seriously wrong
1719       return Status::Corruption("In memory metadata insertion error");
1720     }
1721   } else {
1722     if (itr->second->checksum_value != file_info->checksum_value) {
1723       return Status::Corruption(
1724           "Checksum mismatch for existing backup file. Delete old backups and "
1725           "try again.");
1726     }
1727     ++itr->second->refs;  // increase refcount if already present
1728   }
1729 
1730   size_ += file_info->size;
1731   files_.push_back(itr->second);
1732 
1733   return Status::OK();
1734 }
1735 
Delete(bool delete_meta)1736 Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
1737   Status s;
1738   for (const auto& file : files_) {
1739     --file->refs;  // decrease refcount
1740   }
1741   files_.clear();
1742   // delete meta file
1743   if (delete_meta) {
1744     s = env_->FileExists(meta_filename_);
1745     if (s.ok()) {
1746       s = env_->DeleteFile(meta_filename_);
1747     } else if (s.IsNotFound()) {
1748       s = Status::OK();  // nothing to delete
1749     }
1750   }
1751   timestamp_ = 0;
1752   return s;
1753 }
1754 
// Literal prefix of the optional app-metadata line in a backup meta file.
// The hex-encoded metadata follows it on the same line.
Slice kMetaDataPrefix("metadata ");
1756 
1757 // each backup meta file is of the format:
1758 // <timestamp>
1759 // <seq number>
1760 // <metadata(literal string)> <metadata> (optional)
1761 // <number of files>
1762 // <file1> <crc32(literal string)> <crc32_value>
1763 // <file2> <crc32(literal string)> <crc32_value>
1764 // ...
LoadFromFile(const std::string & backup_dir,const std::unordered_map<std::string,uint64_t> & abs_path_to_size)1765 Status BackupEngineImpl::BackupMeta::LoadFromFile(
1766     const std::string& backup_dir,
1767     const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
1768   assert(Empty());
1769   Status s;
1770   std::unique_ptr<SequentialFile> backup_meta_file;
1771   s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
1772   if (!s.ok()) {
1773     return s;
1774   }
1775 
1776   std::unique_ptr<SequentialFileReader> backup_meta_reader(
1777       new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file),
1778                                meta_filename_));
1779   std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
1780   Slice data;
1781   s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
1782 
1783   if (!s.ok() || data.size() == max_backup_meta_file_size_) {
1784     return s.ok() ? Status::Corruption("File size too big") : s;
1785   }
1786   buf[data.size()] = 0;
1787 
1788   uint32_t num_files = 0;
1789   char *next;
1790   timestamp_ = strtoull(data.data(), &next, 10);
1791   data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1792   sequence_number_ = strtoull(data.data(), &next, 10);
1793   data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1794 
1795   if (data.starts_with(kMetaDataPrefix)) {
1796     // app metadata present
1797     data.remove_prefix(kMetaDataPrefix.size());
1798     Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
1799     bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
1800     if (!decode_success) {
1801       return Status::Corruption(
1802           "Failed to decode stored hex encoded app metadata");
1803     }
1804   }
1805 
1806   num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
1807   data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1808 
1809   std::vector<std::shared_ptr<FileInfo>> files;
1810 
1811   Slice checksum_prefix("crc32 ");
1812 
1813   for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
1814     auto line = GetSliceUntil(&data, '\n');
1815     std::string filename = GetSliceUntil(&line, ' ').ToString();
1816 
1817     uint64_t size;
1818     const std::shared_ptr<FileInfo> file_info = GetFile(filename);
1819     if (file_info) {
1820       size = file_info->size;
1821     } else {
1822       std::string abs_path = backup_dir + "/" + filename;
1823       try {
1824         size = abs_path_to_size.at(abs_path);
1825       } catch (std::out_of_range&) {
1826         return Status::Corruption("Size missing for pathname: " + abs_path);
1827       }
1828     }
1829 
1830     if (line.empty()) {
1831       return Status::Corruption("File checksum is missing for " + filename +
1832                                 " in " + meta_filename_);
1833     }
1834 
1835     uint32_t checksum_value = 0;
1836     if (line.starts_with(checksum_prefix)) {
1837       line.remove_prefix(checksum_prefix.size());
1838       checksum_value = static_cast<uint32_t>(
1839           strtoul(line.data(), nullptr, 10));
1840       if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
1841         return Status::Corruption("Invalid checksum value for " + filename +
1842                                   " in " + meta_filename_);
1843       }
1844     } else {
1845       return Status::Corruption("Unknown checksum type for " + filename +
1846                                 " in " + meta_filename_);
1847     }
1848 
1849     files.emplace_back(new FileInfo(filename, size, checksum_value));
1850   }
1851 
1852   if (s.ok() && data.size() > 0) {
1853     // file has to be read completely. if not, we count it as corruption
1854     s = Status::Corruption("Tailing data in backup meta file in " +
1855                            meta_filename_);
1856   }
1857 
1858   if (s.ok()) {
1859     files_.reserve(files.size());
1860     for (const auto& file_info : files) {
1861       s = AddFile(file_info);
1862       if (!s.ok()) {
1863         break;
1864       }
1865     }
1866   }
1867 
1868   return s;
1869 }
1870 
StoreToFile(bool sync)1871 Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
1872   Status s;
1873   std::unique_ptr<WritableFile> backup_meta_file;
1874   EnvOptions env_options;
1875   env_options.use_mmap_writes = false;
1876   env_options.use_direct_writes = false;
1877   s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
1878   if (!s.ok()) {
1879     return s;
1880   }
1881 
1882   std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
1883   size_t len = 0, buf_size = max_backup_meta_file_size_;
1884   len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
1885   len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
1886                   sequence_number_);
1887   if (!app_metadata_.empty()) {
1888     std::string hex_encoded_metadata =
1889         Slice(app_metadata_).ToString(/* hex */ true);
1890 
1891     // +1 to accommodate newline character
1892     size_t hex_meta_strlen = kMetaDataPrefix.ToString().length() + hex_encoded_metadata.length() + 1;
1893     if (hex_meta_strlen >= buf_size) {
1894       return Status::Corruption("Buffer too small to fit backup metadata");
1895     }
1896     else if (len + hex_meta_strlen >= buf_size) {
1897       backup_meta_file->Append(Slice(buf.get(), len));
1898       buf.reset();
1899       std::unique_ptr<char[]> new_reset_buf(
1900           new char[max_backup_meta_file_size_]);
1901       buf.swap(new_reset_buf);
1902       len = 0;
1903     }
1904     len += snprintf(buf.get() + len, buf_size - len, "%s%s\n",
1905                     kMetaDataPrefix.ToString().c_str(),
1906                     hex_encoded_metadata.c_str());
1907   }
1908 
1909   char writelen_temp[19];
1910   if (len + snprintf(writelen_temp, sizeof(writelen_temp),
1911                      "%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) {
1912     backup_meta_file->Append(Slice(buf.get(), len));
1913     buf.reset();
1914     std::unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
1915     buf.swap(new_reset_buf);
1916     len = 0;
1917   }
1918   {
1919     const char *const_write = writelen_temp;
1920     len += snprintf(buf.get() + len, buf_size - len, "%s", const_write);
1921   }
1922 
1923   for (const auto& file : files_) {
1924     // use crc32 for now, switch to something else if needed
1925 
1926     size_t newlen = len + file->filename.length() + snprintf(writelen_temp,
1927       sizeof(writelen_temp), " crc32 %u\n", file->checksum_value);
1928     const char *const_write = writelen_temp;
1929     if (newlen >= buf_size) {
1930       backup_meta_file->Append(Slice(buf.get(), len));
1931       buf.reset();
1932       std::unique_ptr<char[]> new_reset_buf(
1933           new char[max_backup_meta_file_size_]);
1934       buf.swap(new_reset_buf);
1935       len = 0;
1936     }
1937     len += snprintf(buf.get() + len, buf_size - len, "%s%s",
1938                     file->filename.c_str(), const_write);
1939   }
1940 
1941   s = backup_meta_file->Append(Slice(buf.get(), len));
1942   if (s.ok() && sync) {
1943     s = backup_meta_file->Sync();
1944   }
1945   if (s.ok()) {
1946     s = backup_meta_file->Close();
1947   }
1948   if (s.ok()) {
1949     s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
1950   }
1951   return s;
1952 }
1953 
1954 // -------- BackupEngineReadOnlyImpl ---------
1955 class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
1956  public:
BackupEngineReadOnlyImpl(const BackupableDBOptions & options,Env * db_env)1957   BackupEngineReadOnlyImpl(const BackupableDBOptions& options, Env* db_env)
1958       : backup_engine_(new BackupEngineImpl(options, db_env, true)) {}
1959 
~BackupEngineReadOnlyImpl()1960   ~BackupEngineReadOnlyImpl() override {}
1961 
1962   // The returned BackupInfos are in chronological order, which means the
1963   // latest backup comes last.
GetBackupInfo(std::vector<BackupInfo> * backup_info)1964   void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
1965     backup_engine_->GetBackupInfo(backup_info);
1966   }
1967 
GetCorruptedBackups(std::vector<BackupID> * corrupt_backup_ids)1968   void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override {
1969     backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
1970   }
1971 
1972   using BackupEngineReadOnly::RestoreDBFromBackup;
RestoreDBFromBackup(const RestoreOptions & options,BackupID backup_id,const std::string & db_dir,const std::string & wal_dir)1973   Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
1974                              const std::string& db_dir,
1975                              const std::string& wal_dir) override {
1976     return backup_engine_->RestoreDBFromBackup(options, backup_id, db_dir,
1977                                                wal_dir);
1978   }
1979 
1980   using BackupEngineReadOnly::RestoreDBFromLatestBackup;
RestoreDBFromLatestBackup(const RestoreOptions & options,const std::string & db_dir,const std::string & wal_dir)1981   Status RestoreDBFromLatestBackup(const RestoreOptions& options,
1982                                    const std::string& db_dir,
1983                                    const std::string& wal_dir) override {
1984     return backup_engine_->RestoreDBFromLatestBackup(options, db_dir, wal_dir);
1985   }
1986 
VerifyBackup(BackupID backup_id)1987   Status VerifyBackup(BackupID backup_id) override {
1988     return backup_engine_->VerifyBackup(backup_id);
1989   }
1990 
Initialize()1991   Status Initialize() { return backup_engine_->Initialize(); }
1992 
1993  private:
1994   std::unique_ptr<BackupEngineImpl> backup_engine_;
1995 };
1996 
Open(const BackupableDBOptions & options,Env * env,BackupEngineReadOnly ** backup_engine_ptr)1997 Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env,
1998                                   BackupEngineReadOnly** backup_engine_ptr) {
1999   if (options.destroy_old_data) {
2000     return Status::InvalidArgument(
2001         "Can't destroy old data with ReadOnly BackupEngine");
2002   }
2003   std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
2004       new BackupEngineReadOnlyImpl(options, env));
2005   auto s = backup_engine->Initialize();
2006   if (!s.ok()) {
2007     *backup_engine_ptr = nullptr;
2008     return s;
2009   }
2010   *backup_engine_ptr = backup_engine.release();
2011   return Status::OK();
2012 }
2013 
2014 }  // namespace ROCKSDB_NAMESPACE
2015 
2016 #endif  // ROCKSDB_LITE
2017