1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <cinttypes>
12 #include <set>
13 #include <unordered_set>
14 #include "db/event_helpers.h"
15 #include "db/memtable_list.h"
16 #include "file/file_util.h"
17 #include "file/filename.h"
18 #include "file/sst_file_manager_impl.h"
19 #include "util/autovector.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
MinLogNumberToKeep()23 uint64_t DBImpl::MinLogNumberToKeep() {
24 if (allow_2pc()) {
25 return versions_->min_log_number_to_keep_2pc();
26 } else {
27 return versions_->MinLogNumberWithUnflushedData();
28 }
29 }
30
MinObsoleteSstNumberToKeep()31 uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
32 mutex_.AssertHeld();
33 if (!pending_outputs_.empty()) {
34 return *pending_outputs_.begin();
35 }
36 return std::numeric_limits<uint64_t>::max();
37 }
38
39 // * Returns the list of live files in 'sst_live'
40 // If it's doing full scan:
41 // * Returns the list of all files in the filesystem in
42 // 'full_scan_candidate_files'.
43 // Otherwise, gets obsolete files from VersionSet.
44 // no_full_scan = true -- never do the full scan using GetChildren()
45 // force = false -- don't force the full scan, except every
46 // mutable_db_options_.delete_obsolete_files_period_micros
47 // force = true -- force the full scan
FindObsoleteFiles(JobContext * job_context,bool force,bool no_full_scan)48 void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
49 bool no_full_scan) {
50 mutex_.AssertHeld();
51
52 // if deletion is disabled, do nothing
53 if (disable_delete_obsolete_files_ > 0) {
54 return;
55 }
56
57 bool doing_the_full_scan = false;
58
59 // logic for figuring out if we're doing the full scan
60 if (no_full_scan) {
61 doing_the_full_scan = false;
62 } else if (force ||
63 mutable_db_options_.delete_obsolete_files_period_micros == 0) {
64 doing_the_full_scan = true;
65 } else {
66 const uint64_t now_micros = env_->NowMicros();
67 if ((delete_obsolete_files_last_run_ +
68 mutable_db_options_.delete_obsolete_files_period_micros) <
69 now_micros) {
70 doing_the_full_scan = true;
71 delete_obsolete_files_last_run_ = now_micros;
72 }
73 }
74
75 // don't delete files that might be currently written to from compaction
76 // threads
77 // Since job_context->min_pending_output is set, until file scan finishes,
78 // mutex_ cannot be released. Otherwise, we might see no min_pending_output
79 // here but later find newer generated unfinalized files while scanning.
80 if (!pending_outputs_.empty()) {
81 job_context->min_pending_output = *pending_outputs_.begin();
82 } else {
83 // delete all of them
84 job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
85 }
86
87 // Get obsolete files. This function will also update the list of
88 // pending files in VersionSet().
89 versions_->GetObsoleteFiles(&job_context->sst_delete_files,
90 &job_context->manifest_delete_files,
91 job_context->min_pending_output);
92
93 // Mark the elements in job_context->sst_delete_files as grabbedForPurge
94 // so that other threads calling FindObsoleteFiles with full_scan=true
95 // will not add these files to candidate list for purge.
96 for (const auto& sst_to_del : job_context->sst_delete_files) {
97 MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
98 }
99
100 // store the current filenum, lognum, etc
101 job_context->manifest_file_number = versions_->manifest_file_number();
102 job_context->pending_manifest_file_number =
103 versions_->pending_manifest_file_number();
104 job_context->log_number = MinLogNumberToKeep();
105 job_context->prev_log_number = versions_->prev_log_number();
106
107 versions_->AddLiveFiles(&job_context->sst_live);
108 if (doing_the_full_scan) {
109 InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
110 dbname_);
111 std::set<std::string> paths;
112 for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
113 path_id++) {
114 paths.insert(immutable_db_options_.db_paths[path_id].path);
115 }
116
117 // Note that if cf_paths is not specified in the ColumnFamilyOptions
118 // of a particular column family, we use db_paths as the cf_paths
119 // setting. Hence, there can be multiple duplicates of files from db_paths
120 // in the following code. The duplicate are removed while identifying
121 // unique files in PurgeObsoleteFiles.
122 for (auto cfd : *versions_->GetColumnFamilySet()) {
123 for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
124 path_id++) {
125 auto& path = cfd->ioptions()->cf_paths[path_id].path;
126
127 if (paths.find(path) == paths.end()) {
128 paths.insert(path);
129 }
130 }
131 }
132
133 for (auto& path : paths) {
134 // set of all files in the directory. We'll exclude files that are still
135 // alive in the subsequent processings.
136 std::vector<std::string> files;
137 env_->GetChildren(path, &files); // Ignore errors
138 for (const std::string& file : files) {
139 uint64_t number;
140 FileType type;
141 // 1. If we cannot parse the file name, we skip;
142 // 2. If the file with file_number equals number has already been
143 // grabbed for purge by another compaction job, or it has already been
144 // schedule for purge, we also skip it if we
145 // are doing full scan in order to avoid double deletion of the same
146 // file under race conditions. See
147 // https://github.com/facebook/rocksdb/issues/3573
148 if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
149 !ShouldPurge(number)) {
150 continue;
151 }
152
153 // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
154 job_context->full_scan_candidate_files.emplace_back("/" + file, path);
155 }
156 }
157
158 // Add log files in wal_dir
159 if (immutable_db_options_.wal_dir != dbname_) {
160 std::vector<std::string> log_files;
161 env_->GetChildren(immutable_db_options_.wal_dir,
162 &log_files); // Ignore errors
163 for (const std::string& log_file : log_files) {
164 job_context->full_scan_candidate_files.emplace_back(
165 log_file, immutable_db_options_.wal_dir);
166 }
167 }
168 // Add info log files in db_log_dir
169 if (!immutable_db_options_.db_log_dir.empty() &&
170 immutable_db_options_.db_log_dir != dbname_) {
171 std::vector<std::string> info_log_files;
172 // Ignore errors
173 env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
174 for (std::string& log_file : info_log_files) {
175 job_context->full_scan_candidate_files.emplace_back(
176 log_file, immutable_db_options_.db_log_dir);
177 }
178 }
179 }
180
181 // logs_ is empty when called during recovery, in which case there can't yet
182 // be any tracked obsolete logs
183 if (!alive_log_files_.empty() && !logs_.empty()) {
184 uint64_t min_log_number = job_context->log_number;
185 size_t num_alive_log_files = alive_log_files_.size();
186 // find newly obsoleted log files
187 while (alive_log_files_.begin()->number < min_log_number) {
188 auto& earliest = *alive_log_files_.begin();
189 if (immutable_db_options_.recycle_log_file_num >
190 log_recycle_files_.size()) {
191 ROCKS_LOG_INFO(immutable_db_options_.info_log,
192 "adding log %" PRIu64 " to recycle list\n",
193 earliest.number);
194 log_recycle_files_.push_back(earliest.number);
195 } else {
196 job_context->log_delete_files.push_back(earliest.number);
197 }
198 if (job_context->size_log_to_delete == 0) {
199 job_context->prev_total_log_size = total_log_size_;
200 job_context->num_alive_log_files = num_alive_log_files;
201 }
202 job_context->size_log_to_delete += earliest.size;
203 total_log_size_ -= earliest.size;
204 if (two_write_queues_) {
205 log_write_mutex_.Lock();
206 }
207 alive_log_files_.pop_front();
208 if (two_write_queues_) {
209 log_write_mutex_.Unlock();
210 }
211 // Current log should always stay alive since it can't have
212 // number < MinLogNumber().
213 assert(alive_log_files_.size());
214 }
215 while (!logs_.empty() && logs_.front().number < min_log_number) {
216 auto& log = logs_.front();
217 if (log.getting_synced) {
218 log_sync_cv_.Wait();
219 // logs_ could have changed while we were waiting.
220 continue;
221 }
222 logs_to_free_.push_back(log.ReleaseWriter());
223 {
224 InstrumentedMutexLock wl(&log_write_mutex_);
225 logs_.pop_front();
226 }
227 }
228 // Current log cannot be obsolete.
229 assert(!logs_.empty());
230 }
231
232 // We're just cleaning up for DB::Write().
233 assert(job_context->logs_to_free.empty());
234 job_context->logs_to_free = logs_to_free_;
235 job_context->log_recycle_files.assign(log_recycle_files_.begin(),
236 log_recycle_files_.end());
237 if (job_context->HaveSomethingToDelete()) {
238 ++pending_purge_obsolete_files_;
239 }
240 logs_to_free_.clear();
241 }
242
243 namespace {
CompareCandidateFile(const JobContext::CandidateFileInfo & first,const JobContext::CandidateFileInfo & second)244 bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
245 const JobContext::CandidateFileInfo& second) {
246 if (first.file_name > second.file_name) {
247 return true;
248 } else if (first.file_name < second.file_name) {
249 return false;
250 } else {
251 return (first.file_path > second.file_path);
252 }
253 }
254 }; // namespace
255
256 // Delete obsolete files and log status and information of file deletion
DeleteObsoleteFileImpl(int job_id,const std::string & fname,const std::string & path_to_sync,FileType type,uint64_t number)257 void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
258 const std::string& path_to_sync,
259 FileType type, uint64_t number) {
260 Status file_deletion_status;
261 if (type == kTableFile || type == kLogFile) {
262 file_deletion_status =
263 DeleteDBFile(&immutable_db_options_, fname, path_to_sync,
264 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path_);
265 } else {
266 file_deletion_status = env_->DeleteFile(fname);
267 }
268 TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
269 &file_deletion_status);
270 if (file_deletion_status.ok()) {
271 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
272 "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
273 fname.c_str(), type, number,
274 file_deletion_status.ToString().c_str());
275 } else if (env_->FileExists(fname).IsNotFound()) {
276 ROCKS_LOG_INFO(
277 immutable_db_options_.info_log,
278 "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
279 " -- %s\n",
280 job_id, fname.c_str(), type, number,
281 file_deletion_status.ToString().c_str());
282 } else {
283 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
284 "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
285 job_id, fname.c_str(), type, number,
286 file_deletion_status.ToString().c_str());
287 }
288 if (type == kTableFile) {
289 EventHelpers::LogAndNotifyTableFileDeletion(
290 &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
291 immutable_db_options_.listeners);
292 }
293 }
294
295 // Diffs the files listed in filenames and those that do not
296 // belong to live files are possibly removed. Also, removes all the
297 // files in sst_delete_files and log_delete_files.
298 // It is not necessary to hold the mutex when invoking this method.
PurgeObsoleteFiles(JobContext & state,bool schedule_only)299 void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
300 TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
301 // we'd better have sth to delete
302 assert(state.HaveSomethingToDelete());
303
304 // FindObsoleteFiles() should've populated this so nonzero
305 assert(state.manifest_file_number != 0);
306
307 // Now, convert live list to an unordered map, WITHOUT mutex held;
308 // set is slow.
309 std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
310 for (const FileDescriptor& fd : state.sst_live) {
311 sst_live_map[fd.GetNumber()] = &fd;
312 }
313 std::unordered_set<uint64_t> log_recycle_files_set(
314 state.log_recycle_files.begin(), state.log_recycle_files.end());
315
316 auto candidate_files = state.full_scan_candidate_files;
317 candidate_files.reserve(
318 candidate_files.size() + state.sst_delete_files.size() +
319 state.log_delete_files.size() + state.manifest_delete_files.size());
320 // We may ignore the dbname when generating the file names.
321 for (auto& file : state.sst_delete_files) {
322 candidate_files.emplace_back(
323 MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
324 if (file.metadata->table_reader_handle) {
325 table_cache_->Release(file.metadata->table_reader_handle);
326 }
327 file.DeleteMetadata();
328 }
329
330 for (auto file_num : state.log_delete_files) {
331 if (file_num > 0) {
332 candidate_files.emplace_back(LogFileName(file_num),
333 immutable_db_options_.wal_dir);
334 }
335 }
336 for (const auto& filename : state.manifest_delete_files) {
337 candidate_files.emplace_back(filename, dbname_);
338 }
339
340 // dedup state.candidate_files so we don't try to delete the same
341 // file twice
342 std::sort(candidate_files.begin(), candidate_files.end(),
343 CompareCandidateFile);
344 candidate_files.erase(
345 std::unique(candidate_files.begin(), candidate_files.end()),
346 candidate_files.end());
347
348 if (state.prev_total_log_size > 0) {
349 ROCKS_LOG_INFO(immutable_db_options_.info_log,
350 "[JOB %d] Try to delete WAL files size %" PRIu64
351 ", prev total WAL file size %" PRIu64
352 ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
353 state.job_id, state.size_log_to_delete,
354 state.prev_total_log_size, state.num_alive_log_files);
355 }
356
357 std::vector<std::string> old_info_log_files;
358 InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
359 dbname_);
360
361 // File numbers of most recent two OPTIONS file in candidate_files (found in
362 // previos FindObsoleteFiles(full_scan=true))
363 // At this point, there must not be any duplicate file numbers in
364 // candidate_files.
365 uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
366 uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();
367 for (const auto& candidate_file : candidate_files) {
368 const std::string& fname = candidate_file.file_name;
369 uint64_t number;
370 FileType type;
371 if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
372 type != kOptionsFile) {
373 continue;
374 }
375 if (number > optsfile_num1) {
376 optsfile_num2 = optsfile_num1;
377 optsfile_num1 = number;
378 } else if (number > optsfile_num2) {
379 optsfile_num2 = number;
380 }
381 }
382
383 // Close WALs before trying to delete them.
384 for (const auto w : state.logs_to_free) {
385 // TODO: maybe check the return value of Close.
386 w->Close();
387 }
388
389 bool own_files = OwnTablesAndLogs();
390 std::unordered_set<uint64_t> files_to_del;
391 for (const auto& candidate_file : candidate_files) {
392 const std::string& to_delete = candidate_file.file_name;
393 uint64_t number;
394 FileType type;
395 // Ignore file if we cannot recognize it.
396 if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
397 continue;
398 }
399
400 bool keep = true;
401 switch (type) {
402 case kLogFile:
403 keep = ((number >= state.log_number) ||
404 (number == state.prev_log_number) ||
405 (log_recycle_files_set.find(number) !=
406 log_recycle_files_set.end()));
407 break;
408 case kDescriptorFile:
409 // Keep my manifest file, and any newer incarnations'
410 // (can happen during manifest roll)
411 keep = (number >= state.manifest_file_number);
412 break;
413 case kTableFile:
414 // If the second condition is not there, this makes
415 // DontDeletePendingOutputs fail
416 keep = (sst_live_map.find(number) != sst_live_map.end()) ||
417 number >= state.min_pending_output;
418 if (!keep) {
419 files_to_del.insert(number);
420 }
421 break;
422 case kTempFile:
423 // Any temp files that are currently being written to must
424 // be recorded in pending_outputs_, which is inserted into "live".
425 // Also, SetCurrentFile creates a temp file when writing out new
426 // manifest, which is equal to state.pending_manifest_file_number. We
427 // should not delete that file
428 //
429 // TODO(yhchiang): carefully modify the third condition to safely
430 // remove the temp options files.
431 keep = (sst_live_map.find(number) != sst_live_map.end()) ||
432 (number == state.pending_manifest_file_number) ||
433 (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
434 break;
435 case kInfoLogFile:
436 keep = true;
437 if (number != 0) {
438 old_info_log_files.push_back(to_delete);
439 }
440 break;
441 case kOptionsFile:
442 keep = (number >= optsfile_num2);
443 TEST_SYNC_POINT_CALLBACK(
444 "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:1",
445 reinterpret_cast<void*>(&number));
446 TEST_SYNC_POINT_CALLBACK(
447 "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:2",
448 reinterpret_cast<void*>(&keep));
449 break;
450 case kCurrentFile:
451 case kDBLockFile:
452 case kIdentityFile:
453 case kMetaDatabase:
454 case kBlobFile:
455 keep = true;
456 break;
457 }
458
459 if (keep) {
460 continue;
461 }
462
463 std::string fname;
464 std::string dir_to_sync;
465 if (type == kTableFile) {
466 // evict from cache
467 TableCache::Evict(table_cache_.get(), number);
468 fname = MakeTableFileName(candidate_file.file_path, number);
469 dir_to_sync = candidate_file.file_path;
470 } else {
471 dir_to_sync =
472 (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;
473 fname = dir_to_sync +
474 ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
475 (!to_delete.empty() && to_delete.front() == '/')
476 ? ""
477 : "/") +
478 to_delete;
479 }
480
481 #ifndef ROCKSDB_LITE
482 if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
483 immutable_db_options_.wal_size_limit_mb > 0)) {
484 wal_manager_.ArchiveWALFile(fname, number);
485 continue;
486 }
487 #endif // !ROCKSDB_LITE
488
489 // If I do not own these files, e.g. secondary instance with max_open_files
490 // = -1, then no need to delete or schedule delete these files since they
491 // will be removed by their owner, e.g. the primary instance.
492 if (!own_files) {
493 continue;
494 }
495 Status file_deletion_status;
496 if (schedule_only) {
497 InstrumentedMutexLock guard_lock(&mutex_);
498 SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
499 } else {
500 DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
501 }
502 }
503
504 {
505 // After purging obsolete files, remove them from files_grabbed_for_purge_.
506 InstrumentedMutexLock guard_lock(&mutex_);
507 autovector<uint64_t> to_be_removed;
508 for (auto fn : files_grabbed_for_purge_) {
509 if (files_to_del.count(fn) != 0) {
510 to_be_removed.emplace_back(fn);
511 }
512 }
513 for (auto fn : to_be_removed) {
514 files_grabbed_for_purge_.erase(fn);
515 }
516 }
517
518 // Delete old info log files.
519 size_t old_info_log_file_count = old_info_log_files.size();
520 if (old_info_log_file_count != 0 &&
521 old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
522 std::sort(old_info_log_files.begin(), old_info_log_files.end());
523 size_t end =
524 old_info_log_file_count - immutable_db_options_.keep_log_file_num;
525 for (unsigned int i = 0; i <= end; i++) {
526 std::string& to_delete = old_info_log_files.at(i);
527 std::string full_path_to_delete =
528 (immutable_db_options_.db_log_dir.empty()
529 ? dbname_
530 : immutable_db_options_.db_log_dir) +
531 "/" + to_delete;
532 ROCKS_LOG_INFO(immutable_db_options_.info_log,
533 "[JOB %d] Delete info log file %s\n", state.job_id,
534 full_path_to_delete.c_str());
535 Status s = env_->DeleteFile(full_path_to_delete);
536 if (!s.ok()) {
537 if (env_->FileExists(full_path_to_delete).IsNotFound()) {
538 ROCKS_LOG_INFO(
539 immutable_db_options_.info_log,
540 "[JOB %d] Tried to delete non-existing info log file %s FAILED "
541 "-- %s\n",
542 state.job_id, to_delete.c_str(), s.ToString().c_str());
543 } else {
544 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
545 "[JOB %d] Delete info log file %s FAILED -- %s\n",
546 state.job_id, to_delete.c_str(),
547 s.ToString().c_str());
548 }
549 }
550 }
551 }
552 #ifndef ROCKSDB_LITE
553 wal_manager_.PurgeObsoleteWALFiles();
554 #endif // ROCKSDB_LITE
555 LogFlush(immutable_db_options_.info_log);
556 InstrumentedMutexLock l(&mutex_);
557 --pending_purge_obsolete_files_;
558 assert(pending_purge_obsolete_files_ >= 0);
559 if (pending_purge_obsolete_files_ == 0) {
560 bg_cv_.SignalAll();
561 }
562 TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
563 }
564
DeleteObsoleteFiles()565 void DBImpl::DeleteObsoleteFiles() {
566 mutex_.AssertHeld();
567 JobContext job_context(next_job_id_.fetch_add(1));
568 FindObsoleteFiles(&job_context, true);
569
570 mutex_.Unlock();
571 if (job_context.HaveSomethingToDelete()) {
572 PurgeObsoleteFiles(job_context);
573 }
574 job_context.Clean();
575 mutex_.Lock();
576 }
577
FindMinPrepLogReferencedByMemTable(VersionSet * vset,const ColumnFamilyData * cfd_to_flush,const autovector<MemTable * > & memtables_to_flush)578 uint64_t FindMinPrepLogReferencedByMemTable(
579 VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
580 const autovector<MemTable*>& memtables_to_flush) {
581 uint64_t min_log = 0;
582
583 // we must look through the memtables for two phase transactions
584 // that have been committed but not yet flushed
585 for (auto loop_cfd : *vset->GetColumnFamilySet()) {
586 if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
587 continue;
588 }
589
590 auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
591 memtables_to_flush);
592
593 if (log > 0 && (min_log == 0 || log < min_log)) {
594 min_log = log;
595 }
596
597 log = loop_cfd->mem()->GetMinLogContainingPrepSection();
598
599 if (log > 0 && (min_log == 0 || log < min_log)) {
600 min_log = log;
601 }
602 }
603
604 return min_log;
605 }
606
PrecomputeMinLogNumberToKeep(VersionSet * vset,const ColumnFamilyData & cfd_to_flush,autovector<VersionEdit * > edit_list,const autovector<MemTable * > & memtables_to_flush,LogsWithPrepTracker * prep_tracker)607 uint64_t PrecomputeMinLogNumberToKeep(
608 VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
609 autovector<VersionEdit*> edit_list,
610 const autovector<MemTable*>& memtables_to_flush,
611 LogsWithPrepTracker* prep_tracker) {
612 assert(vset != nullptr);
613 assert(prep_tracker != nullptr);
614 // Calculate updated min_log_number_to_keep
615 // Since the function should only be called in 2pc mode, log number in
616 // the version edit should be sufficient.
617
618 // Precompute the min log number containing unflushed data for the column
619 // family being flushed (`cfd_to_flush`).
620 uint64_t cf_min_log_number_to_keep = 0;
621 for (auto& e : edit_list) {
622 if (e->HasLogNumber()) {
623 cf_min_log_number_to_keep =
624 std::max(cf_min_log_number_to_keep, e->GetLogNumber());
625 }
626 }
627 if (cf_min_log_number_to_keep == 0) {
628 // No version edit contains information on log number. The log number
629 // for this column family should stay the same as it is.
630 cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber();
631 }
632
633 // Get min log number containing unflushed data for other column families.
634 uint64_t min_log_number_to_keep =
635 vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush);
636 if (cf_min_log_number_to_keep != 0) {
637 min_log_number_to_keep =
638 std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
639 }
640
641 // if are 2pc we must consider logs containing prepared
642 // sections of outstanding transactions.
643 //
644 // We must check min logs with outstanding prep before we check
645 // logs references by memtables because a log referenced by the
646 // first data structure could transition to the second under us.
647 //
648 // TODO: iterating over all column families under db mutex.
649 // should find more optimal solution
650 auto min_log_in_prep_heap =
651 prep_tracker->FindMinLogContainingOutstandingPrep();
652
653 if (min_log_in_prep_heap != 0 &&
654 min_log_in_prep_heap < min_log_number_to_keep) {
655 min_log_number_to_keep = min_log_in_prep_heap;
656 }
657
658 uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
659 vset, &cfd_to_flush, memtables_to_flush);
660
661 if (min_log_refed_by_mem != 0 &&
662 min_log_refed_by_mem < min_log_number_to_keep) {
663 min_log_number_to_keep = min_log_refed_by_mem;
664 }
665 return min_log_number_to_keep;
666 }
667
CleanupFilesAfterRecovery()668 Status DBImpl::CleanupFilesAfterRecovery() {
669 mutex_.AssertHeld();
670 std::vector<std::string> paths;
671 paths.push_back(dbname_);
672 for (const auto& db_path : immutable_db_options_.db_paths) {
673 paths.push_back(db_path.path);
674 }
675 for (const auto* cfd : *versions_->GetColumnFamilySet()) {
676 for (const auto& cf_path : cfd->ioptions()->cf_paths) {
677 paths.push_back(cf_path.path);
678 }
679 }
680 // Dedup paths
681 std::sort(paths.begin(), paths.end());
682 paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
683
684 uint64_t next_file_number = versions_->current_next_file_number();
685 uint64_t largest_file_number = next_file_number;
686 std::set<std::string> files_to_delete;
687 for (const auto& path : paths) {
688 std::vector<std::string> files;
689 env_->GetChildren(path, &files);
690 for (const auto& fname : files) {
691 uint64_t number = 0;
692 FileType type;
693 if (!ParseFileName(fname, &number, &type)) {
694 continue;
695 }
696 const std::string normalized_fpath = NormalizePath(path + fname);
697 largest_file_number = std::max(largest_file_number, number);
698 if (type == kTableFile && number >= next_file_number &&
699 files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
700 files_to_delete.insert(normalized_fpath);
701 }
702 }
703 }
704 if (largest_file_number > next_file_number) {
705 versions_->next_file_number_.store(largest_file_number + 1);
706 }
707 mutex_.Unlock();
708 Status s;
709 for (const auto& fname : files_to_delete) {
710 s = env_->DeleteFile(fname);
711 if (!s.ok()) {
712 break;
713 }
714 }
715 mutex_.Lock();
716 return s;
717 }
718
719 } // namespace ROCKSDB_NAMESPACE
720