//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2012 Facebook.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef ROCKSDB_LITE

#include "utilities/checkpoint/checkpoint_impl.h"

#include <algorithm>
#include <cinttypes>
#include <string>
#include <vector>

#include "db/wal_manager.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/utilities/checkpoint.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
  *checkpoint_ptr = new CheckpointImpl(db);
  return Status::OK();
}

Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
                                    uint64_t /*log_size_for_flush*/,
                                    uint64_t* /*sequence_number_ptr*/) {
  return Status::NotSupported("");
}

void CheckpointImpl::CleanStagingDirectory(
    const std::string& full_private_path, Logger* info_log) {
  std::vector<std::string> subchildren;
  Status s = db_->GetEnv()->FileExists(full_private_path);
  if (s.IsNotFound()) {
    return;
  }
  ROCKS_LOG_INFO(info_log, "File exists %s -- %s",
                 full_private_path.c_str(), s.ToString().c_str());
  db_->GetEnv()->GetChildren(full_private_path, &subchildren);
  for (auto& subchild : subchildren) {
    std::string subchild_path = full_private_path + "/" + subchild;
    s = db_->GetEnv()->DeleteFile(subchild_path);
    ROCKS_LOG_INFO(info_log, "Delete file %s -- %s",
                   subchild_path.c_str(), s.ToString().c_str());
  }
  // finally delete the private dir
  s = db_->GetEnv()->DeleteDir(full_private_path);
  ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s",
                 full_private_path.c_str(), s.ToString().c_str());
}

Status Checkpoint::ExportColumnFamily(
    ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/,
    ExportImportFilesMetaData** /*metadata*/) {
  return Status::NotSupported("");
}

// Builds an openable snapshot of RocksDB
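//
// Example usage (an illustrative sketch, not part of the implementation;
// assumes an open DB* named `db` and a destination directory that does not
// exist yet):
//
//   Checkpoint* checkpoint = nullptr;
//   Status s = Checkpoint::Create(db, &checkpoint);
//   if (s.ok()) {
//     s = checkpoint->CreateCheckpoint("/path/to/checkpoint_dir",
//                                      0 /* log_size_for_flush */,
//                                      nullptr /* sequence_number_ptr */);
//   }
//   delete checkpoint;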
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
                                        uint64_t log_size_for_flush,
                                        uint64_t* sequence_number_ptr) {
  DBOptions db_options = db_->GetDBOptions();

  Status s = db_->GetEnv()->FileExists(checkpoint_dir);
  if (s.ok()) {
    return Status::InvalidArgument("Directory exists");
  } else if (!s.IsNotFound()) {
    assert(s.IsIOError());
    return s;
  }

  ROCKS_LOG_INFO(
      db_options.info_log,
      "Started the snapshot process -- creating snapshot in directory %s",
      checkpoint_dir.c_str());

  size_t final_nonslash_idx = checkpoint_dir.find_last_not_of('/');
  if (final_nonslash_idx == std::string::npos) {
    // npos means the name is empty or contains only slashes. A non-empty,
    // all-slash name would be the root directory, which cannot be the case
    // here because we verified above that the directory does not exist.
    assert(checkpoint_dir.empty());
    return Status::InvalidArgument("invalid checkpoint directory name");
  }
  std::string full_private_path =
      checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
  ROCKS_LOG_INFO(
      db_options.info_log,
      "Snapshot process -- using temporary directory %s",
      full_private_path.c_str());
  CleanStagingDirectory(full_private_path, db_options.info_log.get());
  // create snapshot directory
  s = db_->GetEnv()->CreateDir(full_private_path);
  uint64_t sequence_number = 0;
  if (s.ok()) {
    db_->DisableFileDeletions();
    s = CreateCustomCheckpoint(
        db_options,
        [&](const std::string& src_dirname, const std::string& fname,
            FileType) {
          ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
          return db_->GetFileSystem()->LinkFile(src_dirname + fname,
                                                full_private_path + fname,
                                                IOOptions(), nullptr);
        } /* link_file_cb */,
        [&](const std::string& src_dirname, const std::string& fname,
            uint64_t size_limit_bytes, FileType) {
          ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
          return CopyFile(db_->GetFileSystem(), src_dirname + fname,
                          full_private_path + fname, size_limit_bytes,
                          db_options.use_fsync);
        } /* copy_file_cb */,
        [&](const std::string& fname, const std::string& contents, FileType) {
          ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
          return CreateFile(db_->GetFileSystem(), full_private_path + fname,
                            contents, db_options.use_fsync);
        } /* create_file_cb */,
        &sequence_number, log_size_for_flush);
    // we copied all the files, enable file deletions
    db_->EnableFileDeletions(false);
  }

  if (s.ok()) {
    // move tmp private backup to real snapshot directory
    s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
  }
  if (s.ok()) {
    std::unique_ptr<Directory> checkpoint_directory;
    db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
    if (checkpoint_directory != nullptr) {
      s = checkpoint_directory->Fsync();
    }
  }

  if (s.ok()) {
    if (sequence_number_ptr != nullptr) {
      *sequence_number_ptr = sequence_number;
    }
    // here we know that we succeeded and installed the new snapshot
    ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
    ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
                   sequence_number);
  } else {
    // clean all the files we might have created
    ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
                   s.ToString().c_str());
    CleanStagingDirectory(full_private_path, db_options.info_log.get());
  }
  return s;
}

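// Creates a checkpoint by driving the caller-supplied callbacks: link_file_cb
// for files that can be hard linked, copy_file_cb for files that must be
// copied (size_limit_bytes == 0 copies the whole file), and create_file_cb for
// files whose contents are crafted in memory (e.g. CURRENT). The latest
// sequence number is returned through sequence_number. With allow_2pc off,
// log_size_for_flush == port::kMaxUint64 skips the memtable flush entirely,
// any other non-zero value flushes only when the total WAL size is at least
// that many bytes, and zero always requests a flush.
//
// A minimal sketch of a copy-only caller (illustrative only; `checkpoint_impl`,
// `db`, `dest_dir`, and `use_fsync` are assumed to exist in the caller's
// context and are not names from this file):
//
//   uint64_t seqno = 0;
//   Status s = checkpoint_impl->CreateCustomCheckpoint(
//       db_options,
//       [&](const std::string&, const std::string&, FileType) {
//         return Status::NotSupported("");  // fall back to the copy callback
//       } /* link_file_cb */,
//       [&](const std::string& src_dirname, const std::string& fname,
//           uint64_t size_limit_bytes, FileType) {
//         return CopyFile(db->GetFileSystem(), src_dirname + fname,
//                         dest_dir + fname, size_limit_bytes, use_fsync);
//       } /* copy_file_cb */,
//       [&](const std::string& fname, const std::string& contents, FileType) {
//         return CreateFile(db->GetFileSystem(), dest_dir + fname, contents,
//                           use_fsync);
//       } /* create_file_cb */,
//       &seqno, 0 /* log_size_for_flush */);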
Status CheckpointImpl::CreateCustomCheckpoint(
    const DBOptions& db_options,
    std::function<Status(const std::string& src_dirname,
                         const std::string& src_fname, FileType type)>
        link_file_cb,
    std::function<Status(const std::string& src_dirname,
                         const std::string& src_fname,
                         uint64_t size_limit_bytes, FileType type)>
        copy_file_cb,
    std::function<Status(const std::string& fname, const std::string& contents,
                         FileType type)>
        create_file_cb,
    uint64_t* sequence_number, uint64_t log_size_for_flush) {
  Status s;
  std::vector<std::string> live_files;
  uint64_t manifest_file_size = 0;
  uint64_t min_log_num = port::kMaxUint64;
  *sequence_number = db_->GetLatestSequenceNumber();
  bool same_fs = true;
  VectorLogPtr live_wal_files;

  bool flush_memtable = true;
  if (s.ok()) {
    if (!db_options.allow_2pc) {
      if (log_size_for_flush == port::kMaxUint64) {
        flush_memtable = false;
      } else if (log_size_for_flush > 0) {
        // If outstanding log files are small, we skip the flush.
        s = db_->GetSortedWalFiles(live_wal_files);

        if (!s.ok()) {
          return s;
        }

        // Don't flush column families if total log size is smaller than
        // log_size_for_flush. We copy the log files instead.
        // We may be able to cover the 2PC case too.
        uint64_t total_wal_size = 0;
        for (auto& wal : live_wal_files) {
          total_wal_size += wal->SizeFileBytes();
        }
        if (total_wal_size < log_size_for_flush) {
          flush_memtable = false;
        }
        live_wal_files.clear();
      }
    }

    // this will return live_files prefixed with "/"
    s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);

    if (s.ok() && db_options.allow_2pc) {
      // If 2PC is enabled, we need to get the minimum log number after the
      // flush. Need to refetch the live files to recapture the snapshot.
      if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
                               &min_log_num)) {
        return Status::InvalidArgument(
            "2PC enabled but cannot find the min log number to keep.");
      }
      // We need to refetch live files with flush to handle this case:
      // A previous 000001.log contains the prepare record of transaction txn1.
      // The current log file is 000002.log, and sequence_number points to this
      // file.
      // After calling GetLiveFiles(), 000003.log is created.
      // Then txn1 is committed. The commit record is written to 000003.log.
      // Now we fetch min_log_num, which will be 3.
      // Then only 000002.log and 000003.log will be copied, and 000001.log will
      // be skipped. 000003.log contains the commit record of txn1, but we don't
      // have the respective prepare record for it.
      // In order to avoid this situation, we need to force a flush to make sure
      // all transactions committed before getting min_log_num will be flushed
      // to SST files.
      // We cannot get min_log_num before calling the GetLiveFiles() for the
      // first time, because if we do that, all the log files will be included,
      // far more than needed.
      s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
    }

    TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
    TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
    db_->FlushWAL(false /* sync */);
  }
  // if we have more than one column family, we need to also get WAL files
  if (s.ok()) {
    s = db_->GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
    return s;
  }

  size_t wal_size = live_wal_files.size();

  // copy/hard link live_files
  std::string manifest_fname, current_fname;
  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(live_files[i], &number, &type);
    if (!ok) {
      s = Status::Corruption("Can't parse file name. This is very bad");
      break;
    }
    // we should only get sst, options, manifest and current files here
    assert(type == kTableFile || type == kDescriptorFile ||
           type == kCurrentFile || type == kOptionsFile);
    assert(live_files[i].size() > 0 && live_files[i][0] == '/');
    if (type == kCurrentFile) {
      // We will craft the CURRENT file manually to ensure it's consistent with
      // the manifest number. This is necessary because the CURRENT file's
      // contents can change during checkpoint creation.
      current_fname = live_files[i];
      continue;
    } else if (type == kDescriptorFile) {
      manifest_fname = live_files[i];
    }
    std::string src_fname = live_files[i];

    // rules:
    // * if it's kTableFile, then it's shared
    // * if it's kDescriptorFile, limit the size to manifest_file_size
    // * always copy if cross-device link
    if ((type == kTableFile) && same_fs) {
      s = link_file_cb(db_->GetName(), src_fname, type);
      if (s.IsNotSupported()) {
        same_fs = false;
        s = Status::OK();
      }
    }
    if ((type != kTableFile) || (!same_fs)) {
      s = copy_file_cb(db_->GetName(), src_fname,
                       (type == kDescriptorFile) ? manifest_file_size : 0,
                       type);
    }
  }
  if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
    s = create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
                       kCurrentFile);
  }
  ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
                 live_wal_files.size());

  // Link WAL files. Copy exact size of last one because it is the only one
  // that has changes after the last flush.
  for (size_t i = 0; s.ok() && i < wal_size; ++i) {
    if ((live_wal_files[i]->Type() == kAliveLogFile) &&
        (!flush_memtable ||
         live_wal_files[i]->StartSequence() >= *sequence_number ||
         live_wal_files[i]->LogNumber() >= min_log_num)) {
      if (i + 1 == wal_size) {
        s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
                         live_wal_files[i]->SizeFileBytes(), kLogFile);
        break;
      }
      if (same_fs) {
        // we only care about live log files
        s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
                         kLogFile);
        if (s.IsNotSupported()) {
          same_fs = false;
          s = Status::OK();
        }
      }
      if (!same_fs) {
        s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
                         kLogFile);
      }
    }
  }

  return s;
}

// Exports all live SST files of a specified column family onto export_dir,
// returning SST file information in metadata.
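//
// Example usage (an illustrative sketch, not part of the implementation;
// assumes `checkpoint` came from Checkpoint::Create() and that
// "/path/to/export_dir" does not exist yet):
//
//   ExportImportFilesMetaData* metadata = nullptr;
//   Status s = checkpoint->ExportColumnFamily(db->DefaultColumnFamily(),
//                                             "/path/to/export_dir",
//                                             &metadata);
//   if (s.ok()) {
//     // `metadata` now describes the exported SST files and can be handed to
//     // an import path such as DB::CreateColumnFamilyWithImport(); ownership
//     // of `metadata` passes to the caller here.
//     delete metadata;
//   }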
Status CheckpointImpl::ExportColumnFamily(
    ColumnFamilyHandle* handle, const std::string& export_dir,
    ExportImportFilesMetaData** metadata) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handle);
  const auto cf_name = cfh->GetName();
  const auto db_options = db_->GetDBOptions();

  assert(metadata != nullptr);
  assert(*metadata == nullptr);
  auto s = db_->GetEnv()->FileExists(export_dir);
  if (s.ok()) {
    return Status::InvalidArgument("Specified export_dir exists");
  } else if (!s.IsNotFound()) {
    assert(s.IsIOError());
    return s;
  }

  const auto final_nonslash_idx = export_dir.find_last_not_of('/');
  if (final_nonslash_idx == std::string::npos) {
    return Status::InvalidArgument("Specified export_dir invalid");
  }
  ROCKS_LOG_INFO(db_options.info_log,
                 "[%s] export column family onto export directory %s",
                 cf_name.c_str(), export_dir.c_str());

  // Create a temporary export directory.
  const auto tmp_export_dir =
      export_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
  s = db_->GetEnv()->CreateDir(tmp_export_dir);

  if (s.ok()) {
    s = db_->Flush(ROCKSDB_NAMESPACE::FlushOptions(), handle);
  }

  ColumnFamilyMetaData db_metadata;
  if (s.ok()) {
    // Export live sst files with file deletions disabled.
    s = db_->DisableFileDeletions();
    if (s.ok()) {
      db_->GetColumnFamilyMetaData(handle, &db_metadata);

      s = ExportFilesInMetaData(
          db_options, db_metadata,
          [&](const std::string& src_dirname, const std::string& fname) {
            ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s",
                           cf_name.c_str(), fname.c_str());
            return db_->GetEnv()->LinkFile(src_dirname + fname,
                                           tmp_export_dir + fname);
          } /*link_file_cb*/,
          [&](const std::string& src_dirname, const std::string& fname) {
            ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s",
                           cf_name.c_str(), fname.c_str());
            return CopyFile(db_->GetFileSystem(), src_dirname + fname,
                            tmp_export_dir + fname, 0, db_options.use_fsync);
          } /*copy_file_cb*/);

      const auto enable_status = db_->EnableFileDeletions(false /*force*/);
      if (s.ok()) {
        s = enable_status;
      }
    }
  }

  auto moved_to_user_specified_dir = false;
  if (s.ok()) {
    // Move temporary export directory to the actual export directory.
    s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir);
  }

  if (s.ok()) {
    // Fsync export directory.
    moved_to_user_specified_dir = true;
    std::unique_ptr<Directory> dir_ptr;
    s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr);
    if (s.ok()) {
      assert(dir_ptr != nullptr);
      s = dir_ptr->Fsync();
    }
  }

  if (s.ok()) {
    // Export of files succeeded. Fill in the metadata information.
    auto result_metadata = new ExportImportFilesMetaData();
    result_metadata->db_comparator_name = handle->GetComparator()->Name();
    for (const auto& level_metadata : db_metadata.levels) {
      for (const auto& file_metadata : level_metadata.files) {
        LiveFileMetaData live_file_metadata;
        live_file_metadata.size = file_metadata.size;
        live_file_metadata.name = file_metadata.name;
        live_file_metadata.file_number = file_metadata.file_number;
        live_file_metadata.db_path = export_dir;
        live_file_metadata.smallest_seqno = file_metadata.smallest_seqno;
        live_file_metadata.largest_seqno = file_metadata.largest_seqno;
        live_file_metadata.smallestkey = file_metadata.smallestkey;
        live_file_metadata.largestkey = file_metadata.largestkey;
        live_file_metadata.oldest_blob_file_number =
            file_metadata.oldest_blob_file_number;
        live_file_metadata.level = level_metadata.level;
        result_metadata->files.push_back(live_file_metadata);
      }
    }
    *metadata = result_metadata;
    ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.",
                   cf_name.c_str());
  } else {
    // Failure: Clean up all the files/directories created.
    ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s",
                   cf_name.c_str(), s.ToString().c_str());
    std::vector<std::string> subchildren;
    const auto cleanup_dir =
        moved_to_user_specified_dir ? export_dir : tmp_export_dir;
    db_->GetEnv()->GetChildren(cleanup_dir, &subchildren);
    for (const auto& subchild : subchildren) {
      const auto subchild_path = cleanup_dir + "/" + subchild;
      const auto status = db_->GetEnv()->DeleteFile(subchild_path);
      if (!status.ok()) {
        ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s",
                       subchild_path.c_str(), status.ToString().c_str());
      }
    }
    const auto status = db_->GetEnv()->DeleteDir(cleanup_dir);
    if (!status.ok()) {
      ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s",
                     cleanup_dir.c_str(), status.ToString().c_str());
    }
  }
  return s;
}

Status CheckpointImpl::ExportFilesInMetaData(
    const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
    std::function<Status(const std::string& src_dirname,
                         const std::string& src_fname)>
        link_file_cb,
    std::function<Status(const std::string& src_dirname,
                         const std::string& src_fname)>
        copy_file_cb) {
  Status s;
  auto hardlink_file = true;

  // Copy/hard link files in metadata.
  size_t num_files = 0;
  for (const auto& level_metadata : metadata.levels) {
    for (const auto& file_metadata : level_metadata.files) {
      uint64_t number;
      FileType type;
      const auto ok = ParseFileName(file_metadata.name, &number, &type);
      if (!ok) {
        s = Status::Corruption("Could not parse file name");
        break;
      }

      // We should only get sst files here.
      assert(type == kTableFile);
      assert(file_metadata.size > 0 && file_metadata.name[0] == '/');
      const auto src_fname = file_metadata.name;
      ++num_files;

      if (hardlink_file) {
        s = link_file_cb(db_->GetName(), src_fname);
        if (num_files == 1 && s.IsNotSupported()) {
          // Fallback to copy if link failed due to cross-device directories.
          hardlink_file = false;
          s = Status::OK();
        }
      }
      if (!hardlink_file) {
        s = copy_file_cb(db_->GetName(), src_fname);
      }
      if (!s.ok()) {
        break;
      }
    }
  }
  ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt,
                 num_files);

  return s;
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE