1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <cinttypes>
12
13 #include "db/builder.h"
14 #include "db/error_handler.h"
15 #include "db/event_helpers.h"
16 #include "file/sst_file_manager_impl.h"
17 #include "monitoring/iostats_context_imp.h"
18 #include "monitoring/perf_context_imp.h"
19 #include "monitoring/thread_status_updater.h"
20 #include "monitoring/thread_status_util.h"
21 #include "test_util/sync_point.h"
22 #include "util/cast_util.h"
23 #include "util/concurrent_task_limiter_impl.h"
24
25 namespace ROCKSDB_NAMESPACE {
26
27 bool DBImpl::EnoughRoomForCompaction(
28 ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
29 bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
30 // Check if we have enough room to do the compaction
31 bool enough_room = true;
32 #ifndef ROCKSDB_LITE
33 auto sfm = static_cast<SstFileManagerImpl*>(
34 immutable_db_options_.sst_file_manager.get());
35 if (sfm) {
36 // Pass the current bg_error_ to SFM so it can decide what checks to
37 // perform. If this DB instance hasn't seen any error yet, the SFM can be
38 // optimistic and not do disk space checks
39 enough_room =
40 sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
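// If the SFM reports there is enough room, it has also accounted for this
// compaction's estimated output, so remember that here; the caller uses
// sfm_reserved_compact_space to notify the SFM when the compaction finishes
// (see OnCompactionCompletion() in CompactFilesImpl()).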
41 if (enough_room) {
42 *sfm_reserved_compact_space = true;
43 }
44 }
45 #else
46 (void)cfd;
47 (void)inputs;
48 (void)sfm_reserved_compact_space;
49 #endif // ROCKSDB_LITE
50 if (!enough_room) {
51 // Just in case tests want to change the value of enough_room
52 TEST_SYNC_POINT_CALLBACK(
53 "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
54 ROCKS_LOG_BUFFER(log_buffer,
55 "Cancelled compaction because not enough room");
56 RecordTick(stats_, COMPACTION_CANCELLED, 1);
57 }
58 return enough_room;
59 }
60
61 bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
62 std::unique_ptr<TaskLimiterToken>* token,
63 LogBuffer* log_buffer) {
64 assert(*token == nullptr);
65 auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
66 cfd->ioptions()->compaction_thread_limiter.get());
67 if (limiter == nullptr) {
68 return true;
69 }
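// Try to take a slot from the per-CF compaction thread limiter. The returned
// token holds the slot until it is destroyed.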
70 *token = limiter->GetToken(force);
71 if (*token != nullptr) {
72 ROCKS_LOG_BUFFER(log_buffer,
73 "Thread limiter [%s] increase [%s] compaction task, "
74 "force: %s, tasks after: %d",
75 limiter->GetName().c_str(), cfd->GetName().c_str(),
76 force ? "true" : "false", limiter->GetOutstandingTask());
77 return true;
78 }
79 return false;
80 }
81
82 IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
83 TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
84 mutex_.AssertHeld();
85 autovector<log::Writer*, 1> logs_to_sync;
86 uint64_t current_log_number = logfile_number_;
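// If another thread is already syncing the oldest closed log, wait for it to
// finish so that no log is synced by two threads at the same time.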
87 while (logs_.front().number < current_log_number &&
88 logs_.front().getting_synced) {
89 log_sync_cv_.Wait();
90 }
91 for (auto it = logs_.begin();
92 it != logs_.end() && it->number < current_log_number; ++it) {
93 auto& log = *it;
94 assert(!log.getting_synced);
95 log.getting_synced = true;
96 logs_to_sync.push_back(log.writer);
97 }
98
99 IOStatus io_s;
100 if (!logs_to_sync.empty()) {
101 mutex_.Unlock();
102
103 for (log::Writer* log : logs_to_sync) {
104 ROCKS_LOG_INFO(immutable_db_options_.info_log,
105 "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
106 log->get_log_number());
107 io_s = log->file()->Sync(immutable_db_options_.use_fsync);
108 if (!io_s.ok()) {
109 break;
110 }
111
112 if (immutable_db_options_.recycle_log_file_num > 0) {
113 io_s = log->Close();
114 if (!io_s.ok()) {
115 break;
116 }
117 }
118 }
119 if (io_s.ok()) {
120 io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
121 }
122
123 mutex_.Lock();
124
125 // "number <= current_log_number - 1" is equivalent to
126 // "number < current_log_number".
127 MarkLogsSynced(current_log_number - 1, true, io_s);
128 if (!io_s.ok()) {
129 error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
130 TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
131 return io_s;
132 }
133 }
134 return io_s;
135 }
136
137 Status DBImpl::FlushMemTableToOutputFile(
138 ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
139 bool* made_progress, JobContext* job_context,
140 SuperVersionContext* superversion_context,
141 std::vector<SequenceNumber>& snapshot_seqs,
142 SequenceNumber earliest_write_conflict_snapshot,
143 SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
144 Env::Priority thread_pri) {
145 mutex_.AssertHeld();
146 assert(cfd->imm()->NumNotFlushed() != 0);
147 assert(cfd->imm()->IsFlushPending());
148
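// Build a flush job for this single column family. Unlike the atomic-flush
// path below, the job syncs its output directory and writes the MANIFEST
// itself.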
149 FlushJob flush_job(
150 dbname_, cfd, immutable_db_options_, mutable_cf_options,
151 nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(),
152 &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
153 snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
154 GetDataDir(cfd, 0U),
155 GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
156 &event_logger_, mutable_cf_options.report_bg_io_stats,
157 true /* sync_output_directory */, true /* write_manifest */, thread_pri);
158 FileMetaData file_meta;
159
160 TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
161 flush_job.PickMemTable();
162 TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
163
164 #ifndef ROCKSDB_LITE
165 // may temporarily unlock and lock the mutex.
166 NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
167 #endif // ROCKSDB_LITE
168
169 Status s;
170 IOStatus io_s;
171 if (logfile_number_ > 0 &&
172 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
173 // If there is more than one column family, we need to make sure that
174 // all the log files except the most recent one are synced. Otherwise if
175 // the host crashes after flushing and before WAL is persistent, the
176 // flushed SST may contain data from write batches whose updates to
177 // other column families are missing.
178 // SyncClosedLogs() may unlock and re-lock the db_mutex.
179 io_s = SyncClosedLogs(job_context);
180 s = io_s;
181 } else {
182 TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
183 }
184
185 // Within flush_job.Run, rocksdb may call event listener to notify
186 // file creation and deletion.
187 //
188 // Note that flush_job.Run will unlock and lock the db_mutex,
189 // and EventListener callback will be called when the db_mutex
190 // is unlocked by the current thread.
191 if (s.ok()) {
192 s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
193 } else {
194 flush_job.Cancel();
195 }
196 io_s = flush_job.io_status();
197
198 if (s.ok()) {
199 InstallSuperVersionAndScheduleWork(cfd, superversion_context,
200 mutable_cf_options);
201 if (made_progress) {
202 *made_progress = true;
203 }
204 VersionStorageInfo::LevelSummaryStorage tmp;
205 ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
206 cfd->GetName().c_str(),
207 cfd->current()->storage_info()->LevelSummary(&tmp));
208 }
209
210 if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
211 if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
212 !io_s.IsColumnFamilyDropped()) {
213 error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
214 } else {
215 Status new_bg_error = s;
216 error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
217 }
218 }
219 if (s.ok()) {
220 #ifndef ROCKSDB_LITE
221 // may temporarily unlock and lock the mutex.
222 NotifyOnFlushCompleted(cfd, mutable_cf_options,
223 flush_job.GetCommittedFlushJobsInfo());
224 auto sfm = static_cast<SstFileManagerImpl*>(
225 immutable_db_options_.sst_file_manager.get());
226 if (sfm) {
227 // Notify sst_file_manager that a new file was added
228 std::string file_path = MakeTableFileName(
229 cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
230 sfm->OnAddFile(file_path);
231 if (sfm->IsMaxAllowedSpaceReached()) {
232 Status new_bg_error =
233 Status::SpaceLimit("Max allowed space was reached");
234 TEST_SYNC_POINT_CALLBACK(
235 "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
236 &new_bg_error);
237 error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
238 }
239 }
240 #endif // ROCKSDB_LITE
241 }
242 TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
243 return s;
244 }
245
246 Status DBImpl::FlushMemTablesToOutputFiles(
247 const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
248 JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
249 if (immutable_db_options_.atomic_flush) {
250 return AtomicFlushMemTablesToOutputFiles(
251 bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
252 }
253 std::vector<SequenceNumber> snapshot_seqs;
254 SequenceNumber earliest_write_conflict_snapshot;
255 SnapshotChecker* snapshot_checker;
256 GetSnapshotContext(job_context, &snapshot_seqs,
257 &earliest_write_conflict_snapshot, &snapshot_checker);
258 Status status;
259 for (auto& arg : bg_flush_args) {
260 ColumnFamilyData* cfd = arg.cfd_;
261 MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
262 SuperVersionContext* superversion_context = arg.superversion_context_;
263 Status s = FlushMemTableToOutputFile(
264 cfd, mutable_cf_options, made_progress, job_context,
265 superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
266 snapshot_checker, log_buffer, thread_pri);
267 if (!s.ok()) {
268 status = s;
269 if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
270 // At this point, DB is not shutting down, nor is cfd dropped.
271 // Something is wrong, thus we break out of the loop.
272 break;
273 }
274 }
275 }
276 return status;
277 }
278
279 /*
280 * Atomically flushes multiple column families.
281 *
282 * For each column family, all memtables with ID smaller than or equal to the
283 * ID specified in bg_flush_args will be flushed. Only after all column
284 * families finish flush will this function commit to MANIFEST. If any of the
285 * column families are not flushed successfully, this function does not have
286 * any side-effect on the state of the database.
287 */
288 Status DBImpl::AtomicFlushMemTablesToOutputFiles(
289 const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
290 JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
291 mutex_.AssertHeld();
292
293 autovector<ColumnFamilyData*> cfds;
294 for (const auto& arg : bg_flush_args) {
295 cfds.emplace_back(arg.cfd_);
296 }
297
298 #ifndef NDEBUG
299 for (const auto cfd : cfds) {
300 assert(cfd->imm()->NumNotFlushed() != 0);
301 assert(cfd->imm()->IsFlushPending());
302 }
303 #endif /* !NDEBUG */
304
305 std::vector<SequenceNumber> snapshot_seqs;
306 SequenceNumber earliest_write_conflict_snapshot;
307 SnapshotChecker* snapshot_checker;
308 GetSnapshotContext(job_context, &snapshot_seqs,
309 &earliest_write_conflict_snapshot, &snapshot_checker);
310
311 autovector<FSDirectory*> distinct_output_dirs;
312 autovector<std::string> distinct_output_dir_paths;
313 std::vector<std::unique_ptr<FlushJob>> jobs;
314 std::vector<MutableCFOptions> all_mutable_cf_options;
315 int num_cfs = static_cast<int>(cfds.size());
316 all_mutable_cf_options.reserve(num_cfs);
317 for (int i = 0; i < num_cfs; ++i) {
318 auto cfd = cfds[i];
319 FSDirectory* data_dir = GetDataDir(cfd, 0U);
320 const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;
321
322 // Add to distinct output directories if eligible. Use linear search. Since
323 // the number of elements in the vector is not large, performance should be
324 // tolerable.
325 bool found = false;
326 for (const auto& path : distinct_output_dir_paths) {
327 if (path == curr_path) {
328 found = true;
329 break;
330 }
331 }
332 if (!found) {
333 distinct_output_dir_paths.emplace_back(curr_path);
334 distinct_output_dirs.emplace_back(data_dir);
335 }
336
337 all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
338 const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
339 const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
340 jobs.emplace_back(new FlushJob(
341 dbname_, cfd, immutable_db_options_, mutable_cf_options,
342 max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
343 &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
344 snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
345 data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
346 stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
347 false /* sync_output_directory */, false /* write_manifest */,
348 thread_pri));
349 jobs.back()->PickMemTable();
350 }
351
352 std::vector<FileMetaData> file_meta(num_cfs);
353 Status s;
354 IOStatus io_s;
355 assert(num_cfs == static_cast<int>(jobs.size()));
356
357 #ifndef ROCKSDB_LITE
358 for (int i = 0; i != num_cfs; ++i) {
359 const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
360 // may temporarily unlock and lock the mutex.
361 NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
362 job_context->job_id);
363 }
364 #endif /* !ROCKSDB_LITE */
365
366 if (logfile_number_ > 0) {
367 // TODO (yanqin) investigate whether we should sync the closed logs for
368 // single column family case.
369 io_s = SyncClosedLogs(job_context);
370 s = io_s;
371 }
372
373 // exec_status stores the execution status of flush_jobs as
374 // <bool /* executed */, Status /* status code */>
375 autovector<std::pair<bool, Status>> exec_status;
376 autovector<IOStatus> io_status;
377 for (int i = 0; i != num_cfs; ++i) {
378 // Initially all jobs are not executed, with status OK.
379 exec_status.emplace_back(false, Status::OK());
380 io_status.emplace_back(IOStatus::OK());
381 }
382
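// Run flush jobs 1..n-1 first and job 0 last; the sync points in between let
// tests observe the state after some, but not all, jobs have finished.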
383 if (s.ok()) {
384 // TODO (yanqin): parallelize jobs with threads.
385 for (int i = 1; i != num_cfs; ++i) {
386 exec_status[i].second =
387 jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
388 exec_status[i].first = true;
389 io_status[i] = jobs[i]->io_status();
390 }
391 if (num_cfs > 1) {
392 TEST_SYNC_POINT(
393 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
394 TEST_SYNC_POINT(
395 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
396 }
397 assert(exec_status.size() > 0);
398 assert(!file_meta.empty());
399 exec_status[0].second =
400 jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
401 exec_status[0].first = true;
402 io_status[0] = jobs[0]->io_status();
403
404 Status error_status;
405 for (const auto& e : exec_status) {
406 if (!e.second.ok()) {
407 s = e.second;
408 if (!e.second.IsShutdownInProgress() &&
409 !e.second.IsColumnFamilyDropped()) {
410 // If a flush job did not return OK, and the CF is not dropped, and
411 // the DB is not shutting down, then we have to return this result to
412 // caller later.
413 error_status = e.second;
414 }
415 }
416 }
417
418 s = error_status.ok() ? s : error_status;
419 }
420
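// Fold the per-job I/O statuses into io_s, ignoring failures that are only
// due to shutdown or a dropped column family.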
421 if (io_s.ok()) {
422 IOStatus io_error = IOStatus::OK();
423 for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
424 if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() &&
425 !io_status[i].IsColumnFamilyDropped()) {
426 io_error = io_status[i];
427 }
428 }
429 io_s = io_error;
430 if (s.ok() && !io_s.ok()) {
431 s = io_s;
432 }
433 }
434
435 if (s.IsColumnFamilyDropped()) {
436 s = Status::OK();
437 }
438
439 if (s.ok() || s.IsShutdownInProgress()) {
440 // Sync on all distinct output directories.
441 for (auto dir : distinct_output_dirs) {
442 if (dir != nullptr) {
443 Status error_status = dir->Fsync(IOOptions(), nullptr);
444 if (!error_status.ok()) {
445 s = error_status;
446 break;
447 }
448 }
449 }
450 } else {
451 // Need to undo atomic flush if something went wrong, i.e. s is not OK and
452 // it is not because of CF drop.
453 // Have to cancel the flush jobs that have NOT executed because we need to
454 // unref the versions.
455 for (int i = 0; i != num_cfs; ++i) {
456 if (!exec_status[i].first) {
457 jobs[i]->Cancel();
458 }
459 }
460 for (int i = 0; i != num_cfs; ++i) {
461 if (exec_status[i].first && exec_status[i].second.ok()) {
462 auto& mems = jobs[i]->GetMemTables();
463 cfds[i]->imm()->RollbackMemtableFlush(mems,
464 file_meta[i].fd.GetNumber());
465 }
466 }
467 }
468
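// Atomic flush results must be installed in memtable-ID order, one job at a
// time. Wait until this job's memtables are the earliest ones not yet
// installed, or until background-error recovery finishes.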
469 if (s.ok()) {
470 auto wait_to_install_func = [&]() {
471 bool ready = true;
472 for (size_t i = 0; i != cfds.size(); ++i) {
473 const auto& mems = jobs[i]->GetMemTables();
474 if (cfds[i]->IsDropped()) {
475 // If the column family is dropped, then do not wait.
476 continue;
477 } else if (!mems.empty() &&
478 cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
479 // If a flush job needs to install the flush result for mems and
480 // mems[0] is not the earliest memtable, it means another thread must
481 // be installing flush results for the same column family, then the
482 // current thread needs to wait.
483 ready = false;
484 break;
485 } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
486 bg_flush_args[i].max_memtable_id_) {
487 // If a flush job does not need to install flush results, then it has
488 // to wait until all memtables up to max_memtable_id_ (inclusive) are
489 // installed.
490 ready = false;
491 break;
492 }
493 }
494 return ready;
495 };
496
497 bool resuming_from_bg_err = error_handler_.IsDBStopped();
498 while ((!error_handler_.IsDBStopped() ||
499 error_handler_.GetRecoveryError().ok()) &&
500 !wait_to_install_func()) {
501 atomic_flush_install_cv_.Wait();
502 }
503
504 s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
505 : error_handler_.GetBGError();
506 }
507
508 if (s.ok()) {
509 autovector<ColumnFamilyData*> tmp_cfds;
510 autovector<const autovector<MemTable*>*> mems_list;
511 autovector<const MutableCFOptions*> mutable_cf_options_list;
512 autovector<FileMetaData*> tmp_file_meta;
513 for (int i = 0; i != num_cfs; ++i) {
514 const auto& mems = jobs[i]->GetMemTables();
515 if (!cfds[i]->IsDropped() && !mems.empty()) {
516 tmp_cfds.emplace_back(cfds[i]);
517 mems_list.emplace_back(&mems);
518 mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
519 tmp_file_meta.emplace_back(&file_meta[i]);
520 }
521 }
522
523 s = InstallMemtableAtomicFlushResults(
524 nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
525 versions_.get(), &mutex_, tmp_file_meta,
526 &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
527 }
528
529 if (s.ok()) {
530 assert(num_cfs ==
531 static_cast<int>(job_context->superversion_contexts.size()));
532 for (int i = 0; i != num_cfs; ++i) {
533 if (cfds[i]->IsDropped()) {
534 continue;
535 }
536 InstallSuperVersionAndScheduleWork(cfds[i],
537 &job_context->superversion_contexts[i],
538 all_mutable_cf_options[i]);
539 VersionStorageInfo::LevelSummaryStorage tmp;
540 ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
541 cfds[i]->GetName().c_str(),
542 cfds[i]->current()->storage_info()->LevelSummary(&tmp));
543 }
544 if (made_progress) {
545 *made_progress = true;
546 }
547 #ifndef ROCKSDB_LITE
548 auto sfm = static_cast<SstFileManagerImpl*>(
549 immutable_db_options_.sst_file_manager.get());
550 assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
551 for (int i = 0; i != num_cfs; ++i) {
552 if (cfds[i]->IsDropped()) {
553 continue;
554 }
555 NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
556 jobs[i]->GetCommittedFlushJobsInfo());
557 if (sfm) {
558 std::string file_path = MakeTableFileName(
559 cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
560 sfm->OnAddFile(file_path);
561 if (sfm->IsMaxAllowedSpaceReached() &&
562 error_handler_.GetBGError().ok()) {
563 Status new_bg_error =
564 Status::SpaceLimit("Max allowed space was reached");
565 error_handler_.SetBGError(new_bg_error,
566 BackgroundErrorReason::kFlush);
567 }
568 }
569 }
570 #endif // ROCKSDB_LITE
571 }
572
573 // If the flush failed for a reason other than a dropped column family,
574 // record the error so the background-error handler can act on it.
575 if (!s.ok() && !s.IsColumnFamilyDropped()) {
576 if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
577 error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
578 } else {
579 Status new_bg_error = s;
580 error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
581 }
582 }
583
584 return s;
585 }
586
587 void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
588 const MutableCFOptions& mutable_cf_options,
589 int job_id) {
590 #ifndef ROCKSDB_LITE
591 if (immutable_db_options_.listeners.size() == 0U) {
592 return;
593 }
594 mutex_.AssertHeld();
595 if (shutting_down_.load(std::memory_order_acquire)) {
596 return;
597 }
598 bool triggered_writes_slowdown =
599 (cfd->current()->storage_info()->NumLevelFiles(0) >=
600 mutable_cf_options.level0_slowdown_writes_trigger);
601 bool triggered_writes_stop =
602 (cfd->current()->storage_info()->NumLevelFiles(0) >=
603 mutable_cf_options.level0_stop_writes_trigger);
604 // release lock while notifying events
605 mutex_.Unlock();
606 {
607 FlushJobInfo info{};
608 info.cf_id = cfd->GetID();
609 info.cf_name = cfd->GetName();
610 // TODO(yhchiang): make db_paths dynamic in case flush does not
611 // go to L0 in the future.
612 const uint64_t file_number = file_meta->fd.GetNumber();
613 info.file_path =
614 MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
615 info.file_number = file_number;
616 info.thread_id = env_->GetThreadID();
617 info.job_id = job_id;
618 info.triggered_writes_slowdown = triggered_writes_slowdown;
619 info.triggered_writes_stop = triggered_writes_stop;
620 info.smallest_seqno = file_meta->fd.smallest_seqno;
621 info.largest_seqno = file_meta->fd.largest_seqno;
622 info.flush_reason = cfd->GetFlushReason();
623 for (auto listener : immutable_db_options_.listeners) {
624 listener->OnFlushBegin(this, info);
625 }
626 }
627 mutex_.Lock();
628 // no need to signal bg_cv_ as it will be signaled at the end of the
629 // flush process.
630 #else
631 (void)cfd;
632 (void)file_meta;
633 (void)mutable_cf_options;
634 (void)job_id;
635 #endif // ROCKSDB_LITE
636 }
637
638 void DBImpl::NotifyOnFlushCompleted(
639 ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
640 std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
641 #ifndef ROCKSDB_LITE
642 assert(flush_jobs_info != nullptr);
643 if (immutable_db_options_.listeners.size() == 0U) {
644 return;
645 }
646 mutex_.AssertHeld();
647 if (shutting_down_.load(std::memory_order_acquire)) {
648 return;
649 }
650 bool triggered_writes_slowdown =
651 (cfd->current()->storage_info()->NumLevelFiles(0) >=
652 mutable_cf_options.level0_slowdown_writes_trigger);
653 bool triggered_writes_stop =
654 (cfd->current()->storage_info()->NumLevelFiles(0) >=
655 mutable_cf_options.level0_stop_writes_trigger);
656 // release lock while notifying events
657 mutex_.Unlock();
658 {
659 for (auto& info : *flush_jobs_info) {
660 info->triggered_writes_slowdown = triggered_writes_slowdown;
661 info->triggered_writes_stop = triggered_writes_stop;
662 for (auto listener : immutable_db_options_.listeners) {
663 listener->OnFlushCompleted(this, *info);
664 }
665 }
666 flush_jobs_info->clear();
667 }
668 mutex_.Lock();
669 // no need to signal bg_cv_ as it will be signaled at the end of the
670 // flush process.
671 #else
672 (void)cfd;
673 (void)mutable_cf_options;
674 (void)flush_jobs_info;
675 #endif // ROCKSDB_LITE
676 }
677
678 Status DBImpl::CompactRange(const CompactRangeOptions& options,
679 ColumnFamilyHandle* column_family,
680 const Slice* begin, const Slice* end) {
681 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
682 auto cfd = cfh->cfd();
683
684 if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
685 return Status::InvalidArgument("Invalid target path ID");
686 }
687
688 bool flush_needed = true;
689 if (begin != nullptr && end != nullptr) {
690 // TODO(ajkr): We could also optimize away the flush in certain cases where
691 // one/both sides of the interval are unbounded. But it requires more
692 // changes to RangesOverlapWithMemtables.
693 Range range(*begin, *end);
694 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
695 cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
696 CleanupSuperVersion(super_version);
697 }
698
699 Status s;
700 if (flush_needed) {
701 FlushOptions fo;
702 fo.allow_write_stall = options.allow_write_stall;
703 if (immutable_db_options_.atomic_flush) {
704 autovector<ColumnFamilyData*> cfds;
705 mutex_.Lock();
706 SelectColumnFamiliesForAtomicFlush(&cfds);
707 mutex_.Unlock();
708 s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
709 false /* writes_stopped */);
710 } else {
711 s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
712 false /* writes_stopped*/);
713 }
714 if (!s.ok()) {
715 LogFlush(immutable_db_options_.info_log);
716 return s;
717 }
718 }
719
720 constexpr int kInvalidLevel = -1;
721 int final_output_level = kInvalidLevel;
722 bool exclusive = options.exclusive_manual_compaction;
723 if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
724 cfd->NumberLevels() > 1) {
725 // Always compact all files together.
726 final_output_level = cfd->NumberLevels() - 1;
727 // if the bottommost level is reserved, compact to the level above it
728 if (immutable_db_options_.allow_ingest_behind) {
729 final_output_level--;
730 }
731 s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
732 final_output_level, options, begin, end, exclusive,
733 false, port::kMaxUint64);
734 } else {
735 int first_overlapped_level = kInvalidLevel;
736 int max_overlapped_level = kInvalidLevel;
737 {
738 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
739 Version* current_version = super_version->current;
740 ReadOptions ro;
741 ro.total_order_seek = true;
742 bool overlap;
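// Scan all non-empty levels and remember the first and the last level that
// overlap with the compaction range; these bound the levels compacted below.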
743 for (int level = 0;
744 level < current_version->storage_info()->num_non_empty_levels();
745 level++) {
746 overlap = true;
747 if (begin != nullptr && end != nullptr) {
748 Status status = current_version->OverlapWithLevelIterator(
749 ro, file_options_, *begin, *end, level, &overlap);
750 if (!status.ok()) {
751 overlap = current_version->storage_info()->OverlapInLevel(
752 level, begin, end);
753 }
754 } else {
755 overlap = current_version->storage_info()->OverlapInLevel(level,
756 begin, end);
757 }
758 if (overlap) {
759 if (first_overlapped_level == kInvalidLevel) {
760 first_overlapped_level = level;
761 }
762 max_overlapped_level = level;
763 }
764 }
765 CleanupSuperVersion(super_version);
766 }
767 if (s.ok() && first_overlapped_level != kInvalidLevel) {
768 // max_file_num_to_ignore can be used to filter out newly created SST
769 // files, useful for bottom level compaction in a manual compaction
770 uint64_t max_file_num_to_ignore = port::kMaxUint64;
771 uint64_t next_file_number = versions_->current_next_file_number();
772 final_output_level = max_overlapped_level;
773 int output_level;
774 for (int level = first_overlapped_level; level <= max_overlapped_level;
775 level++) {
776 // in case the compaction is universal or if we're compacting the
777 // bottom-most level, the output level will be the same as the input one.
778 // level 0 can never be the bottommost level (i.e. if all files are in
779 // level 0, we will compact to level 1)
780 if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
781 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
782 output_level = level;
783 } else if (level == max_overlapped_level && level > 0) {
784 if (options.bottommost_level_compaction ==
785 BottommostLevelCompaction::kSkip) {
786 // Skip bottommost level compaction
787 continue;
788 } else if (options.bottommost_level_compaction ==
789 BottommostLevelCompaction::kIfHaveCompactionFilter &&
790 cfd->ioptions()->compaction_filter == nullptr &&
791 cfd->ioptions()->compaction_filter_factory == nullptr) {
792 // Skip bottommost level compaction since we don't have a compaction
793 // filter
794 continue;
795 }
796 output_level = level;
797 // update max_file_num_to_ignore only for bottom level compaction
798 // because data in newly compacted files in middle levels may still
799 // need to be pushed down
800 max_file_num_to_ignore = next_file_number;
801 } else {
802 output_level = level + 1;
803 if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
804 cfd->ioptions()->level_compaction_dynamic_level_bytes &&
805 level == 0) {
806 output_level = ColumnFamilyData::kCompactToBaseLevel;
807 }
808 }
809 s = RunManualCompaction(cfd, level, output_level, options, begin, end,
810 exclusive, false, max_file_num_to_ignore);
811 if (!s.ok()) {
812 break;
813 }
814 if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
815 final_output_level = cfd->NumberLevels() - 1;
816 } else if (output_level > final_output_level) {
817 final_output_level = output_level;
818 }
819 TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
820 TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
821 }
822 }
823 }
824 if (!s.ok() || final_output_level == kInvalidLevel) {
825 LogFlush(immutable_db_options_.info_log);
826 return s;
827 }
828
829 if (options.change_level) {
830 ROCKS_LOG_INFO(immutable_db_options_.info_log,
831 "[RefitLevel] waiting for background threads to stop");
832 s = PauseBackgroundWork();
833 if (s.ok()) {
834 s = ReFitLevel(cfd, final_output_level, options.target_level);
835 }
836 ContinueBackgroundWork();
837 }
838 LogFlush(immutable_db_options_.info_log);
839
840 {
841 InstrumentedMutexLock l(&mutex_);
842 // an automatic compaction that has been scheduled might have been
843 // preempted by the manual compactions. Need to schedule it back.
844 MaybeScheduleFlushOrCompaction();
845 }
846
847 return s;
848 }
849
850 Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
851 ColumnFamilyHandle* column_family,
852 const std::vector<std::string>& input_file_names,
853 const int output_level, const int output_path_id,
854 std::vector<std::string>* const output_file_names,
855 CompactionJobInfo* compaction_job_info) {
856 #ifdef ROCKSDB_LITE
857 (void)compact_options;
858 (void)column_family;
859 (void)input_file_names;
860 (void)output_level;
861 (void)output_path_id;
862 (void)output_file_names;
863 (void)compaction_job_info;
864 // not supported in lite version
865 return Status::NotSupported("Not supported in ROCKSDB LITE");
866 #else
867 if (column_family == nullptr) {
868 return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
869 }
870
871 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
872 assert(cfd);
873
874 Status s;
875 JobContext job_context(0, true);
876 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
877 immutable_db_options_.info_log.get());
878
879 // Perform CompactFiles
880 TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
881 {
882 InstrumentedMutexLock l(&mutex_);
883
884 // This call will unlock/lock the mutex to wait for current running
885 // IngestExternalFile() calls to finish.
886 WaitForIngestFile();
887
888 // We need to get current after `WaitForIngestFile`, because
889 // `IngestExternalFile` may add files that overlap with `input_file_names`
890 auto* current = cfd->current();
891 current->Ref();
892
893 s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
894 output_file_names, output_level, output_path_id,
895 &job_context, &log_buffer, compaction_job_info);
896
897 current->Unref();
898 }
899
900 // Find and delete obsolete files
901 {
902 InstrumentedMutexLock l(&mutex_);
903 // If !s.ok(), this means that Compaction failed. In that case, we want
904 // to delete all obsolete files we might have created and we force
905 // FindObsoleteFiles(). This is because job_context does not
906 // catch all created files if compaction failed.
907 FindObsoleteFiles(&job_context, !s.ok());
908 } // release the mutex
909
910 // delete unnecessary files if any, this is done outside the mutex
911 if (job_context.HaveSomethingToClean() ||
912 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
913 // Have to flush the info logs before bg_compaction_scheduled_--
914 // because if bg_flush_scheduled_ becomes 0 and the lock is
915 // released, the destructor of DB can kick in and destroy all the
916 // states of DB so info_log might not be available after that point.
917 // It also applies to access other states that DB owns.
918 log_buffer.FlushBufferToLog();
919 if (job_context.HaveSomethingToDelete()) {
920 // no mutex is locked here. No need to Unlock() and Lock() here.
921 PurgeObsoleteFiles(job_context);
922 }
923 job_context.Clean();
924 }
925
926 return s;
927 #endif // ROCKSDB_LITE
928 }
929
930 #ifndef ROCKSDB_LITE
931 Status DBImpl::CompactFilesImpl(
932 const CompactionOptions& compact_options, ColumnFamilyData* cfd,
933 Version* version, const std::vector<std::string>& input_file_names,
934 std::vector<std::string>* const output_file_names, const int output_level,
935 int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
936 CompactionJobInfo* compaction_job_info) {
937 mutex_.AssertHeld();
938
939 if (shutting_down_.load(std::memory_order_acquire)) {
940 return Status::ShutdownInProgress();
941 }
942 if (manual_compaction_paused_.load(std::memory_order_acquire)) {
943 return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
944 }
945
946 std::unordered_set<uint64_t> input_set;
947 for (const auto& file_name : input_file_names) {
948 input_set.insert(TableFileNameToNumber(file_name));
949 }
950
951 ColumnFamilyMetaData cf_meta;
952 // TODO(yhchiang): we can use version directly here if none of the
953 // following function calls is pluggable by external developers.
954 version->GetColumnFamilyMetaData(&cf_meta);
955
956 if (output_path_id < 0) {
957 if (cfd->ioptions()->cf_paths.size() == 1U) {
958 output_path_id = 0;
959 } else {
960 return Status::NotSupported(
961 "Automatic output path selection is not "
962 "yet supported in CompactFiles()");
963 }
964 }
965
966 Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
967 &input_set, cf_meta, output_level);
968 if (!s.ok()) {
969 return s;
970 }
971
972 std::vector<CompactionInputFiles> input_files;
973 s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
974 &input_files, &input_set, version->storage_info(), compact_options);
975 if (!s.ok()) {
976 return s;
977 }
978
979 for (const auto& inputs : input_files) {
980 if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
981 return Status::Aborted(
982 "Some of the necessary compaction input "
983 "files are already being compacted");
984 }
985 }
986 bool sfm_reserved_compact_space = false;
987 // First check if we have enough room to do the compaction
988 bool enough_room = EnoughRoomForCompaction(
989 cfd, input_files, &sfm_reserved_compact_space, log_buffer);
990
991 if (!enough_room) {
992 // m's vars will get set properly at the end of this function,
993 // as long as status == CompactionTooLarge
994 return Status::CompactionTooLarge();
995 }
996
997 // At this point, CompactFiles will be run.
998 bg_compaction_scheduled_++;
999
1000 std::unique_ptr<Compaction> c;
1001 assert(cfd->compaction_picker());
1002 c.reset(cfd->compaction_picker()->CompactFiles(
1003 compact_options, input_files, output_level, version->storage_info(),
1004 *cfd->GetLatestMutableCFOptions(), output_path_id));
1005 // we already sanitized the set of input files and checked for conflicts
1006 // without releasing the lock, so we're guaranteed a compaction can be formed.
1007 assert(c != nullptr);
1008
1009 c->SetInputVersion(version);
1010 // deletion compaction currently not allowed in CompactFiles.
1011 assert(!c->deletion_compaction());
1012
1013 std::vector<SequenceNumber> snapshot_seqs;
1014 SequenceNumber earliest_write_conflict_snapshot;
1015 SnapshotChecker* snapshot_checker;
1016 GetSnapshotContext(job_context, &snapshot_seqs,
1017 &earliest_write_conflict_snapshot, &snapshot_checker);
1018
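// Register the current file number as a pending output so that files created
// by this job are not treated as obsolete before the result is installed.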
1019 std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
1020 new std::list<uint64_t>::iterator(
1021 CaptureCurrentFileNumberInPendingOutputs()));
1022
1023 assert(is_snapshot_supported_ || snapshots_.empty());
1024 CompactionJobStats compaction_job_stats;
1025 CompactionJob compaction_job(
1026 job_context->job_id, c.get(), immutable_db_options_,
1027 file_options_for_compaction_, versions_.get(), &shutting_down_,
1028 preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
1029 GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
1030 &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
1031 snapshot_checker, table_cache_, &event_logger_,
1032 c->mutable_cf_options()->paranoid_file_checks,
1033 c->mutable_cf_options()->report_bg_io_stats, dbname_,
1034 &compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);
1035
1036 // Creating a compaction influences the compaction score because the score
1037 // takes running compactions into account (by skipping files that are already
1038 // being compacted). Since we just changed compaction score, we recalculate it
1039 // here.
1040 version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
1041 *c->mutable_cf_options());
1042
1043 compaction_job.Prepare();
1044
1045 mutex_.Unlock();
1046 TEST_SYNC_POINT("CompactFilesImpl:0");
1047 TEST_SYNC_POINT("CompactFilesImpl:1");
1048 compaction_job.Run();
1049 TEST_SYNC_POINT("CompactFilesImpl:2");
1050 TEST_SYNC_POINT("CompactFilesImpl:3");
1051 mutex_.Lock();
1052
1053 Status status = compaction_job.Install(*c->mutable_cf_options());
1054 if (status.ok()) {
1055 InstallSuperVersionAndScheduleWork(c->column_family_data(),
1056 &job_context->superversion_contexts[0],
1057 *c->mutable_cf_options());
1058 }
1059 c->ReleaseCompactionFiles(s);
1060 #ifndef ROCKSDB_LITE
1061 // Need to make sure SstFileManager does its bookkeeping
1062 auto sfm = static_cast<SstFileManagerImpl*>(
1063 immutable_db_options_.sst_file_manager.get());
1064 if (sfm && sfm_reserved_compact_space) {
1065 sfm->OnCompactionCompletion(c.get());
1066 }
1067 #endif // ROCKSDB_LITE
1068
1069 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1070
1071 if (compaction_job_info != nullptr) {
1072 BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
1073 job_context->job_id, version, compaction_job_info);
1074 }
1075
1076 if (status.ok()) {
1077 // Done
1078 } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
1079 // Ignore compaction errors found during shutting down
1080 } else if (status.IsManualCompactionPaused()) {
1081 // Don't report stopping manual compaction as error
1082 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1083 "[%s] [JOB %d] Stopping manual compaction",
1084 c->column_family_data()->GetName().c_str(),
1085 job_context->job_id);
1086 } else {
1087 ROCKS_LOG_WARN(immutable_db_options_.info_log,
1088 "[%s] [JOB %d] Compaction error: %s",
1089 c->column_family_data()->GetName().c_str(),
1090 job_context->job_id, status.ToString().c_str());
1091 error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
1092 }
1093
1094 if (output_file_names != nullptr) {
1095 for (const auto& newf : c->edit()->GetNewFiles()) {
1096 (*output_file_names)
1097 .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
1098 newf.second.fd.GetNumber(),
1099 newf.second.fd.GetPathId()));
1100 }
1101 }
1102
1103 c.reset();
1104
1105 bg_compaction_scheduled_--;
1106 if (bg_compaction_scheduled_ == 0) {
1107 bg_cv_.SignalAll();
1108 }
1109 MaybeScheduleFlushOrCompaction();
1110 TEST_SYNC_POINT("CompactFilesImpl:End");
1111
1112 return status;
1113 }
1114 #endif // ROCKSDB_LITE
1115
1116 Status DBImpl::PauseBackgroundWork() {
1117 InstrumentedMutexLock guard_lock(&mutex_);
1118 bg_compaction_paused_++;
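// Incrementing bg_compaction_paused_ stops new compactions from being
// scheduled; then wait for already scheduled background work to drain before
// declaring all background work paused.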
1119 while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
1120 bg_flush_scheduled_ > 0) {
1121 bg_cv_.Wait();
1122 }
1123 bg_work_paused_++;
1124 return Status::OK();
1125 }
1126
1127 Status DBImpl::ContinueBackgroundWork() {
1128 InstrumentedMutexLock guard_lock(&mutex_);
1129 if (bg_work_paused_ == 0) {
1130 return Status::InvalidArgument();
1131 }
1132 assert(bg_work_paused_ > 0);
1133 assert(bg_compaction_paused_ > 0);
1134 bg_compaction_paused_--;
1135 bg_work_paused_--;
1136 // It's sufficient to check just bg_work_paused_ here since
1137 // bg_work_paused_ is always no greater than bg_compaction_paused_
1138 if (bg_work_paused_ == 0) {
1139 MaybeScheduleFlushOrCompaction();
1140 }
1141 return Status::OK();
1142 }
1143
1144 void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
1145 const Status& st,
1146 const CompactionJobStats& job_stats,
1147 int job_id) {
1148 #ifndef ROCKSDB_LITE
1149 if (immutable_db_options_.listeners.empty()) {
1150 return;
1151 }
1152 mutex_.AssertHeld();
1153 if (shutting_down_.load(std::memory_order_acquire)) {
1154 return;
1155 }
1156 if (c->is_manual_compaction() &&
1157 manual_compaction_paused_.load(std::memory_order_acquire)) {
1158 return;
1159 }
1160 Version* current = cfd->current();
1161 current->Ref();
1162 // release lock while notifying events
1163 mutex_.Unlock();
1164 TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
1165 {
1166 CompactionJobInfo info{};
1167 info.cf_name = cfd->GetName();
1168 info.status = st;
1169 info.thread_id = env_->GetThreadID();
1170 info.job_id = job_id;
1171 info.base_input_level = c->start_level();
1172 info.output_level = c->output_level();
1173 info.stats = job_stats;
1174 info.table_properties = c->GetOutputTableProperties();
1175 info.compaction_reason = c->compaction_reason();
1176 info.compression = c->output_compression();
1177 for (size_t i = 0; i < c->num_input_levels(); ++i) {
1178 for (const auto fmd : *c->inputs(i)) {
1179 const FileDescriptor& desc = fmd->fd;
1180 const uint64_t file_number = desc.GetNumber();
1181 auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
1182 file_number, desc.GetPathId());
1183 info.input_files.push_back(fn);
1184 info.input_file_infos.push_back(CompactionFileInfo{
1185 static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
1186 if (info.table_properties.count(fn) == 0) {
1187 std::shared_ptr<const TableProperties> tp;
1188 auto s = current->GetTableProperties(&tp, fmd, &fn);
1189 if (s.ok()) {
1190 info.table_properties[fn] = tp;
1191 }
1192 }
1193 }
1194 }
1195 for (const auto& newf : c->edit()->GetNewFiles()) {
1196 const FileMetaData& meta = newf.second;
1197 const FileDescriptor& desc = meta.fd;
1198 const uint64_t file_number = desc.GetNumber();
1199 info.output_files.push_back(TableFileName(
1200 c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
1201 info.output_file_infos.push_back(CompactionFileInfo{
1202 newf.first, file_number, meta.oldest_blob_file_number});
1203 }
1204 for (auto listener : immutable_db_options_.listeners) {
1205 listener->OnCompactionBegin(this, info);
1206 }
1207 }
1208 mutex_.Lock();
1209 current->Unref();
1210 #else
1211 (void)cfd;
1212 (void)c;
1213 (void)st;
1214 (void)job_stats;
1215 (void)job_id;
1216 #endif // ROCKSDB_LITE
1217 }
1218
1219 void DBImpl::NotifyOnCompactionCompleted(
1220 ColumnFamilyData* cfd, Compaction* c, const Status& st,
1221 const CompactionJobStats& compaction_job_stats, const int job_id) {
1222 #ifndef ROCKSDB_LITE
1223 if (immutable_db_options_.listeners.size() == 0U) {
1224 return;
1225 }
1226 mutex_.AssertHeld();
1227 if (shutting_down_.load(std::memory_order_acquire)) {
1228 return;
1229 }
1230 if (c->is_manual_compaction() &&
1231 manual_compaction_paused_.load(std::memory_order_acquire)) {
1232 return;
1233 }
1234 Version* current = cfd->current();
1235 current->Ref();
1236 // release lock while notifying events
1237 mutex_.Unlock();
1238 TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
1239 {
1240 CompactionJobInfo info{};
1241 BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
1242 &info);
1243 for (auto listener : immutable_db_options_.listeners) {
1244 listener->OnCompactionCompleted(this, info);
1245 }
1246 }
1247 mutex_.Lock();
1248 current->Unref();
1249 // no need to signal bg_cv_ as it will be signaled at the end of the
1250 // flush process.
1251 #else
1252 (void)cfd;
1253 (void)c;
1254 (void)st;
1255 (void)compaction_job_stats;
1256 (void)job_id;
1257 #endif // ROCKSDB_LITE
1258 }
1259
1260 // REQUIREMENT: block all background work by calling PauseBackgroundWork()
1261 // before calling this function
1262 Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
1263 assert(level < cfd->NumberLevels());
1264 if (target_level >= cfd->NumberLevels()) {
1265 return Status::InvalidArgument("Target level exceeds number of levels");
1266 }
1267
1268 SuperVersionContext sv_context(/* create_superversion */ true);
1269
1270 Status status;
1271
1272 InstrumentedMutexLock guard_lock(&mutex_);
1273
1274 // only allow one thread refitting
1275 if (refitting_level_) {
1276 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1277 "[ReFitLevel] another thread is refitting");
1278 return Status::NotSupported("another thread is refitting");
1279 }
1280 refitting_level_ = true;
1281
1282 const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1283 // move to a smaller level
1284 int to_level = target_level;
1285 if (target_level < 0) {
1286 to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
1287 }
1288
1289 auto* vstorage = cfd->current()->storage_info();
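// Moving files to a larger level number is only allowed as a trivial move:
// level 0 cannot be the source, and every level between the source and the
// target (including the target) must be empty.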
1290 if (to_level > level) {
1291 if (level == 0) {
1292 return Status::NotSupported(
1293 "Cannot change from level 0 to other levels.");
1294 }
1295 // Check levels are empty for a trivial move
1296 for (int l = level + 1; l <= to_level; l++) {
1297 if (vstorage->NumLevelFiles(l) > 0) {
1298 return Status::NotSupported(
1299 "Levels between source and target are not empty for a move.");
1300 }
1301 }
1302 }
1303 if (to_level != level) {
1304 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1305 "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
1306 cfd->current()->DebugString().data());
1307
1308 VersionEdit edit;
1309 edit.SetColumnFamily(cfd->GetID());
1310 for (const auto& f : vstorage->LevelFiles(level)) {
1311 edit.DeleteFile(level, f->fd.GetNumber());
1312 edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
1313 f->fd.GetFileSize(), f->smallest, f->largest,
1314 f->fd.smallest_seqno, f->fd.largest_seqno,
1315 f->marked_for_compaction, f->oldest_blob_file_number,
1316 f->oldest_ancester_time, f->file_creation_time,
1317 f->file_checksum, f->file_checksum_func_name);
1318 }
1319 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1320 "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
1321 edit.DebugString().data());
1322
1323 status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
1324 directories_.GetDbDir());
1325 InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
1326
1327 ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
1328 cfd->GetName().c_str(), status.ToString().data());
1329
1330 if (status.ok()) {
1331 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1332 "[%s] After refitting:\n%s", cfd->GetName().c_str(),
1333 cfd->current()->DebugString().data());
1334 }
1335 }
1336
1337 sv_context.Clean();
1338 refitting_level_ = false;
1339
1340 return status;
1341 }
1342
1343 int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
1344 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1345 return cfh->cfd()->NumberLevels();
1346 }
1347
1348 int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
1349 return 0;
1350 }
1351
1352 int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
1353 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1354 InstrumentedMutexLock l(&mutex_);
1355 return cfh->cfd()
1356 ->GetSuperVersion()
1357 ->mutable_cf_options.level0_stop_writes_trigger;
1358 }
1359
1360 Status DBImpl::Flush(const FlushOptions& flush_options,
1361 ColumnFamilyHandle* column_family) {
1362 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1363 ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
1364 cfh->GetName().c_str());
1365 Status s;
1366 if (immutable_db_options_.atomic_flush) {
1367 s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
1368 FlushReason::kManualFlush);
1369 } else {
1370 s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
1371 }
1372
1373 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1374 "[%s] Manual flush finished, status: %s\n",
1375 cfh->GetName().c_str(), s.ToString().c_str());
1376 return s;
1377 }
1378
1379 Status DBImpl::Flush(const FlushOptions& flush_options,
1380 const std::vector<ColumnFamilyHandle*>& column_families) {
1381 Status s;
1382 if (!immutable_db_options_.atomic_flush) {
1383 for (auto cfh : column_families) {
1384 s = Flush(flush_options, cfh);
1385 if (!s.ok()) {
1386 break;
1387 }
1388 }
1389 } else {
1390 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1391 "Manual atomic flush start.\n"
1392 "=====Column families:=====");
1393 for (auto cfh : column_families) {
1394 auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1395 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1396 cfhi->GetName().c_str());
1397 }
1398 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1399 "=====End of column families list=====");
1400 autovector<ColumnFamilyData*> cfds;
1401 std::for_each(column_families.begin(), column_families.end(),
1402 [&cfds](ColumnFamilyHandle* elem) {
1403 auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
1404 cfds.emplace_back(cfh->cfd());
1405 });
1406 s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
1407 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1408 "Manual atomic flush finished, status: %s\n"
1409 "=====Column families:=====",
1410 s.ToString().c_str());
1411 for (auto cfh : column_families) {
1412 auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1413 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1414 cfhi->GetName().c_str());
1415 }
1416 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1417 "=====End of column families list=====");
1418 }
1419 return s;
1420 }
1421
1422 Status DBImpl::RunManualCompaction(
1423 ColumnFamilyData* cfd, int input_level, int output_level,
1424 const CompactRangeOptions& compact_range_options, const Slice* begin,
1425 const Slice* end, bool exclusive, bool disallow_trivial_move,
1426 uint64_t max_file_num_to_ignore) {
1427 assert(input_level == ColumnFamilyData::kCompactAllLevels ||
1428 input_level >= 0);
1429
1430 InternalKey begin_storage, end_storage;
1431 CompactionArg* ca;
1432
1433 bool scheduled = false;
1434 bool manual_conflict = false;
1435 ManualCompactionState manual;
1436 manual.cfd = cfd;
1437 manual.input_level = input_level;
1438 manual.output_level = output_level;
1439 manual.output_path_id = compact_range_options.target_path_id;
1440 manual.done = false;
1441 manual.in_progress = false;
1442 manual.incomplete = false;
1443 manual.exclusive = exclusive;
1444 manual.disallow_trivial_move = disallow_trivial_move;
1445 // For universal compaction, we enforce every manual compaction to compact
1446 // all files.
1447 if (begin == nullptr ||
1448 cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1449 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1450 manual.begin = nullptr;
1451 } else {
1452 begin_storage.SetMinPossibleForUserKey(*begin);
1453 manual.begin = &begin_storage;
1454 }
1455 if (end == nullptr ||
1456 cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1457 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1458 manual.end = nullptr;
1459 } else {
1460 end_storage.SetMaxPossibleForUserKey(*end);
1461 manual.end = &end_storage;
1462 }
1463
1464 TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
1465 TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
1466 InstrumentedMutexLock l(&mutex_);
1467
1468 // When a manual compaction arrives, temporarily disable scheduling of
1469 // non-manual compactions and wait until the number of scheduled compaction
1470 // jobs drops to zero. This is needed to ensure that this manual compaction
1471 // can compact any range of keys/files.
1472 //
1473 // HasPendingManualCompaction() is true when at least one thread is inside
1474 // RunManualCompaction(), i.e. during that time no other compaction will
1475 // get scheduled (see MaybeScheduleFlushOrCompaction).
1476 //
1477 // Note that the following loop doesn't stop more than one thread calling
1478 // RunManualCompaction() from getting to the second while loop below.
1479 // However, only one of them will actually schedule compaction, while
1480 // others will wait on a condition variable until it completes.
1481
1482 AddManualCompaction(&manual);
1483 TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
1484 if (exclusive) {
1485 while (bg_bottom_compaction_scheduled_ > 0 ||
1486 bg_compaction_scheduled_ > 0) {
1487 TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
1488 ROCKS_LOG_INFO(
1489 immutable_db_options_.info_log,
1490 "[%s] Manual compaction waiting for all other scheduled background "
1491 "compactions to finish",
1492 cfd->GetName().c_str());
1493 bg_cv_.Wait();
1494 }
1495 }
1496
1497 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1498 "[%s] Manual compaction starting", cfd->GetName().c_str());
1499
1500 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
1501 immutable_db_options_.info_log.get());
1502 // We don't check bg_error_ here, because if we get the error in compaction,
1503 // the compaction will set manual.status to bg_error_ and set manual.done to
1504 // true.
1505 while (!manual.done) {
1506 assert(HasPendingManualCompaction());
1507 manual_conflict = false;
1508 Compaction* compaction = nullptr;
1509 if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
1510 scheduled ||
1511 (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
1512 ((compaction = manual.cfd->CompactRange(
1513 *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
1514 manual.output_level, compact_range_options, manual.begin,
1515 manual.end, &manual.manual_end, &manual_conflict,
1516 max_file_num_to_ignore)) == nullptr &&
1517 manual_conflict))) {
1518 // exclusive manual compactions should not see a conflict during
1519 // CompactRange
1520 assert(!exclusive || !manual_conflict);
1521 // Running either this or some other manual compaction
1522 bg_cv_.Wait();
1523 if (scheduled && manual.incomplete == true) {
1524 assert(!manual.in_progress);
1525 scheduled = false;
1526 manual.incomplete = false;
1527 }
1528 } else if (!scheduled) {
1529 if (compaction == nullptr) {
1530 manual.done = true;
1531 bg_cv_.SignalAll();
1532 continue;
1533 }
1534 ca = new CompactionArg;
1535 ca->db = this;
1536 ca->prepicked_compaction = new PrepickedCompaction;
1537 ca->prepicked_compaction->manual_compaction_state = &manual;
1538 ca->prepicked_compaction->compaction = compaction;
1539 if (!RequestCompactionToken(
1540 cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
1541 // Don't throttle manual compaction, only count outstanding tasks.
1542 assert(false);
1543 }
1544 manual.incomplete = false;
1545 bg_compaction_scheduled_++;
1546 Env::Priority thread_pool_pri = Env::Priority::LOW;
1547 if (compaction->bottommost_level() &&
1548 env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
1549 thread_pool_pri = Env::Priority::BOTTOM;
1550 }
1551 env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this,
1552 &DBImpl::UnscheduleCompactionCallback);
1553 scheduled = true;
1554 }
1555 }
1556
1557 log_buffer.FlushBufferToLog();
1558 assert(!manual.in_progress);
1559 assert(HasPendingManualCompaction());
1560 RemoveManualCompaction(&manual);
1561 bg_cv_.SignalAll();
1562 return manual.status;
1563 }
1564
1565 void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
1566 FlushRequest* req) {
1567 assert(req != nullptr);
1568 req->reserve(cfds.size());
1569 for (const auto cfd : cfds) {
1570 if (nullptr == cfd) {
1571 // cfd may be null, see DBImpl::ScheduleFlushes
1572 continue;
1573 }
1574 uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
1575 req->emplace_back(cfd, max_memtable_id);
1576 }
1577 }
1578
1579 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1580 const FlushOptions& flush_options,
1581 FlushReason flush_reason, bool writes_stopped) {
1582 Status s;
1583 uint64_t flush_memtable_id = 0;
1584 if (!flush_options.allow_write_stall) {
1585 bool flush_needed = true;
1586 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1587 TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
1588 if (!s.ok() || !flush_needed) {
1589 return s;
1590 }
1591 }
1592 FlushRequest flush_req;
1593 {
1594 WriteContext context;
1595 InstrumentedMutexLock guard_lock(&mutex_);
1596
1597 WriteThread::Writer w;
1598 WriteThread::Writer nonmem_w;
1599 if (!writes_stopped) {
1600 write_thread_.EnterUnbatched(&w, &mutex_);
1601 if (two_write_queues_) {
1602 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1603 }
1604 }
1605 WaitForPendingWrites();
1606
1607 if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
1608 s = SwitchMemtable(cfd, &context);
1609 }
1610 if (s.ok()) {
1611 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1612 !cached_recoverable_state_empty_.load()) {
1613 flush_memtable_id = cfd->imm()->GetLatestMemTableID();
1614 flush_req.emplace_back(cfd, flush_memtable_id);
1615 }
1616 if (immutable_db_options_.persist_stats_to_disk) {
1617 ColumnFamilyData* cfd_stats =
1618 versions_->GetColumnFamilySet()->GetColumnFamily(
1619 kPersistentStatsColumnFamilyName);
1620 if (cfd_stats != nullptr && cfd_stats != cfd &&
1621 !cfd_stats->mem()->IsEmpty()) {
1622 // only force flush stats CF when it will be the only CF lagging
1623 // behind after the current flush
1624 bool stats_cf_flush_needed = true;
1625 for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
1626 if (loop_cfd == cfd_stats || loop_cfd == cfd) {
1627 continue;
1628 }
1629 if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
1630 stats_cf_flush_needed = false;
1631 }
1632 }
1633 if (stats_cf_flush_needed) {
1634 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1635 "Force flushing stats CF with manual flush of %s "
1636 "to avoid holding old logs",
1637 cfd->GetName().c_str());
1638 s = SwitchMemtable(cfd_stats, &context);
1639 flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
1640 flush_req.emplace_back(cfd_stats, flush_memtable_id);
1641 }
1642 }
1643 }
1644 }
1645
1646 if (s.ok() && !flush_req.empty()) {
1647 for (auto& elem : flush_req) {
1648 ColumnFamilyData* loop_cfd = elem.first;
1649 loop_cfd->imm()->FlushRequested();
1650 }
1651 // If the caller wants to wait for this flush to complete, it indicates
1652       // that the caller expects the ColumnFamilyData not to be freed by
1653 // other threads which may drop the column family concurrently.
1654 // Therefore, we increase the cfd's ref count.
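      // The matching UnrefAndTryDelete() happens below, after
      // WaitForFlushMemTables() returns.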
1655 if (flush_options.wait) {
1656 for (auto& elem : flush_req) {
1657 ColumnFamilyData* loop_cfd = elem.first;
1658 loop_cfd->Ref();
1659 }
1660 }
1661 SchedulePendingFlush(flush_req, flush_reason);
1662 MaybeScheduleFlushOrCompaction();
1663 }
1664
1665 if (!writes_stopped) {
1666 write_thread_.ExitUnbatched(&w);
1667 if (two_write_queues_) {
1668 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1669 }
1670 }
1671 }
1672 TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
1673 TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
1674 if (s.ok() && flush_options.wait) {
1675 autovector<ColumnFamilyData*> cfds;
1676 autovector<const uint64_t*> flush_memtable_ids;
1677 for (auto& iter : flush_req) {
1678 cfds.push_back(iter.first);
1679 flush_memtable_ids.push_back(&(iter.second));
1680 }
1681 s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1682 (flush_reason == FlushReason::kErrorRecovery));
1683 InstrumentedMutexLock lock_guard(&mutex_);
1684 for (auto* tmp_cfd : cfds) {
1685 tmp_cfd->UnrefAndTryDelete();
1686 }
1687 }
1688 TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
1689 return s;
1690 }
1691
1692 // Flush all elements in 'column_family_datas'
1693 // and atomically record the result to the MANIFEST.
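// This path is typically taken when DBOptions::atomic_flush is enabled, so
// that the memtable flushes of the listed column families are committed to the
// MANIFEST as a single atomic group (descriptive note, not new behavior).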
1694 Status DBImpl::AtomicFlushMemTables(
1695 const autovector<ColumnFamilyData*>& column_family_datas,
1696 const FlushOptions& flush_options, FlushReason flush_reason,
1697 bool writes_stopped) {
1698 Status s;
1699 if (!flush_options.allow_write_stall) {
1700 int num_cfs_to_flush = 0;
1701 for (auto cfd : column_family_datas) {
1702 bool flush_needed = true;
1703 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1704 if (!s.ok()) {
1705 return s;
1706 } else if (flush_needed) {
1707 ++num_cfs_to_flush;
1708 }
1709 }
1710 if (0 == num_cfs_to_flush) {
1711 return s;
1712 }
1713 }
1714 FlushRequest flush_req;
1715 autovector<ColumnFamilyData*> cfds;
1716 {
1717 WriteContext context;
1718 InstrumentedMutexLock guard_lock(&mutex_);
1719
1720 WriteThread::Writer w;
1721 WriteThread::Writer nonmem_w;
1722 if (!writes_stopped) {
1723 write_thread_.EnterUnbatched(&w, &mutex_);
1724 if (two_write_queues_) {
1725 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1726 }
1727 }
1728 WaitForPendingWrites();
1729
1730 for (auto cfd : column_family_datas) {
1731 if (cfd->IsDropped()) {
1732 continue;
1733 }
1734 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1735 !cached_recoverable_state_empty_.load()) {
1736 cfds.emplace_back(cfd);
1737 }
1738 }
1739 for (auto cfd : cfds) {
1740 if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
1741 continue;
1742 }
1743 cfd->Ref();
1744 s = SwitchMemtable(cfd, &context);
1745 cfd->UnrefAndTryDelete();
1746 if (!s.ok()) {
1747 break;
1748 }
1749 }
1750 if (s.ok()) {
1751 AssignAtomicFlushSeq(cfds);
1752 for (auto cfd : cfds) {
1753 cfd->imm()->FlushRequested();
1754 }
1755 // If the caller wants to wait for this flush to complete, it indicates
1756       // that the caller expects the ColumnFamilyData not to be freed by
1757 // other threads which may drop the column family concurrently.
1758 // Therefore, we increase the cfd's ref count.
1759 if (flush_options.wait) {
1760 for (auto cfd : cfds) {
1761 cfd->Ref();
1762 }
1763 }
1764 GenerateFlushRequest(cfds, &flush_req);
1765 SchedulePendingFlush(flush_req, flush_reason);
1766 MaybeScheduleFlushOrCompaction();
1767 }
1768
1769 if (!writes_stopped) {
1770 write_thread_.ExitUnbatched(&w);
1771 if (two_write_queues_) {
1772 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1773 }
1774 }
1775 }
1776 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
1777 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
1778 if (s.ok() && flush_options.wait) {
1779 autovector<const uint64_t*> flush_memtable_ids;
1780 for (auto& iter : flush_req) {
1781 flush_memtable_ids.push_back(&(iter.second));
1782 }
1783 s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1784 (flush_reason == FlushReason::kErrorRecovery));
1785 InstrumentedMutexLock lock_guard(&mutex_);
1786 for (auto* cfd : cfds) {
1787 cfd->UnrefAndTryDelete();
1788 }
1789 }
1790 return s;
1791 }
1792
1793 // Calling FlushMemTable(), whether from DB::Flush() or from the backup engine,
1794 // can cause a write stall, for example if one memtable is already being flushed.
1795 // This method tries to avoid a write stall (similar to CompactRange() behavior):
1796 // it emulates how the SuperVersion / LSM would change if the flush happened,
1797 // checks that against various constraints, and delays the flush if it would
1798 // stall writes. Callers should check status and flush_needed to see whether the flush already happened.
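//
// Illustrative caller pattern (a sketch mirroring FlushMemTable() above; no new
// API is introduced here):
//
//   bool flush_needed = true;
//   Status s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
//   if (!s.ok() || !flush_needed) {
//     return s;  // error, or the memtable of interest was already flushed
//   }
//   // ... otherwise switch the memtable and schedule the flush ...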
1799 Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
1800 bool* flush_needed) {
1801 {
1802 *flush_needed = true;
1803 InstrumentedMutexLock l(&mutex_);
1804 uint64_t orig_active_memtable_id = cfd->mem()->GetID();
1805 WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
1806 do {
1807 if (write_stall_condition != WriteStallCondition::kNormal) {
1808 // Same error handling as user writes: Don't wait if there's a
1809 // background error, even if it's a soft error. We might wait here
1810 // indefinitely as the pending flushes/compactions may never finish
1811 // successfully, resulting in the stall condition lasting indefinitely
1812 if (error_handler_.IsBGWorkStopped()) {
1813 return error_handler_.GetBGError();
1814 }
1815
1816 TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
1817 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1818 "[%s] WaitUntilFlushWouldNotStallWrites"
1819 " waiting on stall conditions to clear",
1820 cfd->GetName().c_str());
1821 bg_cv_.Wait();
1822 }
1823 if (cfd->IsDropped()) {
1824 return Status::ColumnFamilyDropped();
1825 }
1826 if (shutting_down_.load(std::memory_order_acquire)) {
1827 return Status::ShutdownInProgress();
1828 }
1829
1830 uint64_t earliest_memtable_id =
1831 std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
1832 if (earliest_memtable_id > orig_active_memtable_id) {
1833 // We waited so long that the memtable we were originally waiting on was
1834 // flushed.
1835 *flush_needed = false;
1836 return Status::OK();
1837 }
1838
1839 const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1840 const auto* vstorage = cfd->current()->storage_info();
1841
1842       // Skip the stall check if we're below both the auto-flush and
1843       // auto-compaction triggers. If writes stalled under those conditions, it
1844       // would mean the stall triggers are so low that stalling precedes any
1845       // background work; we shouldn't wait, since that work won't be scheduled.
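      // Illustrative example (hypothetical option values): with
      // min_write_buffer_number_to_merge == 2 and
      // level0_file_num_compaction_trigger == 4, a CF with no unflushed
      // immutable memtables and a single L0 file is below both triggers, so we
      // break out of the loop without waiting for stall conditions to clear.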
1846 if (cfd->imm()->NumNotFlushed() <
1847 cfd->ioptions()->min_write_buffer_number_to_merge &&
1848 vstorage->l0_delay_trigger_count() <
1849 mutable_cf_options.level0_file_num_compaction_trigger) {
1850 break;
1851 }
1852
1853 // check whether one extra immutable memtable or an extra L0 file would
1854 // cause write stalling mode to be entered. It could still enter stall
1855 // mode due to pending compaction bytes, but that's less common
1856 write_stall_condition =
1857 ColumnFamilyData::GetWriteStallConditionAndCause(
1858 cfd->imm()->NumNotFlushed() + 1,
1859 vstorage->l0_delay_trigger_count() + 1,
1860 vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
1861 .first;
1862 } while (write_stall_condition != WriteStallCondition::kNormal);
1863 }
1864 return Status::OK();
1865 }
1866
1867 // Wait for memtables to be flushed for multiple column families.
1868 // let N = cfds.size()
1869 // for i in [0, N),
1870 // 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
1871 // have to be flushed for THIS column family;
1872 // 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
1873 // family have to be flushed.
1874 // Finish waiting when ALL column families finish flushing memtables.
1875 // resuming_from_bg_err indicates whether the caller is trying to resume from a
1876 // background error or is in normal processing.
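// Illustrative example (hypothetical IDs): with cfds = {cf1, cf2} and
// flush_memtable_ids = {&5, nullptr}, the wait finishes once cf1 has flushed
// all memtables with ID <= 5 and cf2 has no unflushed immutable memtables at
// all (or once a column family involved has been dropped).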
1877 Status DBImpl::WaitForFlushMemTables(
1878 const autovector<ColumnFamilyData*>& cfds,
1879 const autovector<const uint64_t*>& flush_memtable_ids,
1880 bool resuming_from_bg_err) {
1881 int num = static_cast<int>(cfds.size());
1882   // Wait until the flushes complete
1883 InstrumentedMutexLock l(&mutex_);
1884 // If the caller is trying to resume from bg error, then
1885 // error_handler_.IsDBStopped() is true.
1886 while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
1887 if (shutting_down_.load(std::memory_order_acquire)) {
1888 return Status::ShutdownInProgress();
1889 }
1890 // If an error has occurred during resumption, then no need to wait.
1891 if (!error_handler_.GetRecoveryError().ok()) {
1892 break;
1893 }
1894 // Number of column families that have been dropped.
1895 int num_dropped = 0;
1896 // Number of column families that have finished flush.
1897 int num_finished = 0;
1898 for (int i = 0; i < num; ++i) {
1899 if (cfds[i]->IsDropped()) {
1900 ++num_dropped;
1901 } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
1902 (flush_memtable_ids[i] != nullptr &&
1903 cfds[i]->imm()->GetEarliestMemTableID() >
1904 *flush_memtable_ids[i])) {
1905 ++num_finished;
1906 }
1907 }
1908 if (1 == num_dropped && 1 == num) {
1909 return Status::InvalidArgument("Cannot flush a dropped CF");
1910 }
1911 // Column families involved in this flush request have either been dropped
1912 // or finished flush. Then it's time to finish waiting.
1913 if (num_dropped + num_finished == num) {
1914 break;
1915 }
1916 bg_cv_.Wait();
1917 }
1918 Status s;
1919 // If not resuming from bg error, and an error has caused the DB to stop,
1920 // then report the bg error to caller.
1921 if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
1922 s = error_handler_.GetBGError();
1923 }
1924 return s;
1925 }
1926
1927 Status DBImpl::EnableAutoCompaction(
1928 const std::vector<ColumnFamilyHandle*>& column_family_handles) {
1929 Status s;
1930 for (auto cf_ptr : column_family_handles) {
1931 Status status =
1932 this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
1933 if (!status.ok()) {
1934 s = status;
1935 }
1936 }
1937
1938 return s;
1939 }
1940
1941 void DBImpl::DisableManualCompaction() {
1942 manual_compaction_paused_.store(true, std::memory_order_release);
1943 }
1944
1945 void DBImpl::EnableManualCompaction() {
1946 manual_compaction_paused_.store(false, std::memory_order_release);
1947 }
1948
1949 void DBImpl::MaybeScheduleFlushOrCompaction() {
1950 mutex_.AssertHeld();
1951 if (!opened_successfully_) {
1952     // Compaction may introduce a data race with DB open
1953 return;
1954 }
1955 if (bg_work_paused_ > 0) {
1956 // we paused the background work
1957 return;
1958 } else if (error_handler_.IsBGWorkStopped() &&
1959 !error_handler_.IsRecoveryInProgress()) {
1960 // There has been a hard error and this call is not part of the recovery
1961 // sequence. Bail out here so we don't get into an endless loop of
1962 // scheduling BG work which will again call this function
1963 return;
1964 } else if (shutting_down_.load(std::memory_order_acquire)) {
1965 // DB is being deleted; no more background compactions
1966 return;
1967 }
1968 auto bg_job_limits = GetBGJobLimits();
1969 bool is_flush_pool_empty =
1970 env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
1971 while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
1972 bg_flush_scheduled_ < bg_job_limits.max_flushes) {
1973 bg_flush_scheduled_++;
1974 FlushThreadArg* fta = new FlushThreadArg;
1975 fta->db_ = this;
1976 fta->thread_pri_ = Env::Priority::HIGH;
1977 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
1978 &DBImpl::UnscheduleFlushCallback);
1979 --unscheduled_flushes_;
1980 TEST_SYNC_POINT_CALLBACK(
1981 "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
1982 &unscheduled_flushes_);
1983 }
1984
1985 // special case -- if high-pri (flush) thread pool is empty, then schedule
1986 // flushes in low-pri (compaction) thread pool.
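  // (The high-pri pool counts as empty when the Env reports zero background
  // threads for Env::Priority::HIGH, i.e. the user configured the Env without
  // a dedicated flush pool.)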
1987 if (is_flush_pool_empty) {
1988 while (unscheduled_flushes_ > 0 &&
1989 bg_flush_scheduled_ + bg_compaction_scheduled_ <
1990 bg_job_limits.max_flushes) {
1991 bg_flush_scheduled_++;
1992 FlushThreadArg* fta = new FlushThreadArg;
1993 fta->db_ = this;
1994 fta->thread_pri_ = Env::Priority::LOW;
1995 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
1996 &DBImpl::UnscheduleFlushCallback);
1997 --unscheduled_flushes_;
1998 }
1999 }
2000
2001 if (bg_compaction_paused_ > 0) {
2002 // we paused the background compaction
2003 return;
2004 } else if (error_handler_.IsBGWorkStopped()) {
2005 // Compaction is not part of the recovery sequence from a hard error. We
2006 // might get here because recovery might do a flush and install a new
2007 // super version, which will try to schedule pending compactions. Bail
2008 // out here and let the higher level recovery handle compactions
2009 return;
2010 }
2011
2012 if (HasExclusiveManualCompaction()) {
2013 // only manual compactions are allowed to run. don't schedule automatic
2014 // compactions
2015 TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
2016 return;
2017 }
2018
2019 while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
2020 unscheduled_compactions_ > 0) {
2021 CompactionArg* ca = new CompactionArg;
2022 ca->db = this;
2023 ca->prepicked_compaction = nullptr;
2024 bg_compaction_scheduled_++;
2025 unscheduled_compactions_--;
2026 env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
2027 &DBImpl::UnscheduleCompactionCallback);
2028 }
2029 }
2030
2031 DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
2032 mutex_.AssertHeld();
2033 return GetBGJobLimits(immutable_db_options_.max_background_flushes,
2034 mutable_db_options_.max_background_compactions,
2035 mutable_db_options_.max_background_jobs,
2036 write_controller_.NeedSpeedupCompaction());
2037 }
2038
2039 DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
2040 int max_background_compactions,
2041 int max_background_jobs,
2042 bool parallelize_compactions) {
2043 BGJobLimits res;
2044 if (max_background_flushes == -1 && max_background_compactions == -1) {
2045     // For our first stab at implementing max_background_jobs, simply allocate a
2046     // quarter of the threads to flushes.
2047 res.max_flushes = std::max(1, max_background_jobs / 4);
2048 res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
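    // Illustrative arithmetic: max_background_jobs == 8 gives max_flushes == 2
    // and max_compactions == 6; max_background_jobs == 1 gives 1 and 1.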
2049 } else {
2050 // compatibility code in case users haven't migrated to max_background_jobs,
2051 // which automatically computes flush/compaction limits
2052 res.max_flushes = std::max(1, max_background_flushes);
2053 res.max_compactions = std::max(1, max_background_compactions);
2054 }
2055 if (!parallelize_compactions) {
2056     // throttle background compactions until we deem it necessary
2057 res.max_compactions = 1;
2058 }
2059 return res;
2060 }
2061
2062 void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
2063 assert(!cfd->queued_for_compaction());
2064 cfd->Ref();
2065 compaction_queue_.push_back(cfd);
2066 cfd->set_queued_for_compaction(true);
2067 }
2068
2069 ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
2070 assert(!compaction_queue_.empty());
2071 auto cfd = *compaction_queue_.begin();
2072 compaction_queue_.pop_front();
2073 assert(cfd->queued_for_compaction());
2074 cfd->set_queued_for_compaction(false);
2075 return cfd;
2076 }
2077
2078 DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
2079 assert(!flush_queue_.empty());
2080 FlushRequest flush_req = flush_queue_.front();
2081 flush_queue_.pop_front();
2082 // TODO: need to unset flush reason?
2083 return flush_req;
2084 }
2085
2086 ColumnFamilyData* DBImpl::PickCompactionFromQueue(
2087 std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
2088 assert(!compaction_queue_.empty());
2089 assert(*token == nullptr);
2090 autovector<ColumnFamilyData*> throttled_candidates;
2091 ColumnFamilyData* cfd = nullptr;
2092 while (!compaction_queue_.empty()) {
2093 auto first_cfd = *compaction_queue_.begin();
2094 compaction_queue_.pop_front();
2095 assert(first_cfd->queued_for_compaction());
2096 if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
2097 throttled_candidates.push_back(first_cfd);
2098 continue;
2099 }
2100 cfd = first_cfd;
2101 cfd->set_queued_for_compaction(false);
2102 break;
2103 }
2104 // Add throttled compaction candidates back to queue in the original order.
2105 for (auto iter = throttled_candidates.rbegin();
2106 iter != throttled_candidates.rend(); ++iter) {
2107 compaction_queue_.push_front(*iter);
2108 }
2109 return cfd;
2110 }
2111
2112 void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
2113 FlushReason flush_reason) {
2114 if (flush_req.empty()) {
2115 return;
2116 }
2117 for (auto& iter : flush_req) {
2118 ColumnFamilyData* cfd = iter.first;
2119 cfd->Ref();
2120 cfd->SetFlushReason(flush_reason);
2121 }
2122 ++unscheduled_flushes_;
2123 flush_queue_.push_back(flush_req);
2124 }
2125
2126 void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
2127 if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
2128 AddToCompactionQueue(cfd);
2129 ++unscheduled_compactions_;
2130 }
2131 }
2132
2133 void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2134 FileType type, uint64_t number, int job_id) {
2135 mutex_.AssertHeld();
2136 PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
2137 purge_files_.insert({{number, std::move(file_info)}});
2138 }
2139
2140 void DBImpl::BGWorkFlush(void* arg) {
2141 FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
2142 delete reinterpret_cast<FlushThreadArg*>(arg);
2143
2144 IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
2145 TEST_SYNC_POINT("DBImpl::BGWorkFlush");
2146 static_cast_with_check<DBImpl, DB>(fta.db_)->BackgroundCallFlush(
2147 fta.thread_pri_);
2148 TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
2149 }
2150
2151 void DBImpl::BGWorkCompaction(void* arg) {
2152 CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2153 delete reinterpret_cast<CompactionArg*>(arg);
2154 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
2155 TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
2156 auto prepicked_compaction =
2157 static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
2158 static_cast_with_check<DBImpl, DB>(ca.db)->BackgroundCallCompaction(
2159 prepicked_compaction, Env::Priority::LOW);
2160 delete prepicked_compaction;
2161 }
2162
2163 void DBImpl::BGWorkBottomCompaction(void* arg) {
2164 CompactionArg ca = *(static_cast<CompactionArg*>(arg));
2165 delete static_cast<CompactionArg*>(arg);
2166 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
2167 TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
2168 auto* prepicked_compaction = ca.prepicked_compaction;
2169 assert(prepicked_compaction && prepicked_compaction->compaction &&
2170 !prepicked_compaction->manual_compaction_state);
2171 ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
2172 delete prepicked_compaction;
2173 }
2174
2175 void DBImpl::BGWorkPurge(void* db) {
2176 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
2177 TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
2178 reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
2179 TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
2180 }
2181
2182 void DBImpl::UnscheduleCompactionCallback(void* arg) {
2183 CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2184 delete reinterpret_cast<CompactionArg*>(arg);
2185 if (ca.prepicked_compaction != nullptr) {
2186 if (ca.prepicked_compaction->compaction != nullptr) {
2187 delete ca.prepicked_compaction->compaction;
2188 }
2189 delete ca.prepicked_compaction;
2190 }
2191 TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
2192 }
2193
2194 void DBImpl::UnscheduleFlushCallback(void* arg) {
2195 delete reinterpret_cast<FlushThreadArg*>(arg);
2196 TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
2197 }
2198
2199 Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
2200 LogBuffer* log_buffer, FlushReason* reason,
2201 Env::Priority thread_pri) {
2202 mutex_.AssertHeld();
2203
2204 Status status;
2205 *reason = FlushReason::kOthers;
2206 // If BG work is stopped due to an error, but a recovery is in progress,
2207 // that means this flush is part of the recovery. So allow it to go through
2208 if (!error_handler_.IsBGWorkStopped()) {
2209 if (shutting_down_.load(std::memory_order_acquire)) {
2210 status = Status::ShutdownInProgress();
2211 }
2212 } else if (!error_handler_.IsRecoveryInProgress()) {
2213 status = error_handler_.GetBGError();
2214 }
2215
2216 if (!status.ok()) {
2217 return status;
2218 }
2219
2220 autovector<BGFlushArg> bg_flush_args;
2221 std::vector<SuperVersionContext>& superversion_contexts =
2222 job_context->superversion_contexts;
2223 autovector<ColumnFamilyData*> column_families_not_to_flush;
2224 while (!flush_queue_.empty()) {
2225 // This cfd is already referenced
2226 const FlushRequest& flush_req = PopFirstFromFlushQueue();
2227 superversion_contexts.clear();
2228 superversion_contexts.reserve(flush_req.size());
2229
2230 for (const auto& iter : flush_req) {
2231 ColumnFamilyData* cfd = iter.first;
2232 if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
2233 // can't flush this CF, try next one
2234 column_families_not_to_flush.push_back(cfd);
2235 continue;
2236 }
2237 superversion_contexts.emplace_back(SuperVersionContext(true));
2238 bg_flush_args.emplace_back(cfd, iter.second,
2239 &(superversion_contexts.back()));
2240 }
2241 if (!bg_flush_args.empty()) {
2242 break;
2243 }
2244 }
2245
2246 if (!bg_flush_args.empty()) {
2247 auto bg_job_limits = GetBGJobLimits();
2248 for (const auto& arg : bg_flush_args) {
2249 ColumnFamilyData* cfd = arg.cfd_;
2250 ROCKS_LOG_BUFFER(
2251 log_buffer,
2252 "Calling FlushMemTableToOutputFile with column "
2253 "family [%s], flush slots available %d, compaction slots available "
2254 "%d, "
2255 "flush slots scheduled %d, compaction slots scheduled %d",
2256 cfd->GetName().c_str(), bg_job_limits.max_flushes,
2257 bg_job_limits.max_compactions, bg_flush_scheduled_,
2258 bg_compaction_scheduled_);
2259 }
2260 status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
2261 job_context, log_buffer, thread_pri);
2262 TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
2263 // All the CFDs in the FlushReq must have the same flush reason, so just
2264 // grab the first one
2265 *reason = bg_flush_args[0].cfd_->GetFlushReason();
2266 for (auto& arg : bg_flush_args) {
2267 ColumnFamilyData* cfd = arg.cfd_;
2268 if (cfd->UnrefAndTryDelete()) {
2269 arg.cfd_ = nullptr;
2270 }
2271 }
2272 }
2273 for (auto cfd : column_families_not_to_flush) {
2274 cfd->UnrefAndTryDelete();
2275 }
2276 return status;
2277 }
2278
2279 void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
2280 bool made_progress = false;
2281 JobContext job_context(next_job_id_.fetch_add(1), true);
2282
2283 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
2284
2285 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2286 immutable_db_options_.info_log.get());
2287 {
2288 InstrumentedMutexLock l(&mutex_);
2289 assert(bg_flush_scheduled_);
2290 num_running_flushes_++;
2291
2292 std::unique_ptr<std::list<uint64_t>::iterator>
2293 pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2294 CaptureCurrentFileNumberInPendingOutputs()));
2295 FlushReason reason;
2296
2297 Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
2298 &reason, thread_pri);
2299 if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
2300 reason != FlushReason::kErrorRecovery) {
2301 // Wait a little bit before retrying background flush in
2302 // case this is an environmental problem and we do not want to
2303 // chew up resources for failed flushes for the duration of
2304 // the problem.
2305 uint64_t error_cnt =
2306 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2307 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2308 mutex_.Unlock();
2309 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2310                     "Waiting after background flush error: %s, "
2311 "Accumulated background error counts: %" PRIu64,
2312 s.ToString().c_str(), error_cnt);
2313 log_buffer.FlushBufferToLog();
2314 LogFlush(immutable_db_options_.info_log);
2315 env_->SleepForMicroseconds(1000000);
2316 mutex_.Lock();
2317 }
2318
2319 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
2320 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2321
2322 // If flush failed, we want to delete all temporary files that we might have
2323 // created. Thus, we force full scan in FindObsoleteFiles()
2324 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2325 !s.IsColumnFamilyDropped());
2326 // delete unnecessary files if any, this is done outside the mutex
2327 if (job_context.HaveSomethingToClean() ||
2328 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2329 mutex_.Unlock();
2330 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
2331 // Have to flush the info logs before bg_flush_scheduled_--
2332 // because if bg_flush_scheduled_ becomes 0 and the lock is
2333       // released, the destructor of DB can kick in and destroy all the
2334       // state of DB, so info_log might not be available after that point.
2335       // The same applies to accessing any other state that DB owns.
2336 log_buffer.FlushBufferToLog();
2337 if (job_context.HaveSomethingToDelete()) {
2338 PurgeObsoleteFiles(job_context);
2339 }
2340 job_context.Clean();
2341 mutex_.Lock();
2342 }
2343 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
2344
2345 assert(num_running_flushes_ > 0);
2346 num_running_flushes_--;
2347 bg_flush_scheduled_--;
2348 // See if there's more work to be done
2349 MaybeScheduleFlushOrCompaction();
2350 atomic_flush_install_cv_.SignalAll();
2351 bg_cv_.SignalAll();
2352 // IMPORTANT: there should be no code after calling SignalAll. This call may
2353 // signal the DB destructor that it's OK to proceed with destruction. In
2354     // that case, all DB variables will be deallocated and referencing them
2355 // will cause trouble.
2356 }
2357 }
2358
2359 void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2360 Env::Priority bg_thread_pri) {
2361 bool made_progress = false;
2362 JobContext job_context(next_job_id_.fetch_add(1), true);
2363 TEST_SYNC_POINT("BackgroundCallCompaction:0");
2364 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2365 immutable_db_options_.info_log.get());
2366 {
2367 InstrumentedMutexLock l(&mutex_);
2368
2369 // This call will unlock/lock the mutex to wait for current running
2370 // IngestExternalFile() calls to finish.
2371 WaitForIngestFile();
2372
2373 num_running_compactions_++;
2374
2375 std::unique_ptr<std::list<uint64_t>::iterator>
2376 pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2377 CaptureCurrentFileNumberInPendingOutputs()));
2378
2379 assert((bg_thread_pri == Env::Priority::BOTTOM &&
2380 bg_bottom_compaction_scheduled_) ||
2381 (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
2382 Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
2383 prepicked_compaction, bg_thread_pri);
2384 TEST_SYNC_POINT("BackgroundCallCompaction:1");
2385 if (s.IsBusy()) {
2386 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2387 mutex_.Unlock();
2388 env_->SleepForMicroseconds(10000); // prevent hot loop
2389 mutex_.Lock();
2390 } else if (!s.ok() && !s.IsShutdownInProgress() &&
2391 !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
2392 // Wait a little bit before retrying background compaction in
2393 // case this is an environmental problem and we do not want to
2394 // chew up resources for failed compactions for the duration of
2395 // the problem.
2396 uint64_t error_cnt =
2397 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2398 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2399 mutex_.Unlock();
2400 log_buffer.FlushBufferToLog();
2401 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2402 "Waiting after background compaction error: %s, "
2403 "Accumulated background error counts: %" PRIu64,
2404 s.ToString().c_str(), error_cnt);
2405 LogFlush(immutable_db_options_.info_log);
2406 env_->SleepForMicroseconds(1000000);
2407 mutex_.Lock();
2408 } else if (s.IsManualCompactionPaused()) {
2409 ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
2410 assert(m);
2411 ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
2412 m->cfd->GetName().c_str(), job_context.job_id);
2413 }
2414
2415 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2416
2417 // If compaction failed, we want to delete all temporary files that we might
2418 // have created (they might not be all recorded in job_context in case of a
2419 // failure). Thus, we force full scan in FindObsoleteFiles()
2420 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2421 !s.IsManualCompactionPaused() &&
2422 !s.IsColumnFamilyDropped());
2423 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
2424
2425 // delete unnecessary files if any, this is done outside the mutex
2426 if (job_context.HaveSomethingToClean() ||
2427 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2428 mutex_.Unlock();
2429 // Have to flush the info logs before bg_compaction_scheduled_--
2430       // because if bg_compaction_scheduled_ becomes 0 and the lock is
2431       // released, the destructor of DB can kick in and destroy all the
2432       // state of DB, so info_log might not be available after that point.
2433       // The same applies to accessing any other state that DB owns.
2434 log_buffer.FlushBufferToLog();
2435 if (job_context.HaveSomethingToDelete()) {
2436 PurgeObsoleteFiles(job_context);
2437 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
2438 }
2439 job_context.Clean();
2440 mutex_.Lock();
2441 }
2442
2443 assert(num_running_compactions_ > 0);
2444 num_running_compactions_--;
2445 if (bg_thread_pri == Env::Priority::LOW) {
2446 bg_compaction_scheduled_--;
2447 } else {
2448 assert(bg_thread_pri == Env::Priority::BOTTOM);
2449 bg_bottom_compaction_scheduled_--;
2450 }
2451
2452 versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
2453
2454 // See if there's more work to be done
2455 MaybeScheduleFlushOrCompaction();
2456 if (made_progress ||
2457 (bg_compaction_scheduled_ == 0 &&
2458 bg_bottom_compaction_scheduled_ == 0) ||
2459 HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
2460 // signal if
2461 // * made_progress -- need to wakeup DelayWrite
2462 // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
2463 // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
2464 // If none of this is true, there is no need to signal since nobody is
2465 // waiting for it
2466 bg_cv_.SignalAll();
2467 }
2468 // IMPORTANT: there should be no code after calling SignalAll. This call may
2469 // signal the DB destructor that it's OK to proceed with destruction. In
2470     // that case, all DB variables will be deallocated and referencing them
2471 // will cause trouble.
2472 }
2473 }
2474
2475 Status DBImpl::BackgroundCompaction(bool* made_progress,
2476 JobContext* job_context,
2477 LogBuffer* log_buffer,
2478 PrepickedCompaction* prepicked_compaction,
2479 Env::Priority thread_pri) {
2480 ManualCompactionState* manual_compaction =
2481 prepicked_compaction == nullptr
2482 ? nullptr
2483 : prepicked_compaction->manual_compaction_state;
2484 *made_progress = false;
2485 mutex_.AssertHeld();
2486 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
2487
2488 bool is_manual = (manual_compaction != nullptr);
2489 std::unique_ptr<Compaction> c;
2490 if (prepicked_compaction != nullptr &&
2491 prepicked_compaction->compaction != nullptr) {
2492 c.reset(prepicked_compaction->compaction);
2493 }
2494 bool is_prepicked = is_manual || c;
2495
2496 // (manual_compaction->in_progress == false);
2497 bool trivial_move_disallowed =
2498 is_manual && manual_compaction->disallow_trivial_move;
2499
2500 CompactionJobStats compaction_job_stats;
2501 Status status;
2502 if (!error_handler_.IsBGWorkStopped()) {
2503 if (shutting_down_.load(std::memory_order_acquire)) {
2504 status = Status::ShutdownInProgress();
2505 } else if (is_manual &&
2506 manual_compaction_paused_.load(std::memory_order_acquire)) {
2507 status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
2508 }
2509 } else {
2510 status = error_handler_.GetBGError();
2511 // If we get here, it means a hard error happened after this compaction
2512 // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
2513 // a chance to execute. Since we didn't pop a cfd from the compaction
2514 // queue, increment unscheduled_compactions_
2515 unscheduled_compactions_++;
2516 }
2517
2518 if (!status.ok()) {
2519 if (is_manual) {
2520 manual_compaction->status = status;
2521 manual_compaction->done = true;
2522 manual_compaction->in_progress = false;
2523 manual_compaction = nullptr;
2524 }
2525 if (c) {
2526 c->ReleaseCompactionFiles(status);
2527 c.reset();
2528 }
2529 return status;
2530 }
2531
2532 if (is_manual) {
2533 // another thread cannot pick up the same work
2534 manual_compaction->in_progress = true;
2535 }
2536
2537 std::unique_ptr<TaskLimiterToken> task_token;
2538
2539 // InternalKey manual_end_storage;
2540 // InternalKey* manual_end = &manual_end_storage;
2541 bool sfm_reserved_compact_space = false;
2542 if (is_manual) {
2543 ManualCompactionState* m = manual_compaction;
2544 assert(m->in_progress);
2545 if (!c) {
2546 m->done = true;
2547 m->manual_end = nullptr;
2548 ROCKS_LOG_BUFFER(log_buffer,
2549 "[%s] Manual compaction from level-%d from %s .. "
2550 "%s; nothing to do\n",
2551 m->cfd->GetName().c_str(), m->input_level,
2552 (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2553 (m->end ? m->end->DebugString().c_str() : "(end)"));
2554 } else {
2555 // First check if we have enough room to do the compaction
2556 bool enough_room = EnoughRoomForCompaction(
2557 m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2558
2559 if (!enough_room) {
2560 // Then don't do the compaction
2561 c->ReleaseCompactionFiles(status);
2562 c.reset();
2563 // m's vars will get set properly at the end of this function,
2564 // as long as status == CompactionTooLarge
2565 status = Status::CompactionTooLarge();
2566 } else {
2567 ROCKS_LOG_BUFFER(
2568 log_buffer,
2569 "[%s] Manual compaction from level-%d to level-%d from %s .. "
2570 "%s; will stop at %s\n",
2571 m->cfd->GetName().c_str(), m->input_level, c->output_level(),
2572 (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2573 (m->end ? m->end->DebugString().c_str() : "(end)"),
2574 ((m->done || m->manual_end == nullptr)
2575 ? "(end)"
2576 : m->manual_end->DebugString().c_str()));
2577 }
2578 }
2579 } else if (!is_prepicked && !compaction_queue_.empty()) {
2580 if (HasExclusiveManualCompaction()) {
2581 // Can't compact right now, but try again later
2582 TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
2583
2584 // Stay in the compaction queue.
2585 unscheduled_compactions_++;
2586
2587 return Status::OK();
2588 }
2589
2590 auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
2591 if (cfd == nullptr) {
2592 // Can't find any executable task from the compaction queue.
2593 // All tasks have been throttled by compaction thread limiter.
2594 ++unscheduled_compactions_;
2595 return Status::Busy();
2596 }
2597
2598 // We unreference here because the following code will take a Ref() on
2599 // this cfd if it is going to use it (Compaction class holds a
2600 // reference).
2601 // This will all happen under a mutex so we don't have to be afraid of
2602 // somebody else deleting it.
2603 if (cfd->UnrefAndTryDelete()) {
2604 // This was the last reference of the column family, so no need to
2605 // compact.
2606 return Status::OK();
2607 }
2608
2609 // Pick up latest mutable CF Options and use it throughout the
2610 // compaction job
2611 // Compaction makes a copy of the latest MutableCFOptions. It should be used
2612     // throughout the compaction procedure to ensure consistency. It will
2613 // eventually be installed into SuperVersion
2614 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2615 if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
2616 // NOTE: try to avoid unnecessary copy of MutableCFOptions if
2617 // compaction is not necessary. Need to make sure mutex is held
2618 // until we make a copy in the following code
2619 TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
2620 c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
2621 TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
2622
2623 if (c != nullptr) {
2624 bool enough_room = EnoughRoomForCompaction(
2625 cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2626
2627 if (!enough_room) {
2628 // Then don't do the compaction
2629 c->ReleaseCompactionFiles(status);
2630 c->column_family_data()
2631 ->current()
2632 ->storage_info()
2633 ->ComputeCompactionScore(*(c->immutable_cf_options()),
2634 *(c->mutable_cf_options()));
2635 AddToCompactionQueue(cfd);
2636 ++unscheduled_compactions_;
2637
2638 c.reset();
2639 // Don't need to sleep here, because BackgroundCallCompaction
2640 // will sleep if !s.ok()
2641 status = Status::CompactionTooLarge();
2642 } else {
2643 // update statistics
2644 RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
2645 c->inputs(0)->size());
2646 // There are three things that can change compaction score:
2647           // 1) When a flush or compaction finishes. This case is covered by
2648 // InstallSuperVersionAndScheduleWork
2649 // 2) When MutableCFOptions changes. This case is also covered by
2650 // InstallSuperVersionAndScheduleWork, because this is when the new
2651 // options take effect.
2652 // 3) When we Pick a new compaction, we "remove" those files being
2653 // compacted from the calculation, which then influences compaction
2654 // score. Here we check if we need the new compaction even without the
2655 // files that are currently being compacted. If we need another
2656 // compaction, we might be able to execute it in parallel, so we add
2657 // it to the queue and schedule a new thread.
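          // Illustrative case: if, even with the files picked above excluded,
          // the CF still needs compaction (e.g. L0 still exceeds
          // level0_file_num_compaction_trigger), NeedsCompaction() returns
          // true below and a second compaction can be scheduled in parallel.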
2658 if (cfd->NeedsCompaction()) {
2659 // Yes, we need more compactions!
2660 AddToCompactionQueue(cfd);
2661 ++unscheduled_compactions_;
2662 MaybeScheduleFlushOrCompaction();
2663 }
2664 }
2665 }
2666 }
2667 }
2668
2669 IOStatus io_s;
2670 if (!c) {
2671 // Nothing to do
2672 ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
2673 } else if (c->deletion_compaction()) {
2674     // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete an old
2675     // file if there is a live snapshot pointing to it
2676 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2677 c->column_family_data());
2678 assert(c->num_input_files(1) == 0);
2679 assert(c->level() == 0);
2680 assert(c->column_family_data()->ioptions()->compaction_style ==
2681 kCompactionStyleFIFO);
2682
2683 compaction_job_stats.num_input_files = c->num_input_files(0);
2684
2685 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2686 compaction_job_stats, job_context->job_id);
2687
2688 for (const auto& f : *c->inputs(0)) {
2689 c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
2690 }
2691 versions_->SetIOStatusOK();
2692 status = versions_->LogAndApply(c->column_family_data(),
2693 *c->mutable_cf_options(), c->edit(),
2694 &mutex_, directories_.GetDbDir());
2695 io_s = versions_->io_status();
2696 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2697 &job_context->superversion_contexts[0],
2698 *c->mutable_cf_options());
2699 ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
2700 c->column_family_data()->GetName().c_str(),
2701 c->num_input_files(0));
2702 *made_progress = true;
2703 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2704 c->column_family_data());
2705 } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
2706 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
2707 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2708 c->column_family_data());
2709 // Instrument for event update
2710 // TODO(yhchiang): add op details for showing trivial-move.
2711 ThreadStatusUtil::SetColumnFamily(
2712 c->column_family_data(), c->column_family_data()->ioptions()->env,
2713 immutable_db_options_.enable_thread_tracking);
2714 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
2715
2716 compaction_job_stats.num_input_files = c->num_input_files(0);
2717
2718 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2719 compaction_job_stats, job_context->job_id);
2720
2721 // Move files to next level
2722 int32_t moved_files = 0;
2723 int64_t moved_bytes = 0;
2724 for (unsigned int l = 0; l < c->num_input_levels(); l++) {
2725 if (c->level(l) == c->output_level()) {
2726 continue;
2727 }
2728 for (size_t i = 0; i < c->num_input_files(l); i++) {
2729 FileMetaData* f = c->input(l, i);
2730 c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
2731 c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
2732 f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
2733 f->largest, f->fd.smallest_seqno,
2734 f->fd.largest_seqno, f->marked_for_compaction,
2735 f->oldest_blob_file_number, f->oldest_ancester_time,
2736 f->file_creation_time, f->file_checksum,
2737 f->file_checksum_func_name);
2738
2739 ROCKS_LOG_BUFFER(
2740 log_buffer,
2741 "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
2742 c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
2743 c->output_level(), f->fd.GetFileSize());
2744 ++moved_files;
2745 moved_bytes += f->fd.GetFileSize();
2746 }
2747 }
2748
2749 versions_->SetIOStatusOK();
2750 status = versions_->LogAndApply(c->column_family_data(),
2751 *c->mutable_cf_options(), c->edit(),
2752 &mutex_, directories_.GetDbDir());
2753 io_s = versions_->io_status();
2754 // Use latest MutableCFOptions
2755 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2756 &job_context->superversion_contexts[0],
2757 *c->mutable_cf_options());
2758
2759 VersionStorageInfo::LevelSummaryStorage tmp;
2760 c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
2761 moved_bytes);
2762 {
2763 event_logger_.LogToBuffer(log_buffer)
2764 << "job" << job_context->job_id << "event"
2765 << "trivial_move"
2766 << "destination_level" << c->output_level() << "files" << moved_files
2767 << "total_files_size" << moved_bytes;
2768 }
2769 ROCKS_LOG_BUFFER(
2770 log_buffer,
2771 "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
2772 c->column_family_data()->GetName().c_str(), moved_files,
2773 c->output_level(), moved_bytes, status.ToString().c_str(),
2774 c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
2775 *made_progress = true;
2776
2777 // Clear Instrument
2778 ThreadStatusUtil::ResetThreadStatus();
2779 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2780 c->column_family_data());
2781 } else if (!is_prepicked && c->output_level() > 0 &&
2782 c->output_level() ==
2783 c->column_family_data()
2784 ->current()
2785 ->storage_info()
2786 ->MaxOutputLevel(
2787 immutable_db_options_.allow_ingest_behind) &&
2788 env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
2789 // Forward compactions involving last level to the bottom pool if it exists,
2790 // such that compactions unlikely to contribute to write stalls can be
2791 // delayed or deprioritized.
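    // (The bottom pool exists only when the Env has background threads
    // allocated to Env::Priority::BOTTOM; by default that pool is empty and
    // this branch is skipped.)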
2792 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
2793 CompactionArg* ca = new CompactionArg;
2794 ca->db = this;
2795 ca->prepicked_compaction = new PrepickedCompaction;
2796 ca->prepicked_compaction->compaction = c.release();
2797 ca->prepicked_compaction->manual_compaction_state = nullptr;
2798     // Transfer the requested token, so the bottom-priority job doesn't need to request it again.
2799 ca->prepicked_compaction->task_token = std::move(task_token);
2800 ++bg_bottom_compaction_scheduled_;
2801 env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
2802 this, &DBImpl::UnscheduleCompactionCallback);
2803 } else {
2804 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2805 c->column_family_data());
2806 int output_level __attribute__((__unused__));
2807 output_level = c->output_level();
2808 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
2809 &output_level);
2810 std::vector<SequenceNumber> snapshot_seqs;
2811 SequenceNumber earliest_write_conflict_snapshot;
2812 SnapshotChecker* snapshot_checker;
2813 GetSnapshotContext(job_context, &snapshot_seqs,
2814 &earliest_write_conflict_snapshot, &snapshot_checker);
2815 assert(is_snapshot_supported_ || snapshots_.empty());
2816 CompactionJob compaction_job(
2817 job_context->job_id, c.get(), immutable_db_options_,
2818 file_options_for_compaction_, versions_.get(), &shutting_down_,
2819 preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
2820 GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
2821 &mutex_, &error_handler_, snapshot_seqs,
2822 earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
2823 &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
2824 c->mutable_cf_options()->report_bg_io_stats, dbname_,
2825 &compaction_job_stats, thread_pri,
2826 is_manual ? &manual_compaction_paused_ : nullptr);
2827 compaction_job.Prepare();
2828
2829 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2830 compaction_job_stats, job_context->job_id);
2831
2832 mutex_.Unlock();
2833 TEST_SYNC_POINT_CALLBACK(
2834 "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
2835 compaction_job.Run();
2836 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
2837 mutex_.Lock();
2838
2839 status = compaction_job.Install(*c->mutable_cf_options());
2840 io_s = compaction_job.io_status();
2841 if (status.ok()) {
2842 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2843 &job_context->superversion_contexts[0],
2844 *c->mutable_cf_options());
2845 }
2846 *made_progress = true;
2847 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2848 c->column_family_data());
2849 }
2850
2851 if (status.ok() && !io_s.ok()) {
2852 status = io_s;
2853 }
2854
2855 if (c != nullptr) {
2856 c->ReleaseCompactionFiles(status);
2857 *made_progress = true;
2858
2859 #ifndef ROCKSDB_LITE
2860 // Need to make sure SstFileManager does its bookkeeping
2861 auto sfm = static_cast<SstFileManagerImpl*>(
2862 immutable_db_options_.sst_file_manager.get());
2863 if (sfm && sfm_reserved_compact_space) {
2864 sfm->OnCompactionCompletion(c.get());
2865 }
2866 #endif // ROCKSDB_LITE
2867
2868 NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
2869 compaction_job_stats, job_context->job_id);
2870 }
2871
2872 if (status.ok() || status.IsCompactionTooLarge() ||
2873 status.IsManualCompactionPaused()) {
2874 // Done
2875 } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
2876 // Ignore compaction errors found during shutting down
2877 } else {
2878 ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
2879 status.ToString().c_str());
2880 if (!io_s.ok()) {
2881 error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
2882 } else {
2883 error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
2884 }
2885 if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
2886 // Put this cfd back in the compaction queue so we can retry after some
2887 // time
2888 auto cfd = c->column_family_data();
2889 assert(cfd != nullptr);
2890 // Since this compaction failed, we need to recompute the score so it
2891 // takes the original input files into account
2892 c->column_family_data()
2893 ->current()
2894 ->storage_info()
2895 ->ComputeCompactionScore(*(c->immutable_cf_options()),
2896 *(c->mutable_cf_options()));
2897 if (!cfd->queued_for_compaction()) {
2898 AddToCompactionQueue(cfd);
2899 ++unscheduled_compactions_;
2900 }
2901 }
2902 }
2903 // this will unref its input_version and column_family_data
2904 c.reset();
2905
2906 if (is_manual) {
2907 ManualCompactionState* m = manual_compaction;
2908 if (!status.ok()) {
2909 m->status = status;
2910 m->done = true;
2911 }
2912 // For universal compaction:
2913     // Because universal compaction always happens at level 0, one compaction
2914     // will pick up all overlapping files. No files will be filtered out due to
2915     // the size limit and left for a successive compaction.
2916 // So we can safely conclude the current compaction.
2917 //
2918 // Also note that, if we don't stop here, then the current compaction
2919 // writes a new file back to level 0, which will be used in successive
2920 // compaction. Hence the manual compaction will never finish.
2921 //
2922 // Stop the compaction if manual_end is nullptr -- this means
2923 // that we compacted the whole range. manual_end should always be
2924 // nullptr in the case of universal compaction.
2925 if (m->manual_end == nullptr) {
2926 m->done = true;
2927 }
2928 if (!m->done) {
2929 // We only compacted part of the requested range. Update *m
2930 // to the range that is left to be compacted.
2931 // Universal and FIFO compactions should always compact the whole range
2932 assert(m->cfd->ioptions()->compaction_style !=
2933 kCompactionStyleUniversal ||
2934 m->cfd->ioptions()->num_levels > 1);
2935 assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
2936 m->tmp_storage = *m->manual_end;
2937 m->begin = &m->tmp_storage;
2938 m->incomplete = true;
2939 }
2940 m->in_progress = false; // not being processed anymore
2941 }
2942 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
2943 return status;
2944 }
2945
2946 bool DBImpl::HasPendingManualCompaction() {
2947 return (!manual_compaction_dequeue_.empty());
2948 }
2949
2950 void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
2951 manual_compaction_dequeue_.push_back(m);
2952 }
2953
2954 void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
2955 // Remove from queue
2956 std::deque<ManualCompactionState*>::iterator it =
2957 manual_compaction_dequeue_.begin();
2958 while (it != manual_compaction_dequeue_.end()) {
2959 if (m == (*it)) {
2960 it = manual_compaction_dequeue_.erase(it);
2961 return;
2962 }
2963 ++it;
2964 }
2965 assert(false);
2966 return;
2967 }
2968
2969 bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
2970 if (num_running_ingest_file_ > 0) {
2971 // We need to wait for other IngestExternalFile() calls to finish
2972 // before running a manual compaction.
2973 return true;
2974 }
2975 if (m->exclusive) {
2976 return (bg_bottom_compaction_scheduled_ > 0 ||
2977 bg_compaction_scheduled_ > 0);
2978 }
2979 std::deque<ManualCompactionState*>::iterator it =
2980 manual_compaction_dequeue_.begin();
2981 bool seen = false;
2982 while (it != manual_compaction_dequeue_.end()) {
2983 if (m == (*it)) {
2984 ++it;
2985 seen = true;
2986 continue;
2987 } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
2988 // The other manual compaction *it conflicts with m if it
2989 // overlaps with m, is ahead of m in the queue (m not yet seen),
2990 // and is not yet in progress.
2991 return true;
2992 }
2993 ++it;
2994 }
2995 return false;
2996 }
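// Illustrative scenario (not from the source, just a reading of the loop
// above): if two non-exclusive, overlapping manual compactions m1 and m2 are
// queued in that order, ShouldntRunManualCompaction(m2) returns true until
// m1 either starts running (in_progress) or is removed from the queue,
// because m1 sits ahead of m2 in manual_compaction_dequeue_ and is pending.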
2997
2998 bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
2999 // Scan the queue for a manual compaction that affects this column family
3000 std::deque<ManualCompactionState*>::iterator it =
3001 manual_compaction_dequeue_.begin();
3002 while (it != manual_compaction_dequeue_.end()) {
3003 if ((*it)->exclusive) {
3004 return true;
3005 }
3006 if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
3007 // Only a manual compaction that is still waiting blocks automatic
3008 // compaction; one that is already in progress or done does not.
3009 return true;
3010 }
3011 ++it;
3012 }
3013 return false;
3014 }
3015
3016 bool DBImpl::HasExclusiveManualCompaction() {
3017 // Scan the queue for an exclusive manual compaction
3018 std::deque<ManualCompactionState*>::iterator it =
3019 manual_compaction_dequeue_.begin();
3020 while (it != manual_compaction_dequeue_.end()) {
3021 if ((*it)->exclusive) {
3022 return true;
3023 }
3024 ++it;
3025 }
3026 return false;
3027 }
3028
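// MCOverlap decides whether two manual compactions are considered to
// conflict. Note that this is a conservative approximation: apart from the
// exclusive case, any two manual compactions on the same column family are
// treated as overlapping, regardless of their key ranges.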
3029 bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
3030 if ((m->exclusive) || (m1->exclusive)) {
3031 return true;
3032 }
3033 if (m->cfd != m1->cfd) {
3034 return false;
3035 }
3036 return true;
3037 }
3038
3039 #ifndef ROCKSDB_LITE
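// Fills a CompactionJobInfo, used when notifying compaction event listeners:
// identifiers and stats for the job, plus the input/output file lists. Table
// properties for input files that the compaction did not already collect are
// looked up from the supplied Version.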
3040 void DBImpl::BuildCompactionJobInfo(
3041 const ColumnFamilyData* cfd, Compaction* c, const Status& st,
3042 const CompactionJobStats& compaction_job_stats, const int job_id,
3043 const Version* current, CompactionJobInfo* compaction_job_info) const {
3044 assert(compaction_job_info != nullptr);
3045 compaction_job_info->cf_id = cfd->GetID();
3046 compaction_job_info->cf_name = cfd->GetName();
3047 compaction_job_info->status = st;
3048 compaction_job_info->thread_id = env_->GetThreadID();
3049 compaction_job_info->job_id = job_id;
3050 compaction_job_info->base_input_level = c->start_level();
3051 compaction_job_info->output_level = c->output_level();
3052 compaction_job_info->stats = compaction_job_stats;
3053 compaction_job_info->table_properties = c->GetOutputTableProperties();
3054 compaction_job_info->compaction_reason = c->compaction_reason();
3055 compaction_job_info->compression = c->output_compression();
3056 for (size_t i = 0; i < c->num_input_levels(); ++i) {
3057 for (const auto fmd : *c->inputs(i)) {
3058 const FileDescriptor& desc = fmd->fd;
3059 const uint64_t file_number = desc.GetNumber();
3060 auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
3061 desc.GetPathId());
3062 compaction_job_info->input_files.push_back(fn);
3063 compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
3064 static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
3065 if (compaction_job_info->table_properties.count(fn) == 0) {
3066 std::shared_ptr<const TableProperties> tp;
3067 auto s = current->GetTableProperties(&tp, fmd, &fn);
3068 if (s.ok()) {
3069 compaction_job_info->table_properties[fn] = tp;
3070 }
3071 }
3072 }
3073 }
3074 for (const auto& newf : c->edit()->GetNewFiles()) {
3075 const FileMetaData& meta = newf.second;
3076 const FileDescriptor& desc = meta.fd;
3077 const uint64_t file_number = desc.GetNumber();
3078 compaction_job_info->output_files.push_back(TableFileName(
3079 c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
3080 compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
3081 newf.first, file_number, meta.oldest_blob_file_number});
3082 }
3083 }
3084 #endif
3085
3086 // SuperVersionContext gets created and destructed outside of the lock --
3087 // we use this conveniently to:
3088 // * malloc one SuperVersion() outside of the lock -- new_superversion
3089 // * delete SuperVersion()s outside of the lock -- superversions_to_free
3090 //
3091 // However, if InstallSuperVersionAndScheduleWork() gets called twice with
3092 // the same sv_context, we can't reuse the SuperVersion() that was
3093 // malloced, because the first call already used it. In that rare case,
3094 // we take a hit and create a new SuperVersion() inside of the mutex.
3095 // We do a similar thing for the SuperVersions queued to be freed
3096 // (superversions_to_free).
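//
// A minimal sketch of the intended call pattern (illustrative only -- the
// real call sites are the flush/compaction paths in this file):
//
//   SuperVersionContext sv_context(/*create_superversion=*/true);  // no lock
//   {
//     InstrumentedMutexLock l(&mutex_);
//     InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
//   }
//   sv_context.Clean();  // free the replaced SuperVersions outside the lock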
3097
3098 void DBImpl::InstallSuperVersionAndScheduleWork(
3099 ColumnFamilyData* cfd, SuperVersionContext* sv_context,
3100 const MutableCFOptions& mutable_cf_options) {
3101 mutex_.AssertHeld();
3102
3103 // Remember the old memtable budget so max_total_in_memory_state_ can be updated below
3104 size_t old_memtable_size = 0;
3105 auto* old_sv = cfd->GetSuperVersion();
3106 if (old_sv) {
3107 old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
3108 old_sv->mutable_cf_options.max_write_buffer_number;
3109 }
3110
3111 // This branch is unlikely to be taken (see the comment above this function)
3112 if (UNLIKELY(sv_context->new_superversion == nullptr)) {
3113 sv_context->NewSuperVersion();
3114 }
3115 cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
3116
3117 // There may be a small data race here. The snapshot that triggers the
3118 // bottommost compaction may already have been released by this point. But
3119 // assuming newer snapshots are always created and released frequently, the
3120 // compaction will be triggered soon anyway.
3121 bottommost_files_mark_threshold_ = kMaxSequenceNumber;
3122 for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
3123 bottommost_files_mark_threshold_ = std::min(
3124 bottommost_files_mark_threshold_,
3125 my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
3126 }
3127
3128 // Whenever we install a new SuperVersion, we might need to issue new
3129 // flushes or compactions.
3130 SchedulePendingCompaction(cfd);
3131 MaybeScheduleFlushOrCompaction();
3132
3133 // Update max_total_in_memory_state_
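// (Illustrative arithmetic, not from the source: with write_buffer_size =
// 64 MB and max_write_buffer_number = 2, this column family's contribution
// below works out to 128 MB.)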
3134 max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
3135 mutable_cf_options.write_buffer_size *
3136 mutable_cf_options.max_write_buffer_number;
3137 }
3138
3139 // ShouldPurge is called by FindObsoleteFiles when doing a full scan,
3140 // and the db mutex (mutex_) should already be held.
3141 // Note that the current implementation of FindObsoleteFiles with
3142 // full_scan=true can issue I/O requests to obtain the list of files in
3143 // directories, e.g. env_->GetChildren(), while holding the db mutex.
3144 bool DBImpl::ShouldPurge(uint64_t file_number) const {
3145 return files_grabbed_for_purge_.find(file_number) ==
3146 files_grabbed_for_purge_.end() &&
3147 purge_files_.find(file_number) == purge_files_.end();
3148 }
3149
3150 // MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
3151 // (mutex_) should already be held.
3152 void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
3153 files_grabbed_for_purge_.insert(file_number);
3154 }
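// A hedged sketch of the assumed interplay between the two helpers above
// (simplified; the actual logic lives in FindObsoleteFiles):
//
//   // mutex_ held
//   if (ShouldPurge(file_number)) {
//     MarkAsGrabbedForPurge(file_number);
//     // ... queue file_number as a deletion candidate ...
//   }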
3155
3156 void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
3157 InstrumentedMutexLock l(&mutex_);
3158 // snapshot_checker_ should only be set once. If we ever need to set it
3159 // multiple times, we need to make sure the old one is not deleted while it
3160 // is still in use by a compaction job.
3161 assert(!snapshot_checker_);
3162 snapshot_checker_.reset(snapshot_checker);
3163 }
3164
3165 void DBImpl::GetSnapshotContext(
3166 JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
3167 SequenceNumber* earliest_write_conflict_snapshot,
3168 SnapshotChecker** snapshot_checker_ptr) {
3169 mutex_.AssertHeld();
3170 assert(job_context != nullptr);
3171 assert(snapshot_seqs != nullptr);
3172 assert(earliest_write_conflict_snapshot != nullptr);
3173 assert(snapshot_checker_ptr != nullptr);
3174
3175 *snapshot_checker_ptr = snapshot_checker_.get();
3176 if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
3177 *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
3178 }
3179 if (*snapshot_checker_ptr != nullptr) {
3180 // If a snapshot_checker is used, the flush/compaction may contain
3181 // values that are not visible to snapshots taken after the
3182 // flush/compaction job starts. Take a snapshot here so that it appears
3183 // in snapshot_seqs and forces the compaction iterator to consider such
3184 // snapshots.
3185 const Snapshot* job_snapshot =
3186 GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
3187 job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
3188 }
3189 *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
3190 }
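// A hedged usage sketch (the assumed pattern at flush/compaction call sites;
// the local variable names are illustrative):
//
//   std::vector<SequenceNumber> snapshot_seqs;
//   SequenceNumber earliest_write_conflict_snapshot;
//   SnapshotChecker* snapshot_checker = nullptr;
//   GetSnapshotContext(job_context, &snapshot_seqs,
//                      &earliest_write_conflict_snapshot, &snapshot_checker);
//   // snapshot_seqs, earliest_write_conflict_snapshot and snapshot_checker
//   // are then passed to the FlushJob / CompactionJob being constructed.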
3191 } // namespace ROCKSDB_NAMESPACE
3192