1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10 
11 #include <cinttypes>
12 #include <set>
13 #include <unordered_set>
14 #include "db/event_helpers.h"
15 #include "db/memtable_list.h"
16 #include "file/file_util.h"
17 #include "file/filename.h"
18 #include "file/sst_file_manager_impl.h"
19 #include "util/autovector.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
MinLogNumberToKeep()23 uint64_t DBImpl::MinLogNumberToKeep() {
24   if (allow_2pc()) {
25     return versions_->min_log_number_to_keep_2pc();
26   } else {
27     return versions_->MinLogNumberWithUnflushedData();
28   }
29 }
30 
MinObsoleteSstNumberToKeep()31 uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
32   mutex_.AssertHeld();
33   if (!pending_outputs_.empty()) {
34     return *pending_outputs_.begin();
35   }
36   return std::numeric_limits<uint64_t>::max();
37 }
38 
39 // * Returns the list of live files in 'sst_live'
40 // If it's doing full scan:
41 // * Returns the list of all files in the filesystem in
42 // 'full_scan_candidate_files'.
43 // Otherwise, gets obsolete files from VersionSet.
44 // no_full_scan = true -- never do the full scan using GetChildren()
45 // force = false -- don't force the full scan, except every
46 //  mutable_db_options_.delete_obsolete_files_period_micros
47 // force = true -- force the full scan
FindObsoleteFiles(JobContext * job_context,bool force,bool no_full_scan)48 void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
49                                bool no_full_scan) {
50   mutex_.AssertHeld();
51 
52   // if deletion is disabled, do nothing
53   if (disable_delete_obsolete_files_ > 0) {
54     return;
55   }
56 
57   bool doing_the_full_scan = false;
58 
59   // logic for figuring out if we're doing the full scan
60   if (no_full_scan) {
61     doing_the_full_scan = false;
62   } else if (force ||
63              mutable_db_options_.delete_obsolete_files_period_micros == 0) {
64     doing_the_full_scan = true;
65   } else {
66     const uint64_t now_micros = env_->NowMicros();
67     if ((delete_obsolete_files_last_run_ +
68          mutable_db_options_.delete_obsolete_files_period_micros) <
69         now_micros) {
70       doing_the_full_scan = true;
71       delete_obsolete_files_last_run_ = now_micros;
72     }
73   }
74 
75   // don't delete files that might be currently written to from compaction
76   // threads
77   // Since job_context->min_pending_output is set, until file scan finishes,
78   // mutex_ cannot be released. Otherwise, we might see no min_pending_output
79   // here but later find newer generated unfinalized files while scanning.
80   if (!pending_outputs_.empty()) {
81     job_context->min_pending_output = *pending_outputs_.begin();
82   } else {
83     // delete all of them
84     job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
85   }
86 
87   // Get obsolete files.  This function will also update the list of
88   // pending files in VersionSet().
89   versions_->GetObsoleteFiles(&job_context->sst_delete_files,
90                               &job_context->manifest_delete_files,
91                               job_context->min_pending_output);
92 
93   // Mark the elements in job_context->sst_delete_files as grabbedForPurge
94   // so that other threads calling FindObsoleteFiles with full_scan=true
95   // will not add these files to candidate list for purge.
96   for (const auto& sst_to_del : job_context->sst_delete_files) {
97     MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
98   }
99 
100   // store the current filenum, lognum, etc
101   job_context->manifest_file_number = versions_->manifest_file_number();
102   job_context->pending_manifest_file_number =
103       versions_->pending_manifest_file_number();
104   job_context->log_number = MinLogNumberToKeep();
105   job_context->prev_log_number = versions_->prev_log_number();
106 
107   versions_->AddLiveFiles(&job_context->sst_live);
108   if (doing_the_full_scan) {
109     InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
110                                   dbname_);
111     std::set<std::string> paths;
112     for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
113          path_id++) {
114       paths.insert(immutable_db_options_.db_paths[path_id].path);
115     }
116 
117     // Note that if cf_paths is not specified in the ColumnFamilyOptions
118     // of a particular column family, we use db_paths as the cf_paths
119     // setting. Hence, there can be multiple duplicates of files from db_paths
120     // in the following code. The duplicate are removed while identifying
121     // unique files in PurgeObsoleteFiles.
122     for (auto cfd : *versions_->GetColumnFamilySet()) {
123       for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
124            path_id++) {
125         auto& path = cfd->ioptions()->cf_paths[path_id].path;
126 
127         if (paths.find(path) == paths.end()) {
128           paths.insert(path);
129         }
130       }
131     }
132 
133     for (auto& path : paths) {
134       // set of all files in the directory. We'll exclude files that are still
135       // alive in the subsequent processings.
136       std::vector<std::string> files;
137       env_->GetChildren(path, &files);  // Ignore errors
138       for (const std::string& file : files) {
139         uint64_t number;
140         FileType type;
141         // 1. If we cannot parse the file name, we skip;
142         // 2. If the file with file_number equals number has already been
143         // grabbed for purge by another compaction job, or it has already been
144         // schedule for purge, we also skip it if we
145         // are doing full scan in order to avoid double deletion of the same
146         // file under race conditions. See
147         // https://github.com/facebook/rocksdb/issues/3573
148         if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
149             !ShouldPurge(number)) {
150           continue;
151         }
152 
153         // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
154         job_context->full_scan_candidate_files.emplace_back("/" + file, path);
155       }
156     }
157 
158     // Add log files in wal_dir
159     if (immutable_db_options_.wal_dir != dbname_) {
160       std::vector<std::string> log_files;
161       env_->GetChildren(immutable_db_options_.wal_dir,
162                         &log_files);  // Ignore errors
163       for (const std::string& log_file : log_files) {
164         job_context->full_scan_candidate_files.emplace_back(
165             log_file, immutable_db_options_.wal_dir);
166       }
167     }
168     // Add info log files in db_log_dir
169     if (!immutable_db_options_.db_log_dir.empty() &&
170         immutable_db_options_.db_log_dir != dbname_) {
171       std::vector<std::string> info_log_files;
172       // Ignore errors
173       env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
174       for (std::string& log_file : info_log_files) {
175         job_context->full_scan_candidate_files.emplace_back(
176             log_file, immutable_db_options_.db_log_dir);
177       }
178     }
179   }
180 
181   // logs_ is empty when called during recovery, in which case there can't yet
182   // be any tracked obsolete logs
183   if (!alive_log_files_.empty() && !logs_.empty()) {
184     uint64_t min_log_number = job_context->log_number;
185     size_t num_alive_log_files = alive_log_files_.size();
186     // find newly obsoleted log files
187     while (alive_log_files_.begin()->number < min_log_number) {
188       auto& earliest = *alive_log_files_.begin();
189       if (immutable_db_options_.recycle_log_file_num >
190           log_recycle_files_.size()) {
191         ROCKS_LOG_INFO(immutable_db_options_.info_log,
192                        "adding log %" PRIu64 " to recycle list\n",
193                        earliest.number);
194         log_recycle_files_.push_back(earliest.number);
195       } else {
196         job_context->log_delete_files.push_back(earliest.number);
197       }
198       if (job_context->size_log_to_delete == 0) {
199         job_context->prev_total_log_size = total_log_size_;
200         job_context->num_alive_log_files = num_alive_log_files;
201       }
202       job_context->size_log_to_delete += earliest.size;
203       total_log_size_ -= earliest.size;
204       if (two_write_queues_) {
205         log_write_mutex_.Lock();
206       }
207       alive_log_files_.pop_front();
208       if (two_write_queues_) {
209         log_write_mutex_.Unlock();
210       }
211       // Current log should always stay alive since it can't have
212       // number < MinLogNumber().
213       assert(alive_log_files_.size());
214     }
215     while (!logs_.empty() && logs_.front().number < min_log_number) {
216       auto& log = logs_.front();
217       if (log.getting_synced) {
218         log_sync_cv_.Wait();
219         // logs_ could have changed while we were waiting.
220         continue;
221       }
222       logs_to_free_.push_back(log.ReleaseWriter());
223       {
224         InstrumentedMutexLock wl(&log_write_mutex_);
225         logs_.pop_front();
226       }
227     }
228     // Current log cannot be obsolete.
229     assert(!logs_.empty());
230   }
231 
232   // We're just cleaning up for DB::Write().
233   assert(job_context->logs_to_free.empty());
234   job_context->logs_to_free = logs_to_free_;
235   job_context->log_recycle_files.assign(log_recycle_files_.begin(),
236                                         log_recycle_files_.end());
237   if (job_context->HaveSomethingToDelete()) {
238     ++pending_purge_obsolete_files_;
239   }
240   logs_to_free_.clear();
241 }
242 
243 namespace {
CompareCandidateFile(const JobContext::CandidateFileInfo & first,const JobContext::CandidateFileInfo & second)244 bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
245                           const JobContext::CandidateFileInfo& second) {
246   if (first.file_name > second.file_name) {
247     return true;
248   } else if (first.file_name < second.file_name) {
249     return false;
250   } else {
251     return (first.file_path > second.file_path);
252   }
253 }
254 };  // namespace
255 
256 // Delete obsolete files and log status and information of file deletion
DeleteObsoleteFileImpl(int job_id,const std::string & fname,const std::string & path_to_sync,FileType type,uint64_t number)257 void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
258                                     const std::string& path_to_sync,
259                                     FileType type, uint64_t number) {
260   Status file_deletion_status;
261   if (type == kTableFile || type == kLogFile) {
262     file_deletion_status =
263         DeleteDBFile(&immutable_db_options_, fname, path_to_sync,
264                      /*force_bg=*/false, /*force_fg=*/!wal_in_db_path_);
265   } else {
266     file_deletion_status = env_->DeleteFile(fname);
267   }
268   TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
269                            &file_deletion_status);
270   if (file_deletion_status.ok()) {
271     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
272                     "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
273                     fname.c_str(), type, number,
274                     file_deletion_status.ToString().c_str());
275   } else if (env_->FileExists(fname).IsNotFound()) {
276     ROCKS_LOG_INFO(
277         immutable_db_options_.info_log,
278         "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
279         " -- %s\n",
280         job_id, fname.c_str(), type, number,
281         file_deletion_status.ToString().c_str());
282   } else {
283     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
284                     "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
285                     job_id, fname.c_str(), type, number,
286                     file_deletion_status.ToString().c_str());
287   }
288   if (type == kTableFile) {
289     EventHelpers::LogAndNotifyTableFileDeletion(
290         &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
291         immutable_db_options_.listeners);
292   }
293 }
294 
295 // Diffs the files listed in filenames and those that do not
296 // belong to live files are possibly removed. Also, removes all the
297 // files in sst_delete_files and log_delete_files.
298 // It is not necessary to hold the mutex when invoking this method.
PurgeObsoleteFiles(JobContext & state,bool schedule_only)299 void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
300   TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
301   // we'd better have sth to delete
302   assert(state.HaveSomethingToDelete());
303 
304   // FindObsoleteFiles() should've populated this so nonzero
305   assert(state.manifest_file_number != 0);
306 
307   // Now, convert live list to an unordered map, WITHOUT mutex held;
308   // set is slow.
309   std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
310   for (const FileDescriptor& fd : state.sst_live) {
311     sst_live_map[fd.GetNumber()] = &fd;
312   }
313   std::unordered_set<uint64_t> log_recycle_files_set(
314       state.log_recycle_files.begin(), state.log_recycle_files.end());
315 
316   auto candidate_files = state.full_scan_candidate_files;
317   candidate_files.reserve(
318       candidate_files.size() + state.sst_delete_files.size() +
319       state.log_delete_files.size() + state.manifest_delete_files.size());
320   // We may ignore the dbname when generating the file names.
321   for (auto& file : state.sst_delete_files) {
322     candidate_files.emplace_back(
323         MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
324     if (file.metadata->table_reader_handle) {
325       table_cache_->Release(file.metadata->table_reader_handle);
326     }
327     file.DeleteMetadata();
328   }
329 
330   for (auto file_num : state.log_delete_files) {
331     if (file_num > 0) {
332       candidate_files.emplace_back(LogFileName(file_num),
333                                    immutable_db_options_.wal_dir);
334     }
335   }
336   for (const auto& filename : state.manifest_delete_files) {
337     candidate_files.emplace_back(filename, dbname_);
338   }
339 
340   // dedup state.candidate_files so we don't try to delete the same
341   // file twice
342   std::sort(candidate_files.begin(), candidate_files.end(),
343             CompareCandidateFile);
344   candidate_files.erase(
345       std::unique(candidate_files.begin(), candidate_files.end()),
346       candidate_files.end());
347 
348   if (state.prev_total_log_size > 0) {
349     ROCKS_LOG_INFO(immutable_db_options_.info_log,
350                    "[JOB %d] Try to delete WAL files size %" PRIu64
351                    ", prev total WAL file size %" PRIu64
352                    ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
353                    state.job_id, state.size_log_to_delete,
354                    state.prev_total_log_size, state.num_alive_log_files);
355   }
356 
357   std::vector<std::string> old_info_log_files;
358   InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
359                                 dbname_);
360 
361   // File numbers of most recent two OPTIONS file in candidate_files (found in
362   // previos FindObsoleteFiles(full_scan=true))
363   // At this point, there must not be any duplicate file numbers in
364   // candidate_files.
365   uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
366   uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();
367   for (const auto& candidate_file : candidate_files) {
368     const std::string& fname = candidate_file.file_name;
369     uint64_t number;
370     FileType type;
371     if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
372         type != kOptionsFile) {
373       continue;
374     }
375     if (number > optsfile_num1) {
376       optsfile_num2 = optsfile_num1;
377       optsfile_num1 = number;
378     } else if (number > optsfile_num2) {
379       optsfile_num2 = number;
380     }
381   }
382 
383   // Close WALs before trying to delete them.
384   for (const auto w : state.logs_to_free) {
385     // TODO: maybe check the return value of Close.
386     w->Close();
387   }
388 
389   bool own_files = OwnTablesAndLogs();
390   std::unordered_set<uint64_t> files_to_del;
391   for (const auto& candidate_file : candidate_files) {
392     const std::string& to_delete = candidate_file.file_name;
393     uint64_t number;
394     FileType type;
395     // Ignore file if we cannot recognize it.
396     if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
397       continue;
398     }
399 
400     bool keep = true;
401     switch (type) {
402       case kLogFile:
403         keep = ((number >= state.log_number) ||
404                 (number == state.prev_log_number) ||
405                 (log_recycle_files_set.find(number) !=
406                  log_recycle_files_set.end()));
407         break;
408       case kDescriptorFile:
409         // Keep my manifest file, and any newer incarnations'
410         // (can happen during manifest roll)
411         keep = (number >= state.manifest_file_number);
412         break;
413       case kTableFile:
414         // If the second condition is not there, this makes
415         // DontDeletePendingOutputs fail
416         keep = (sst_live_map.find(number) != sst_live_map.end()) ||
417                number >= state.min_pending_output;
418         if (!keep) {
419           files_to_del.insert(number);
420         }
421         break;
422       case kTempFile:
423         // Any temp files that are currently being written to must
424         // be recorded in pending_outputs_, which is inserted into "live".
425         // Also, SetCurrentFile creates a temp file when writing out new
426         // manifest, which is equal to state.pending_manifest_file_number. We
427         // should not delete that file
428         //
429         // TODO(yhchiang): carefully modify the third condition to safely
430         //                 remove the temp options files.
431         keep = (sst_live_map.find(number) != sst_live_map.end()) ||
432                (number == state.pending_manifest_file_number) ||
433                (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
434         break;
435       case kInfoLogFile:
436         keep = true;
437         if (number != 0) {
438           old_info_log_files.push_back(to_delete);
439         }
440         break;
441       case kOptionsFile:
442         keep = (number >= optsfile_num2);
443         TEST_SYNC_POINT_CALLBACK(
444             "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:1",
445             reinterpret_cast<void*>(&number));
446         TEST_SYNC_POINT_CALLBACK(
447             "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:2",
448             reinterpret_cast<void*>(&keep));
449         break;
450       case kCurrentFile:
451       case kDBLockFile:
452       case kIdentityFile:
453       case kMetaDatabase:
454       case kBlobFile:
455         keep = true;
456         break;
457     }
458 
459     if (keep) {
460       continue;
461     }
462 
463     std::string fname;
464     std::string dir_to_sync;
465     if (type == kTableFile) {
466       // evict from cache
467       TableCache::Evict(table_cache_.get(), number);
468       fname = MakeTableFileName(candidate_file.file_path, number);
469       dir_to_sync = candidate_file.file_path;
470     } else {
471       dir_to_sync =
472           (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;
473       fname = dir_to_sync +
474               ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
475                        (!to_delete.empty() && to_delete.front() == '/')
476                    ? ""
477                    : "/") +
478               to_delete;
479     }
480 
481 #ifndef ROCKSDB_LITE
482     if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
483                              immutable_db_options_.wal_size_limit_mb > 0)) {
484       wal_manager_.ArchiveWALFile(fname, number);
485       continue;
486     }
487 #endif  // !ROCKSDB_LITE
488 
489     // If I do not own these files, e.g. secondary instance with max_open_files
490     // = -1, then no need to delete or schedule delete these files since they
491     // will be removed by their owner, e.g. the primary instance.
492     if (!own_files) {
493       continue;
494     }
495     Status file_deletion_status;
496     if (schedule_only) {
497       InstrumentedMutexLock guard_lock(&mutex_);
498       SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
499     } else {
500       DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
501     }
502   }
503 
504   {
505     // After purging obsolete files, remove them from files_grabbed_for_purge_.
506     InstrumentedMutexLock guard_lock(&mutex_);
507     autovector<uint64_t> to_be_removed;
508     for (auto fn : files_grabbed_for_purge_) {
509       if (files_to_del.count(fn) != 0) {
510         to_be_removed.emplace_back(fn);
511       }
512     }
513     for (auto fn : to_be_removed) {
514       files_grabbed_for_purge_.erase(fn);
515     }
516   }
517 
518   // Delete old info log files.
519   size_t old_info_log_file_count = old_info_log_files.size();
520   if (old_info_log_file_count != 0 &&
521       old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
522     std::sort(old_info_log_files.begin(), old_info_log_files.end());
523     size_t end =
524         old_info_log_file_count - immutable_db_options_.keep_log_file_num;
525     for (unsigned int i = 0; i <= end; i++) {
526       std::string& to_delete = old_info_log_files.at(i);
527       std::string full_path_to_delete =
528           (immutable_db_options_.db_log_dir.empty()
529                ? dbname_
530                : immutable_db_options_.db_log_dir) +
531           "/" + to_delete;
532       ROCKS_LOG_INFO(immutable_db_options_.info_log,
533                      "[JOB %d] Delete info log file %s\n", state.job_id,
534                      full_path_to_delete.c_str());
535       Status s = env_->DeleteFile(full_path_to_delete);
536       if (!s.ok()) {
537         if (env_->FileExists(full_path_to_delete).IsNotFound()) {
538           ROCKS_LOG_INFO(
539               immutable_db_options_.info_log,
540               "[JOB %d] Tried to delete non-existing info log file %s FAILED "
541               "-- %s\n",
542               state.job_id, to_delete.c_str(), s.ToString().c_str());
543         } else {
544           ROCKS_LOG_ERROR(immutable_db_options_.info_log,
545                           "[JOB %d] Delete info log file %s FAILED -- %s\n",
546                           state.job_id, to_delete.c_str(),
547                           s.ToString().c_str());
548         }
549       }
550     }
551   }
552 #ifndef ROCKSDB_LITE
553   wal_manager_.PurgeObsoleteWALFiles();
554 #endif  // ROCKSDB_LITE
555   LogFlush(immutable_db_options_.info_log);
556   InstrumentedMutexLock l(&mutex_);
557   --pending_purge_obsolete_files_;
558   assert(pending_purge_obsolete_files_ >= 0);
559   if (pending_purge_obsolete_files_ == 0) {
560     bg_cv_.SignalAll();
561   }
562   TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
563 }
564 
DeleteObsoleteFiles()565 void DBImpl::DeleteObsoleteFiles() {
566   mutex_.AssertHeld();
567   JobContext job_context(next_job_id_.fetch_add(1));
568   FindObsoleteFiles(&job_context, true);
569 
570   mutex_.Unlock();
571   if (job_context.HaveSomethingToDelete()) {
572     PurgeObsoleteFiles(job_context);
573   }
574   job_context.Clean();
575   mutex_.Lock();
576 }
577 
FindMinPrepLogReferencedByMemTable(VersionSet * vset,const ColumnFamilyData * cfd_to_flush,const autovector<MemTable * > & memtables_to_flush)578 uint64_t FindMinPrepLogReferencedByMemTable(
579     VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
580     const autovector<MemTable*>& memtables_to_flush) {
581   uint64_t min_log = 0;
582 
583   // we must look through the memtables for two phase transactions
584   // that have been committed but not yet flushed
585   for (auto loop_cfd : *vset->GetColumnFamilySet()) {
586     if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
587       continue;
588     }
589 
590     auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
591         memtables_to_flush);
592 
593     if (log > 0 && (min_log == 0 || log < min_log)) {
594       min_log = log;
595     }
596 
597     log = loop_cfd->mem()->GetMinLogContainingPrepSection();
598 
599     if (log > 0 && (min_log == 0 || log < min_log)) {
600       min_log = log;
601     }
602   }
603 
604   return min_log;
605 }
606 
PrecomputeMinLogNumberToKeep(VersionSet * vset,const ColumnFamilyData & cfd_to_flush,autovector<VersionEdit * > edit_list,const autovector<MemTable * > & memtables_to_flush,LogsWithPrepTracker * prep_tracker)607 uint64_t PrecomputeMinLogNumberToKeep(
608     VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
609     autovector<VersionEdit*> edit_list,
610     const autovector<MemTable*>& memtables_to_flush,
611     LogsWithPrepTracker* prep_tracker) {
612   assert(vset != nullptr);
613   assert(prep_tracker != nullptr);
614   // Calculate updated min_log_number_to_keep
615   // Since the function should only be called in 2pc mode, log number in
616   // the version edit should be sufficient.
617 
618   // Precompute the min log number containing unflushed data for the column
619   // family being flushed (`cfd_to_flush`).
620   uint64_t cf_min_log_number_to_keep = 0;
621   for (auto& e : edit_list) {
622     if (e->HasLogNumber()) {
623       cf_min_log_number_to_keep =
624           std::max(cf_min_log_number_to_keep, e->GetLogNumber());
625     }
626   }
627   if (cf_min_log_number_to_keep == 0) {
628     // No version edit contains information on log number. The log number
629     // for this column family should stay the same as it is.
630     cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber();
631   }
632 
633   // Get min log number containing unflushed data for other column families.
634   uint64_t min_log_number_to_keep =
635       vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush);
636   if (cf_min_log_number_to_keep != 0) {
637     min_log_number_to_keep =
638         std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
639   }
640 
641   // if are 2pc we must consider logs containing prepared
642   // sections of outstanding transactions.
643   //
644   // We must check min logs with outstanding prep before we check
645   // logs references by memtables because a log referenced by the
646   // first data structure could transition to the second under us.
647   //
648   // TODO: iterating over all column families under db mutex.
649   // should find more optimal solution
650   auto min_log_in_prep_heap =
651       prep_tracker->FindMinLogContainingOutstandingPrep();
652 
653   if (min_log_in_prep_heap != 0 &&
654       min_log_in_prep_heap < min_log_number_to_keep) {
655     min_log_number_to_keep = min_log_in_prep_heap;
656   }
657 
658   uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
659       vset, &cfd_to_flush, memtables_to_flush);
660 
661   if (min_log_refed_by_mem != 0 &&
662       min_log_refed_by_mem < min_log_number_to_keep) {
663     min_log_number_to_keep = min_log_refed_by_mem;
664   }
665   return min_log_number_to_keep;
666 }
667 
CleanupFilesAfterRecovery()668 Status DBImpl::CleanupFilesAfterRecovery() {
669   mutex_.AssertHeld();
670   std::vector<std::string> paths;
671   paths.push_back(dbname_);
672   for (const auto& db_path : immutable_db_options_.db_paths) {
673     paths.push_back(db_path.path);
674   }
675   for (const auto* cfd : *versions_->GetColumnFamilySet()) {
676     for (const auto& cf_path : cfd->ioptions()->cf_paths) {
677       paths.push_back(cf_path.path);
678     }
679   }
680   // Dedup paths
681   std::sort(paths.begin(), paths.end());
682   paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
683 
684   uint64_t next_file_number = versions_->current_next_file_number();
685   uint64_t largest_file_number = next_file_number;
686   std::set<std::string> files_to_delete;
687   for (const auto& path : paths) {
688     std::vector<std::string> files;
689     env_->GetChildren(path, &files);
690     for (const auto& fname : files) {
691       uint64_t number = 0;
692       FileType type;
693       if (!ParseFileName(fname, &number, &type)) {
694         continue;
695       }
696       const std::string normalized_fpath = NormalizePath(path + fname);
697       largest_file_number = std::max(largest_file_number, number);
698       if (type == kTableFile && number >= next_file_number &&
699           files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
700         files_to_delete.insert(normalized_fpath);
701       }
702     }
703   }
704   if (largest_file_number > next_file_number) {
705     versions_->next_file_number_.store(largest_file_number + 1);
706   }
707   mutex_.Unlock();
708   Status s;
709   for (const auto& fname : files_to_delete) {
710     s = env_->DeleteFile(fname);
711     if (!s.ok()) {
712       break;
713     }
714   }
715   mutex_.Lock();
716   return s;
717 }
718 
719 }  // namespace ROCKSDB_NAMESPACE
720