//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <cinttypes>

#include "db/builder.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "file/sst_file_manager_impl.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/concurrent_task_limiter_impl.h"

namespace ROCKSDB_NAMESPACE {

bool DBImpl::EnoughRoomForCompaction(
    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Check if we have enough room to do the compaction
  bool enough_room = true;
#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm) {
    // Pass the current bg_error_ to SFM so it can decide what checks to
    // perform. If this DB instance hasn't seen any error yet, the SFM can be
    // optimistic and not do disk space checks
    enough_room =
        sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
    if (enough_room) {
      *sfm_reserved_compact_space = true;
    }
  }
#else
  (void)cfd;
  (void)inputs;
  (void)sfm_reserved_compact_space;
#endif  // ROCKSDB_LITE
  if (!enough_room) {
    // Just in case tests want to change the value of enough_room
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
    ROCKS_LOG_BUFFER(log_buffer,
                     "Cancelled compaction because not enough room");
    RecordTick(stats_, COMPACTION_CANCELLED, 1);
  }
  return enough_room;
}

bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
                                    std::unique_ptr<TaskLimiterToken>* token,
                                    LogBuffer* log_buffer) {
  assert(*token == nullptr);
  auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
      cfd->ioptions()->compaction_thread_limiter.get());
  if (limiter == nullptr) {
    return true;
  }
  *token = limiter->GetToken(force);
  if (*token != nullptr) {
    ROCKS_LOG_BUFFER(log_buffer,
                     "Thread limiter [%s] increase [%s] compaction task, "
                     "force: %s, tasks after: %d",
                     limiter->GetName().c_str(), cfd->GetName().c_str(),
                     force ? "true" : "false", limiter->GetOutstandingTask());
    return true;
  }
  return false;
}

IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  mutex_.AssertHeld();
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
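  // If another thread is already syncing the oldest of these closed logs,
  // wait for it to finish before claiming the remaining ones below.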
  while (logs_.front().number < current_log_number &&
         logs_.front().getting_synced) {
    log_sync_cv_.Wait();
  }
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    assert(!log.getting_synced);
    log.getting_synced = true;
    logs_to_sync.push_back(log.writer);
  }

  IOStatus io_s;
  if (!logs_to_sync.empty()) {
    mutex_.Unlock();

    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
      io_s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (!io_s.ok()) {
        break;
      }

      if (immutable_db_options_.recycle_log_file_num > 0) {
        io_s = log->Close();
        if (!io_s.ok()) {
          break;
        }
      }
    }
    if (io_s.ok()) {
      io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
    }

    mutex_.Lock();

    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
    MarkLogsSynced(current_log_number - 1, true, io_s);
    if (!io_s.ok()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
      return io_s;
    }
  }
  return io_s;
}

Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context,
    SuperVersionContext* superversion_context,
    std::vector<SequenceNumber>& snapshot_seqs,
    SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
    Env::Priority thread_pri) {
  mutex_.AssertHeld();
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());

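  // For a single-column-family flush, the job syncs its own output directory
  // and writes the MANIFEST itself (sync_output_directory / write_manifest
  // are both true below).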
  FlushJob flush_job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options,
      nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(),
      &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
      GetDataDir(cfd, 0U),
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats,
      true /* sync_output_directory */, true /* write_manifest */, thread_pri);
  FileMetaData file_meta;

  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
  flush_job.PickMemTable();
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");

#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif  // ROCKSDB_LITE

  Status s;
  IOStatus io_s;
  if (logfile_number_ > 0 &&
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
    // If there is more than one column family, we need to make sure that
    // all the log files except the most recent one are synced. Otherwise if
    // the host crashes after flushing and before the WAL is persistent, the
    // flushed SST may contain data from write batches whose updates to
    // other column families are missing.
    // SyncClosedLogs() may unlock and re-lock the db_mutex.
    io_s = SyncClosedLogs(job_context);
    s = io_s;
  } else {
    TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
  }

  // Within flush_job.Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that flush_job.Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
  if (s.ok()) {
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
  } else {
    flush_job.Cancel();
  }
  io_s = flush_job.io_status();

  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, superversion_context,
                                       mutable_cf_options);
    if (made_progress) {
      *made_progress = true;
    }
    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                     cfd->GetName().c_str(),
                     cfd->current()->storage_info()->LevelSummary(&tmp));
  }

  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
    if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
        !io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
    } else {
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    }
  }
  if (s.ok()) {
#ifndef ROCKSDB_LITE
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushCompleted(cfd, mutable_cf_options,
                           flush_job.GetCommittedFlushJobsInfo());
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm) {
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(
          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
      sfm->OnAddFile(file_path);
      if (sfm->IsMaxAllowedSpaceReached()) {
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
            &new_bg_error);
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
#endif  // ROCKSDB_LITE
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
  return s;
}

Status DBImpl::FlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  if (immutable_db_options_.atomic_flush) {
    return AtomicFlushMemTablesToOutputFiles(
        bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
  }
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
  Status status;
  for (auto& arg : bg_flush_args) {
    ColumnFamilyData* cfd = arg.cfd_;
    MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
    SuperVersionContext* superversion_context = arg.superversion_context_;
    Status s = FlushMemTableToOutputFile(
        cfd, mutable_cf_options, made_progress, job_context,
        superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, log_buffer, thread_pri);
    if (!s.ok()) {
      status = s;
      if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
        // At this point, DB is not shutting down, nor is cfd dropped.
        // Something is wrong, thus we break out of the loop.
        break;
      }
    }
  }
  return status;
}

/*
 * Atomically flushes multiple column families.
 *
 * For each column family, all memtables with ID smaller than or equal to the
 * ID specified in bg_flush_args will be flushed. Only after all column
 * families finish flush will this function commit to MANIFEST. If any of the
 * column families are not flushed successfully, this function does not have
 * any side-effect on the state of the database.
 */
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  mutex_.AssertHeld();

  autovector<ColumnFamilyData*> cfds;
  for (const auto& arg : bg_flush_args) {
    cfds.emplace_back(arg.cfd_);
  }

#ifndef NDEBUG
  for (const auto cfd : cfds) {
    assert(cfd->imm()->NumNotFlushed() != 0);
    assert(cfd->imm()->IsFlushPending());
  }
#endif /* !NDEBUG */

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  autovector<FSDirectory*> distinct_output_dirs;
  autovector<std::string> distinct_output_dir_paths;
  std::vector<std::unique_ptr<FlushJob>> jobs;
  std::vector<MutableCFOptions> all_mutable_cf_options;
  int num_cfs = static_cast<int>(cfds.size());
  all_mutable_cf_options.reserve(num_cfs);
  for (int i = 0; i < num_cfs; ++i) {
    auto cfd = cfds[i];
    FSDirectory* data_dir = GetDataDir(cfd, 0U);
    const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;

    // Add to distinct output directories if eligible. Use linear search. Since
    // the number of elements in the vector is not large, performance should be
    // tolerable.
    bool found = false;
    for (const auto& path : distinct_output_dir_paths) {
      if (path == curr_path) {
        found = true;
        break;
      }
    }
    if (!found) {
      distinct_output_dir_paths.emplace_back(curr_path);
      distinct_output_dirs.emplace_back(data_dir);
    }

    all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
    const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
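    // Unlike the single-CF path, each job defers its output-directory sync and
    // MANIFEST write (sync_output_directory / write_manifest are false); both
    // are done once for the whole atomic flush further below.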
    jobs.emplace_back(new FlushJob(
        dbname_, cfd, immutable_db_options_, mutable_cf_options,
        max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
        &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
        data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
        stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
        false /* sync_output_directory */, false /* write_manifest */,
        thread_pri));
    jobs.back()->PickMemTable();
  }

  std::vector<FileMetaData> file_meta(num_cfs);
  Status s;
  IOStatus io_s;
  assert(num_cfs == static_cast<int>(jobs.size()));

#ifndef ROCKSDB_LITE
  for (int i = 0; i != num_cfs; ++i) {
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
                       job_context->job_id);
  }
#endif /* !ROCKSDB_LITE */

  if (logfile_number_ > 0) {
    // TODO (yanqin) investigate whether we should sync the closed logs for
    // single column family case.
    io_s = SyncClosedLogs(job_context);
    s = io_s;
  }

  // exec_status stores the execution status of flush_jobs as
  // <bool /* executed */, Status /* status code */>
  autovector<std::pair<bool, Status>> exec_status;
  autovector<IOStatus> io_status;
  for (int i = 0; i != num_cfs; ++i) {
    // Initially all jobs are not executed, with status OK.
    exec_status.emplace_back(false, Status::OK());
    io_status.emplace_back(IOStatus::OK());
  }

  if (s.ok()) {
    // TODO (yanqin): parallelize jobs with threads.
    for (int i = 1; i != num_cfs; ++i) {
      exec_status[i].second =
          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
      exec_status[i].first = true;
      io_status[i] = jobs[i]->io_status();
    }
    if (num_cfs > 1) {
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
    }
    assert(exec_status.size() > 0);
    assert(!file_meta.empty());
    exec_status[0].second =
        jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
    exec_status[0].first = true;
    io_status[0] = jobs[0]->io_status();

    Status error_status;
    for (const auto& e : exec_status) {
      if (!e.second.ok()) {
        s = e.second;
        if (!e.second.IsShutdownInProgress() &&
            !e.second.IsColumnFamilyDropped()) {
          // If a flush job did not return OK, and the CF is not dropped, and
          // the DB is not shutting down, then we have to return this result to
          // caller later.
          error_status = e.second;
        }
      }
    }

    s = error_status.ok() ? s : error_status;
  }

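  // Surface any IO error reported by the individual flush jobs (ignoring
  // shutdown and column-family-drop cases) through io_s, and through s if s
  // is still OK.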
  if (io_s.ok()) {
    IOStatus io_error = IOStatus::OK();
    for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
      if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() &&
          !io_status[i].IsColumnFamilyDropped()) {
        io_error = io_status[i];
      }
    }
    io_s = io_error;
    if (s.ok() && !io_s.ok()) {
      s = io_s;
    }
  }

  if (s.IsColumnFamilyDropped()) {
    s = Status::OK();
  }

  if (s.ok() || s.IsShutdownInProgress()) {
    // Sync on all distinct output directories.
    for (auto dir : distinct_output_dirs) {
      if (dir != nullptr) {
        Status error_status = dir->Fsync(IOOptions(), nullptr);
        if (!error_status.ok()) {
          s = error_status;
          break;
        }
      }
    }
  } else {
    // Need to undo atomic flush if something went wrong, i.e. s is not OK and
    // it is not because of CF drop.
    // Have to cancel the flush jobs that have NOT executed because we need to
    // unref the versions.
    for (int i = 0; i != num_cfs; ++i) {
      if (!exec_status[i].first) {
        jobs[i]->Cancel();
      }
    }
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].first && exec_status[i].second.ok()) {
        auto& mems = jobs[i]->GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(mems,
                                              file_meta[i].fd.GetNumber());
      }
    }
  }

  if (s.ok()) {
    auto wait_to_install_func = [&]() {
      bool ready = true;
      for (size_t i = 0; i != cfds.size(); ++i) {
        const auto& mems = jobs[i]->GetMemTables();
        if (cfds[i]->IsDropped()) {
          // If the column family is dropped, then do not wait.
          continue;
        } else if (!mems.empty() &&
                   cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
          // If a flush job needs to install the flush result for mems and
          // mems[0] is not the earliest memtable, it means another thread must
          // be installing flush results for the same column family, then the
          // current thread needs to wait.
          ready = false;
          break;
        } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
                                       bg_flush_args[i].max_memtable_id_) {
          // If a flush job does not need to install flush results, then it has
          // to wait until all memtables up to max_memtable_id_ (inclusive) are
          // installed.
          ready = false;
          break;
        }
      }
      return ready;
    };

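    // Wait until it is this thread's turn to install the flush results; stop
    // waiting early if the DB is stopped and recovery itself has failed, and
    // pick up that error below.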
    bool resuming_from_bg_err = error_handler_.IsDBStopped();
    while ((!error_handler_.IsDBStopped() ||
            error_handler_.GetRecoveryError().ok()) &&
           !wait_to_install_func()) {
      atomic_flush_install_cv_.Wait();
    }

    s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
                             : error_handler_.GetBGError();
  }

  if (s.ok()) {
    autovector<ColumnFamilyData*> tmp_cfds;
    autovector<const autovector<MemTable*>*> mems_list;
    autovector<const MutableCFOptions*> mutable_cf_options_list;
    autovector<FileMetaData*> tmp_file_meta;
    for (int i = 0; i != num_cfs; ++i) {
      const auto& mems = jobs[i]->GetMemTables();
      if (!cfds[i]->IsDropped() && !mems.empty()) {
        tmp_cfds.emplace_back(cfds[i]);
        mems_list.emplace_back(&mems);
        mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
        tmp_file_meta.emplace_back(&file_meta[i]);
      }
    }

    s = InstallMemtableAtomicFlushResults(
        nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
        versions_.get(), &mutex_, tmp_file_meta,
        &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
  }

  if (s.ok()) {
    assert(num_cfs ==
           static_cast<int>(job_context->superversion_contexts.size()));
    for (int i = 0; i != num_cfs; ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      InstallSuperVersionAndScheduleWork(cfds[i],
                                         &job_context->superversion_contexts[i],
                                         all_mutable_cf_options[i]);
      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                       cfds[i]->GetName().c_str(),
                       cfds[i]->current()->storage_info()->LevelSummary(&tmp));
    }
    if (made_progress) {
      *made_progress = true;
    }
#ifndef ROCKSDB_LITE
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
    for (int i = 0; i != num_cfs; ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
                             jobs[i]->GetCommittedFlushJobsInfo());
      if (sfm) {
        std::string file_path = MakeTableFileName(
            cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
        sfm->OnAddFile(file_path);
        if (sfm->IsMaxAllowedSpaceReached() &&
            error_handler_.GetBGError().ok()) {
          Status new_bg_error =
              Status::SpaceLimit("Max allowed space was reached");
          error_handler_.SetBGError(new_bg_error,
                                    BackgroundErrorReason::kFlush);
        }
      }
    }
#endif  // ROCKSDB_LITE
  }

  // Need to undo atomic flush if something went wrong, i.e. s is not OK and
  // it is not because of CF drop.
  if (!s.ok() && !s.IsColumnFamilyDropped()) {
    if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
    } else {
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    }
  }

  return s;
}

void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info{};
    info.cf_id = cfd->GetID();
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    //                 go to L0 in the future.
    const uint64_t file_number = file_meta->fd.GetNumber();
    info.file_path =
        MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
    info.file_number = file_number;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
    info.flush_reason = cfd->GetFlushReason();
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

void DBImpl::NotifyOnFlushCompleted(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
#ifndef ROCKSDB_LITE
  assert(flush_jobs_info != nullptr);
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    for (auto& info : *flush_jobs_info) {
      info->triggered_writes_slowdown = triggered_writes_slowdown;
      info->triggered_writes_stop = triggered_writes_stop;
      for (auto listener : immutable_db_options_.listeners) {
        listener->OnFlushCompleted(this, *info);
      }
    }
    flush_jobs_info->clear();
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)mutable_cf_options;
  (void)flush_jobs_info;
#endif  // ROCKSDB_LITE
}

Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin, const Slice* end) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  bool flush_needed = true;
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
    cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
    CleanupSuperVersion(super_version);
  }

  Status s;
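  // If the memtables overlap the requested range, flush them first so the
  // compaction below also covers that data.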
  if (flush_needed) {
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      mutex_.Lock();
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
                               false /* writes_stopped */);
    } else {
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
                        false /* writes_stopped*/);
    }
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  constexpr int kInvalidLevel = -1;
  int final_output_level = kInvalidLevel;
  bool exclusive = options.exclusive_manual_compaction;
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // if bottom most level is reserved
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options, begin, end, exclusive,
                            false, port::kMaxUint64);
  } else {
    int first_overlapped_level = kInvalidLevel;
    int max_overlapped_level = kInvalidLevel;
    {
      SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
      Version* current_version = super_version->current;
      ReadOptions ro;
      ro.total_order_seek = true;
      bool overlap;
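      // Scan the levels to find the first (shallowest) and last (deepest)
      // levels overlapping [begin, end]; the manual compaction then runs level
      // by level over that span.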
      for (int level = 0;
           level < current_version->storage_info()->num_non_empty_levels();
           level++) {
        overlap = true;
        if (begin != nullptr && end != nullptr) {
          Status status = current_version->OverlapWithLevelIterator(
              ro, file_options_, *begin, *end, level, &overlap);
          if (!status.ok()) {
            overlap = current_version->storage_info()->OverlapInLevel(
                level, begin, end);
          }
        } else {
          overlap = current_version->storage_info()->OverlapInLevel(level,
                                                                    begin, end);
        }
        if (overlap) {
          if (first_overlapped_level == kInvalidLevel) {
            first_overlapped_level = level;
          }
          max_overlapped_level = level;
        }
      }
      CleanupSuperVersion(super_version);
    }
    if (s.ok() && first_overlapped_level != kInvalidLevel) {
      // max_file_num_to_ignore can be used to filter out newly created SST
      // files, useful for bottom level compaction in a manual compaction
      uint64_t max_file_num_to_ignore = port::kMaxUint64;
      uint64_t next_file_number = versions_->current_next_file_number();
      final_output_level = max_overlapped_level;
      int output_level;
      for (int level = first_overlapped_level; level <= max_overlapped_level;
           level++) {
        // in case the compaction is universal or if we're compacting the
        // bottom-most level, the output level will be the same as input one.
        // level 0 can never be the bottommost level (i.e. if all files are in
        // level 0, we will compact to level 1)
        if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
            cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
          output_level = level;
        } else if (level == max_overlapped_level && level > 0) {
          if (options.bottommost_level_compaction ==
              BottommostLevelCompaction::kSkip) {
            // Skip bottommost level compaction
            continue;
          } else if (options.bottommost_level_compaction ==
                         BottommostLevelCompaction::kIfHaveCompactionFilter &&
                     cfd->ioptions()->compaction_filter == nullptr &&
                     cfd->ioptions()->compaction_filter_factory == nullptr) {
            // Skip bottommost level compaction since we don't have a compaction
            // filter
            continue;
          }
          output_level = level;
          // update max_file_num_to_ignore only for bottom level compaction
          // because data in newly compacted files in middle levels may still
          // need to be pushed down
          max_file_num_to_ignore = next_file_number;
        } else {
          output_level = level + 1;
          if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
              cfd->ioptions()->level_compaction_dynamic_level_bytes &&
              level == 0) {
            output_level = ColumnFamilyData::kCompactToBaseLevel;
          }
        }
        s = RunManualCompaction(cfd, level, output_level, options, begin, end,
                                exclusive, false, max_file_num_to_ignore);
        if (!s.ok()) {
          break;
        }
        if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
          final_output_level = cfd->NumberLevels() - 1;
        } else if (output_level > final_output_level) {
          final_output_level = output_level;
        }
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
      }
    }
  }
  if (!s.ok() || final_output_level == kInvalidLevel) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    s = PauseBackgroundWork();
    if (s.ok()) {
      s = ReFitLevel(cfd, final_output_level, options.target_level);
    }
    ContinueBackgroundWork();
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}

Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
#ifdef ROCKSDB_LITE
  (void)compact_options;
  (void)column_family;
  (void)input_file_names;
  (void)output_level;
  (void)output_path_id;
  (void)output_file_names;
  (void)compaction_job_info;
  // not supported in lite version
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }

  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  assert(cfd);

  Status s;
  JobContext job_context(0, true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  // Perform CompactFiles
  TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    // We need to get current after `WaitForIngestFile`, because
    // `IngestExternalFile` may add files that overlap with `input_file_names`
    auto* current = cfd->current();
    current->Ref();

    s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
                         output_file_names, output_level, output_path_id,
                         &job_context, &log_buffer, compaction_job_info);

    current->Unref();
  }

  // Find and delete obsolete files
  {
    InstrumentedMutexLock l(&mutex_);
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  }  // release the mutex

  // delete unnecessary files if any, this is done outside the mutex
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    // Have to flush the info logs before bg_compaction_scheduled_--
    // because if bg_flush_scheduled_ becomes 0 and the lock is
    // released, the destructor of DB can kick in and destroy all the
    // states of DB so info_log might not be available after that point.
    // The same applies to accessing other state that the DB owns.
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here.  No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }

  return s;
#endif  // ROCKSDB_LITE
}

#ifndef ROCKSDB_LITE
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
    std::vector<std::string>* const output_file_names, const int output_level,
    int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
    CompactionJobInfo* compaction_job_info) {
  mutex_.AssertHeld();

  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }
  if (manual_compaction_paused_.load(std::memory_order_acquire)) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input_file_names) {
    input_set.insert(TableFileNameToNumber(file_name));
  }

  ColumnFamilyMetaData cf_meta;
  // TODO(yhchiang): can directly use version here if none of the
  // following functions call is pluggable to external developers.
  version->GetColumnFamilyMetaData(&cf_meta);

  if (output_path_id < 0) {
    if (cfd->ioptions()->cf_paths.size() == 1U) {
      output_path_id = 0;
    } else {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
    }
  }

  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
      &input_set, cf_meta, output_level);
  if (!s.ok()) {
    return s;
  }

  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, version->storage_info(), compact_options);
  if (!s.ok()) {
    return s;
  }

  for (const auto& inputs : input_files) {
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
    }
  }
  bool sfm_reserved_compact_space = false;
  // First check if we have enough room to do the compaction
  bool enough_room = EnoughRoomForCompaction(
      cfd, input_files, &sfm_reserved_compact_space, log_buffer);

  if (!enough_room) {
    // m's vars will get set properly at the end of this function,
    // as long as status == CompactionTooLarge
    return Status::CompactionTooLarge();
  }

  // At this point, CompactFiles will be run.
  bg_compaction_scheduled_++;

  std::unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  c.reset(cfd->compaction_picker()->CompactFiles(
      compact_options, input_files, output_level, version->storage_info(),
      *cfd->GetLatestMutableCFOptions(), output_path_id));
  // we already sanitized the set of input files and checked for conflicts
  // without releasing the lock, so we're guaranteed a compaction can be formed.
  assert(c != nullptr);

  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));

  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJobStats compaction_job_stats;
  CompactionJob compaction_job(
      job_context->job_id, c.get(), immutable_db_options_,
      file_options_for_compaction_, versions_.get(), &shutting_down_,
      preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
      GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
      &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, table_cache_, &event_logger_,
      c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->report_bg_io_stats, dbname_,
      &compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  // here.
  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
                                                  *c->mutable_cf_options());

  compaction_job.Prepare();

  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  compaction_job.Run();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();

  Status status = compaction_job.Install(*c->mutable_cf_options());
  if (status.ok()) {
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
  }
  c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
  // Need to make sure SstFileManager does its bookkeeping
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm && sfm_reserved_compact_space) {
    sfm->OnCompactionCompletion(c.get());
  }
#endif  // ROCKSDB_LITE

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  if (compaction_job_info != nullptr) {
    BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
                           job_context->job_id, version, compaction_job_info);
  }

  if (status.ok()) {
    // Done
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else if (status.IsManualCompactionPaused()) {
    // Don't report stopping manual compaction as error
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Stopping manual compaction",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id);
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
  }

  if (output_file_names != nullptr) {
    for (const auto& newf : c->edit()->GetNewFiles()) {
      (*output_file_names)
          .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
                                   newf.second.fd.GetNumber(),
                                   newf.second.fd.GetPathId()));
    }
  }

  c.reset();

  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
  }
  MaybeScheduleFlushOrCompaction();
  TEST_SYNC_POINT("CompactFilesImpl:End");

  return status;
}
#endif  // ROCKSDB_LITE

Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  bg_compaction_paused_++;
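  // Wait for in-flight background jobs to drain; raising bg_compaction_paused_
  // first keeps new compactions from being scheduled in the meantime.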
  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
         bg_flush_scheduled_ > 0) {
    bg_cv_.Wait();
  }
  bg_work_paused_++;
  return Status::OK();
}

Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument();
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  bg_compaction_paused_--;
  bg_work_paused_--;
  // It's sufficient to check just bg_work_paused_ here since
  // bg_work_paused_ is always no greater than bg_compaction_paused_
  if (bg_work_paused_ == 0) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}

void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
                                     const Status& st,
                                     const CompactionJobStats& job_stats,
                                     int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  if (c->is_manual_compaction() &&
      manual_compaction_paused_.load(std::memory_order_acquire)) {
    return;
  }
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
  {
    CompactionJobInfo info{};
    info.cf_name = cfd->GetName();
    info.status = st;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.base_input_level = c->start_level();
    info.output_level = c->output_level();
    info.stats = job_stats;
    info.table_properties = c->GetOutputTableProperties();
    info.compaction_reason = c->compaction_reason();
    info.compression = c->output_compression();
    for (size_t i = 0; i < c->num_input_levels(); ++i) {
      for (const auto fmd : *c->inputs(i)) {
        const FileDescriptor& desc = fmd->fd;
        const uint64_t file_number = desc.GetNumber();
        auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
                                file_number, desc.GetPathId());
        info.input_files.push_back(fn);
        info.input_file_infos.push_back(CompactionFileInfo{
            static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
        if (info.table_properties.count(fn) == 0) {
          std::shared_ptr<const TableProperties> tp;
          auto s = current->GetTableProperties(&tp, fmd, &fn);
          if (s.ok()) {
            info.table_properties[fn] = tp;
          }
        }
      }
    }
    for (const auto& newf : c->edit()->GetNewFiles()) {
      const FileMetaData& meta = newf.second;
      const FileDescriptor& desc = meta.fd;
      const uint64_t file_number = desc.GetNumber();
      info.output_files.push_back(TableFileName(
          c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
      info.output_file_infos.push_back(CompactionFileInfo{
          newf.first, file_number, meta.oldest_blob_file_number});
    }
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionBegin(this, info);
    }
  }
  mutex_.Lock();
  current->Unref();
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

void DBImpl::NotifyOnCompactionCompleted(
    ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  if (c->is_manual_compaction() &&
      manual_compaction_paused_.load(std::memory_order_acquire)) {
    return;
  }
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  {
    CompactionJobInfo info{};
    BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
                           &info);
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
    }
  }
  mutex_.Lock();
  current->Unref();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)compaction_job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}
1259 
1260 // REQUIREMENT: block all background work by calling PauseBackgroundWork()
1261 // before calling this function
ReFitLevel(ColumnFamilyData * cfd,int level,int target_level)1262 Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
1263   assert(level < cfd->NumberLevels());
1264   if (target_level >= cfd->NumberLevels()) {
1265     return Status::InvalidArgument("Target level exceeds number of levels");
1266   }
1267 
1268   SuperVersionContext sv_context(/* create_superversion */ true);
1269 
1270   Status status;
1271 
1272   InstrumentedMutexLock guard_lock(&mutex_);
1273 
1274   // only allow one thread refitting
1275   if (refitting_level_) {
1276     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1277                    "[ReFitLevel] another thread is refitting");
1278     return Status::NotSupported("another thread is refitting");
1279   }
1280   refitting_level_ = true;
1281 
1282   const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1283   // move to a smaller level
1284   int to_level = target_level;
1285   if (target_level < 0) {
1286     to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
1287   }
1288 
1289   auto* vstorage = cfd->current()->storage_info();
1290   if (to_level > level) {
1291     if (level == 0) {
1292       return Status::NotSupported(
1293           "Cannot change from level 0 to other levels.");
1294     }
1295     // Check levels are empty for a trivial move
1296     for (int l = level + 1; l <= to_level; l++) {
1297       if (vstorage->NumLevelFiles(l) > 0) {
1298         return Status::NotSupported(
1299             "Levels between source and target are not empty for a move.");
1300       }
1301     }
1302   }
1303   if (to_level != level) {
1304     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1305                     "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
1306                     cfd->current()->DebugString().data());
1307 
1308     VersionEdit edit;
1309     edit.SetColumnFamily(cfd->GetID());
1310     for (const auto& f : vstorage->LevelFiles(level)) {
1311       edit.DeleteFile(level, f->fd.GetNumber());
1312       edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
1313                    f->fd.GetFileSize(), f->smallest, f->largest,
1314                    f->fd.smallest_seqno, f->fd.largest_seqno,
1315                    f->marked_for_compaction, f->oldest_blob_file_number,
1316                    f->oldest_ancester_time, f->file_creation_time,
1317                    f->file_checksum, f->file_checksum_func_name);
1318     }
1319     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1320                     "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
1321                     edit.DebugString().data());
1322 
1323     status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
1324                                     directories_.GetDbDir());
1325     InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
1326 
1327     ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
1328                     cfd->GetName().c_str(), status.ToString().data());
1329 
1330     if (status.ok()) {
1331       ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1332                       "[%s] After refitting:\n%s", cfd->GetName().c_str(),
1333                       cfd->current()->DebugString().data());
1334     }
1335   }
1336 
1337   sv_context.Clean();
1338   refitting_level_ = false;
1339 
1340   return status;
1341 }
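
// Illustrative sketch (hypothetical caller code, not part of this file):
// ReFitLevel() is normally reached through CompactRange() with
// CompactRangeOptions::change_level set, which pauses background work before
// the refit, e.g.:
//
//   CompactRangeOptions cro;
//   cro.change_level = true;  // move the compacted output to target_level
//   cro.target_level = 1;
//   Status s = db->CompactRange(cro, cfh, nullptr, nullptr);  // full key range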
1342 
1343 int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
1344   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1345   return cfh->cfd()->NumberLevels();
1346 }
1347 
1348 int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
1349   return 0;
1350 }
1351 
1352 int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
1353   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1354   InstrumentedMutexLock l(&mutex_);
1355   return cfh->cfd()
1356       ->GetSuperVersion()
1357       ->mutable_cf_options.level0_stop_writes_trigger;
1358 }
1359 
1360 Status DBImpl::Flush(const FlushOptions& flush_options,
1361                      ColumnFamilyHandle* column_family) {
1362   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1363   ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
1364                  cfh->GetName().c_str());
1365   Status s;
1366   if (immutable_db_options_.atomic_flush) {
1367     s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
1368                              FlushReason::kManualFlush);
1369   } else {
1370     s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
1371   }
1372 
1373   ROCKS_LOG_INFO(immutable_db_options_.info_log,
1374                  "[%s] Manual flush finished, status: %s\n",
1375                  cfh->GetName().c_str(), s.ToString().c_str());
1376   return s;
1377 }
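
// Illustrative usage sketch (assumes an open DB* `db` and a
// ColumnFamilyHandle* `cfh`; not part of this implementation):
//
//   FlushOptions fo;
//   fo.wait = true;                // block until the memtable is persisted
//   fo.allow_write_stall = false;  // wait for stall conditions to clear first
//   Status s = db->Flush(fo, cfh);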
1378 
1379 Status DBImpl::Flush(const FlushOptions& flush_options,
1380                      const std::vector<ColumnFamilyHandle*>& column_families) {
1381   Status s;
1382   if (!immutable_db_options_.atomic_flush) {
1383     for (auto cfh : column_families) {
1384       s = Flush(flush_options, cfh);
1385       if (!s.ok()) {
1386         break;
1387       }
1388     }
1389   } else {
1390     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1391                    "Manual atomic flush start.\n"
1392                    "=====Column families:=====");
1393     for (auto cfh : column_families) {
1394       auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1395       ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1396                      cfhi->GetName().c_str());
1397     }
1398     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1399                    "=====End of column families list=====");
1400     autovector<ColumnFamilyData*> cfds;
1401     std::for_each(column_families.begin(), column_families.end(),
1402                   [&cfds](ColumnFamilyHandle* elem) {
1403                     auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
1404                     cfds.emplace_back(cfh->cfd());
1405                   });
1406     s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
1407     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1408                    "Manual atomic flush finished, status: %s\n"
1409                    "=====Column families:=====",
1410                    s.ToString().c_str());
1411     for (auto cfh : column_families) {
1412       auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1413       ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1414                      cfhi->GetName().c_str());
1415     }
1416     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1417                    "=====End of column families list=====");
1418   }
1419   return s;
1420 }
1421 
1422 Status DBImpl::RunManualCompaction(
1423     ColumnFamilyData* cfd, int input_level, int output_level,
1424     const CompactRangeOptions& compact_range_options, const Slice* begin,
1425     const Slice* end, bool exclusive, bool disallow_trivial_move,
1426     uint64_t max_file_num_to_ignore) {
1427   assert(input_level == ColumnFamilyData::kCompactAllLevels ||
1428          input_level >= 0);
1429 
1430   InternalKey begin_storage, end_storage;
1431   CompactionArg* ca;
1432 
1433   bool scheduled = false;
1434   bool manual_conflict = false;
1435   ManualCompactionState manual;
1436   manual.cfd = cfd;
1437   manual.input_level = input_level;
1438   manual.output_level = output_level;
1439   manual.output_path_id = compact_range_options.target_path_id;
1440   manual.done = false;
1441   manual.in_progress = false;
1442   manual.incomplete = false;
1443   manual.exclusive = exclusive;
1444   manual.disallow_trivial_move = disallow_trivial_move;
1445   // For universal and FIFO compaction styles, we force every manual compaction
1446   // to compact all files (begin/end are ignored).
1447   if (begin == nullptr ||
1448       cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1449       cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1450     manual.begin = nullptr;
1451   } else {
1452     begin_storage.SetMinPossibleForUserKey(*begin);
1453     manual.begin = &begin_storage;
1454   }
1455   if (end == nullptr ||
1456       cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1457       cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1458     manual.end = nullptr;
1459   } else {
1460     end_storage.SetMaxPossibleForUserKey(*end);
1461     manual.end = &end_storage;
1462   }
1463 
1464   TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
1465   TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
1466   InstrumentedMutexLock l(&mutex_);
1467 
1468   // When a manual compaction arrives, temporarily disable scheduling of
1469   // non-manual compactions and wait until the number of scheduled compaction
1470   // jobs drops to zero. This is needed to ensure that this manual compaction
1471   // can compact any range of keys/files.
1472   //
1473   // HasPendingManualCompaction() is true when at least one thread is inside
1474   // RunManualCompaction(), i.e. during that time no other compaction will
1475   // get scheduled (see MaybeScheduleFlushOrCompaction).
1476   //
1477   // Note that the following loop doesn't stop more than one thread calling
1478   // RunManualCompaction() from getting to the second while loop below.
1479   // However, only one of them will actually schedule compaction, while
1480   // others will wait on a condition variable until it completes.
1481 
1482   AddManualCompaction(&manual);
1483   TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
1484   if (exclusive) {
1485     while (bg_bottom_compaction_scheduled_ > 0 ||
1486            bg_compaction_scheduled_ > 0) {
1487       TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
1488       ROCKS_LOG_INFO(
1489           immutable_db_options_.info_log,
1490           "[%s] Manual compaction waiting for all other scheduled background "
1491           "compactions to finish",
1492           cfd->GetName().c_str());
1493       bg_cv_.Wait();
1494     }
1495   }
1496 
1497   ROCKS_LOG_INFO(immutable_db_options_.info_log,
1498                  "[%s] Manual compaction starting", cfd->GetName().c_str());
1499 
1500   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
1501                        immutable_db_options_.info_log.get());
1502   // We don't check bg_error_ here, because if we get the error in compaction,
1503   // the compaction will set manual.status to bg_error_ and set manual.done to
1504   // true.
1505   while (!manual.done) {
1506     assert(HasPendingManualCompaction());
1507     manual_conflict = false;
1508     Compaction* compaction = nullptr;
1509     if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
1510         scheduled ||
1511         (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
1512          ((compaction = manual.cfd->CompactRange(
1513                *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
1514                manual.output_level, compact_range_options, manual.begin,
1515                manual.end, &manual.manual_end, &manual_conflict,
1516                max_file_num_to_ignore)) == nullptr &&
1517           manual_conflict))) {
1518       // exclusive manual compactions should not see a conflict during
1519       // CompactRange
1520       assert(!exclusive || !manual_conflict);
1521       // Running either this or some other manual compaction
1522       bg_cv_.Wait();
1523       if (scheduled && manual.incomplete == true) {
1524         assert(!manual.in_progress);
1525         scheduled = false;
1526         manual.incomplete = false;
1527       }
1528     } else if (!scheduled) {
1529       if (compaction == nullptr) {
1530         manual.done = true;
1531         bg_cv_.SignalAll();
1532         continue;
1533       }
1534       ca = new CompactionArg;
1535       ca->db = this;
1536       ca->prepicked_compaction = new PrepickedCompaction;
1537       ca->prepicked_compaction->manual_compaction_state = &manual;
1538       ca->prepicked_compaction->compaction = compaction;
1539       if (!RequestCompactionToken(
1540               cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
1541         // Don't throttle manual compaction, only count outstanding tasks.
1542         assert(false);
1543       }
1544       manual.incomplete = false;
1545       bg_compaction_scheduled_++;
1546       Env::Priority thread_pool_pri = Env::Priority::LOW;
1547       if (compaction->bottommost_level() &&
1548           env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
1549         thread_pool_pri = Env::Priority::BOTTOM;
1550       }
1551       env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this,
1552                      &DBImpl::UnscheduleCompactionCallback);
1553       scheduled = true;
1554     }
1555   }
1556 
1557   log_buffer.FlushBufferToLog();
1558   assert(!manual.in_progress);
1559   assert(HasPendingManualCompaction());
1560   RemoveManualCompaction(&manual);
1561   bg_cv_.SignalAll();
1562   return manual.status;
1563 }
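
// Illustrative sketch of how this path is typically reached from the public
// API (hypothetical caller code assuming `db` and `cfh`; not part of this
// implementation):
//
//   CompactRangeOptions cro;
//   cro.exclusive_manual_compaction = true;  // maps to `exclusive` above
//   Slice begin("a");
//   Slice end("z");
//   Status s = db->CompactRange(cro, cfh, &begin, &end);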
1564 
1565 void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
1566                                   FlushRequest* req) {
1567   assert(req != nullptr);
1568   req->reserve(cfds.size());
1569   for (const auto cfd : cfds) {
1570     if (nullptr == cfd) {
1571       // cfd may be null, see DBImpl::ScheduleFlushes
1572       continue;
1573     }
1574     uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
1575     req->emplace_back(cfd, max_memtable_id);
1576   }
1577 }
1578 
1579 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1580                              const FlushOptions& flush_options,
1581                              FlushReason flush_reason, bool writes_stopped) {
1582   Status s;
1583   uint64_t flush_memtable_id = 0;
1584   if (!flush_options.allow_write_stall) {
1585     bool flush_needed = true;
1586     s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1587     TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
1588     if (!s.ok() || !flush_needed) {
1589       return s;
1590     }
1591   }
1592   FlushRequest flush_req;
1593   {
1594     WriteContext context;
1595     InstrumentedMutexLock guard_lock(&mutex_);
1596 
1597     WriteThread::Writer w;
1598     WriteThread::Writer nonmem_w;
1599     if (!writes_stopped) {
1600       write_thread_.EnterUnbatched(&w, &mutex_);
1601       if (two_write_queues_) {
1602         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1603       }
1604     }
1605     WaitForPendingWrites();
1606 
1607     if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
1608       s = SwitchMemtable(cfd, &context);
1609     }
1610     if (s.ok()) {
1611       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1612           !cached_recoverable_state_empty_.load()) {
1613         flush_memtable_id = cfd->imm()->GetLatestMemTableID();
1614         flush_req.emplace_back(cfd, flush_memtable_id);
1615       }
1616       if (immutable_db_options_.persist_stats_to_disk) {
1617         ColumnFamilyData* cfd_stats =
1618             versions_->GetColumnFamilySet()->GetColumnFamily(
1619                 kPersistentStatsColumnFamilyName);
1620         if (cfd_stats != nullptr && cfd_stats != cfd &&
1621             !cfd_stats->mem()->IsEmpty()) {
1622           // only force flush stats CF when it will be the only CF lagging
1623           // behind after the current flush
1624           bool stats_cf_flush_needed = true;
1625           for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
1626             if (loop_cfd == cfd_stats || loop_cfd == cfd) {
1627               continue;
1628             }
1629             if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
1630               stats_cf_flush_needed = false;
1631             }
1632           }
1633           if (stats_cf_flush_needed) {
1634             ROCKS_LOG_INFO(immutable_db_options_.info_log,
1635                            "Force flushing stats CF with manual flush of %s "
1636                            "to avoid holding old logs",
1637                            cfd->GetName().c_str());
1638             s = SwitchMemtable(cfd_stats, &context);
1639             flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
1640             flush_req.emplace_back(cfd_stats, flush_memtable_id);
1641           }
1642         }
1643       }
1644     }
1645 
1646     if (s.ok() && !flush_req.empty()) {
1647       for (auto& elem : flush_req) {
1648         ColumnFamilyData* loop_cfd = elem.first;
1649         loop_cfd->imm()->FlushRequested();
1650       }
1651       // If the caller wants to wait for this flush to complete, it indicates
1652       // that the caller expects the ColumnFamilyData not to be freed by
1653       // other threads which may drop the column family concurrently.
1654       // Therefore, we increase the cfd's ref count.
1655       if (flush_options.wait) {
1656         for (auto& elem : flush_req) {
1657           ColumnFamilyData* loop_cfd = elem.first;
1658           loop_cfd->Ref();
1659         }
1660       }
1661       SchedulePendingFlush(flush_req, flush_reason);
1662       MaybeScheduleFlushOrCompaction();
1663     }
1664 
1665     if (!writes_stopped) {
1666       write_thread_.ExitUnbatched(&w);
1667       if (two_write_queues_) {
1668         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1669       }
1670     }
1671   }
1672   TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
1673   TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
1674   if (s.ok() && flush_options.wait) {
1675     autovector<ColumnFamilyData*> cfds;
1676     autovector<const uint64_t*> flush_memtable_ids;
1677     for (auto& iter : flush_req) {
1678       cfds.push_back(iter.first);
1679       flush_memtable_ids.push_back(&(iter.second));
1680     }
1681     s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1682                               (flush_reason == FlushReason::kErrorRecovery));
1683     InstrumentedMutexLock lock_guard(&mutex_);
1684     for (auto* tmp_cfd : cfds) {
1685       tmp_cfd->UnrefAndTryDelete();
1686     }
1687   }
1688   TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
1689   return s;
1690 }
1691 
1692 // Flush all elements in 'column_family_datas'
1693 // and atomically record the result to the MANIFEST.
1694 Status DBImpl::AtomicFlushMemTables(
1695     const autovector<ColumnFamilyData*>& column_family_datas,
1696     const FlushOptions& flush_options, FlushReason flush_reason,
1697     bool writes_stopped) {
1698   Status s;
1699   if (!flush_options.allow_write_stall) {
1700     int num_cfs_to_flush = 0;
1701     for (auto cfd : column_family_datas) {
1702       bool flush_needed = true;
1703       s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1704       if (!s.ok()) {
1705         return s;
1706       } else if (flush_needed) {
1707         ++num_cfs_to_flush;
1708       }
1709     }
1710     if (0 == num_cfs_to_flush) {
1711       return s;
1712     }
1713   }
1714   FlushRequest flush_req;
1715   autovector<ColumnFamilyData*> cfds;
1716   {
1717     WriteContext context;
1718     InstrumentedMutexLock guard_lock(&mutex_);
1719 
1720     WriteThread::Writer w;
1721     WriteThread::Writer nonmem_w;
1722     if (!writes_stopped) {
1723       write_thread_.EnterUnbatched(&w, &mutex_);
1724       if (two_write_queues_) {
1725         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1726       }
1727     }
1728     WaitForPendingWrites();
1729 
1730     for (auto cfd : column_family_datas) {
1731       if (cfd->IsDropped()) {
1732         continue;
1733       }
1734       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1735           !cached_recoverable_state_empty_.load()) {
1736         cfds.emplace_back(cfd);
1737       }
1738     }
1739     for (auto cfd : cfds) {
1740       if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
1741         continue;
1742       }
1743       cfd->Ref();
1744       s = SwitchMemtable(cfd, &context);
1745       cfd->UnrefAndTryDelete();
1746       if (!s.ok()) {
1747         break;
1748       }
1749     }
1750     if (s.ok()) {
1751       AssignAtomicFlushSeq(cfds);
1752       for (auto cfd : cfds) {
1753         cfd->imm()->FlushRequested();
1754       }
1755       // If the caller wants to wait for this flush to complete, it indicates
1756       // that the caller expects the ColumnFamilyData not to be freed by
1757       // other threads which may drop the column family concurrently.
1758       // Therefore, we increase the cfd's ref count.
1759       if (flush_options.wait) {
1760         for (auto cfd : cfds) {
1761           cfd->Ref();
1762         }
1763       }
1764       GenerateFlushRequest(cfds, &flush_req);
1765       SchedulePendingFlush(flush_req, flush_reason);
1766       MaybeScheduleFlushOrCompaction();
1767     }
1768 
1769     if (!writes_stopped) {
1770       write_thread_.ExitUnbatched(&w);
1771       if (two_write_queues_) {
1772         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1773       }
1774     }
1775   }
1776   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
1777   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
1778   if (s.ok() && flush_options.wait) {
1779     autovector<const uint64_t*> flush_memtable_ids;
1780     for (auto& iter : flush_req) {
1781       flush_memtable_ids.push_back(&(iter.second));
1782     }
1783     s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1784                               (flush_reason == FlushReason::kErrorRecovery));
1785     InstrumentedMutexLock lock_guard(&mutex_);
1786     for (auto* cfd : cfds) {
1787       cfd->UnrefAndTryDelete();
1788     }
1789   }
1790   return s;
1791 }
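
// Illustrative sketch (assumes the DB was opened with
// DBOptions::atomic_flush == true and two handles `cfh_default` and `cfh_meta`;
// not part of this implementation):
//
//   FlushOptions fo;
//   fo.wait = true;
//   // Flushes both column families and records the result atomically in the
//   // MANIFEST via AtomicFlushMemTables() above.
//   Status s = db->Flush(fo, {cfh_default, cfh_meta});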
1792 
1793 // Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
1794 // cause write stall, for example if one memtable is being flushed already.
1795 // This method tries to avoid a write stall (similar to CompactRange() behavior):
1796 // it emulates how the SuperVersion / LSM would change if the flush happened,
1797 // checks that against various constraints, and delays the flush if it would stall.
1798 // The caller should check status and flush_needed to see if flush already happened.
1799 Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
1800                                                  bool* flush_needed) {
1801   {
1802     *flush_needed = true;
1803     InstrumentedMutexLock l(&mutex_);
1804     uint64_t orig_active_memtable_id = cfd->mem()->GetID();
1805     WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
1806     do {
1807       if (write_stall_condition != WriteStallCondition::kNormal) {
1808         // Same error handling as user writes: Don't wait if there's a
1809         // background error, even if it's a soft error. We might wait here
1810         // indefinitely as the pending flushes/compactions may never finish
1811         // successfully, resulting in the stall condition lasting indefinitely
1812         if (error_handler_.IsBGWorkStopped()) {
1813           return error_handler_.GetBGError();
1814         }
1815 
1816         TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
1817         ROCKS_LOG_INFO(immutable_db_options_.info_log,
1818                        "[%s] WaitUntilFlushWouldNotStallWrites"
1819                        " waiting on stall conditions to clear",
1820                        cfd->GetName().c_str());
1821         bg_cv_.Wait();
1822       }
1823       if (cfd->IsDropped()) {
1824         return Status::ColumnFamilyDropped();
1825       }
1826       if (shutting_down_.load(std::memory_order_acquire)) {
1827         return Status::ShutdownInProgress();
1828       }
1829 
1830       uint64_t earliest_memtable_id =
1831           std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
1832       if (earliest_memtable_id > orig_active_memtable_id) {
1833         // We waited so long that the memtable we were originally waiting on was
1834         // flushed.
1835         *flush_needed = false;
1836         return Status::OK();
1837       }
1838 
1839       const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1840       const auto* vstorage = cfd->current()->storage_info();
1841 
1842       // Skip stalling check if we're below auto-flush and auto-compaction
1843       // triggers. If it stalled in these conditions, that'd mean the stall
1844       // triggers are so low that stalling is needed for any background work. In
1845       // that case we shouldn't wait since background work won't be scheduled.
1846       if (cfd->imm()->NumNotFlushed() <
1847               cfd->ioptions()->min_write_buffer_number_to_merge &&
1848           vstorage->l0_delay_trigger_count() <
1849               mutable_cf_options.level0_file_num_compaction_trigger) {
1850         break;
1851       }
1852 
1853       // check whether one extra immutable memtable or an extra L0 file would
1854       // cause write stalling mode to be entered. It could still enter stall
1855       // mode due to pending compaction bytes, but that's less common
1856       write_stall_condition =
1857           ColumnFamilyData::GetWriteStallConditionAndCause(
1858               cfd->imm()->NumNotFlushed() + 1,
1859               vstorage->l0_delay_trigger_count() + 1,
1860               vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
1861               .first;
1862     } while (write_stall_condition != WriteStallCondition::kNormal);
1863   }
1864   return Status::OK();
1865 }
1866 
1867 // Wait for memtables to be flushed for multiple column families.
1868 // let N = cfds.size()
1869 // for i in [0, N),
1870 //  1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
1871 //     have to be flushed for THIS column family;
1872 //  2) if flush_memtable_ids[i] is null, then all memtables in THIS column
1873 //     family have to be flushed.
1874 // Finish waiting when ALL column families finish flushing memtables.
1875 // resuming_from_bg_err indicates whether the caller is trying to resume from
1876 // background error or in normal processing.
1877 Status DBImpl::WaitForFlushMemTables(
1878     const autovector<ColumnFamilyData*>& cfds,
1879     const autovector<const uint64_t*>& flush_memtable_ids,
1880     bool resuming_from_bg_err) {
1881   int num = static_cast<int>(cfds.size());
1882   // Wait until the flushes complete
1883   InstrumentedMutexLock l(&mutex_);
1884   // If the caller is trying to resume from bg error, then
1885   // error_handler_.IsDBStopped() is true.
1886   while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
1887     if (shutting_down_.load(std::memory_order_acquire)) {
1888       return Status::ShutdownInProgress();
1889     }
1890     // If an error has occurred during resumption, then no need to wait.
1891     if (!error_handler_.GetRecoveryError().ok()) {
1892       break;
1893     }
1894     // Number of column families that have been dropped.
1895     int num_dropped = 0;
1896     // Number of column families that have finished flush.
1897     int num_finished = 0;
1898     for (int i = 0; i < num; ++i) {
1899       if (cfds[i]->IsDropped()) {
1900         ++num_dropped;
1901       } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
1902                  (flush_memtable_ids[i] != nullptr &&
1903                   cfds[i]->imm()->GetEarliestMemTableID() >
1904                       *flush_memtable_ids[i])) {
1905         ++num_finished;
1906       }
1907     }
1908     if (1 == num_dropped && 1 == num) {
1909       return Status::InvalidArgument("Cannot flush a dropped CF");
1910     }
1911     // Column families involved in this flush request have either been dropped
1912     // or finished flush. Then it's time to finish waiting.
1913     if (num_dropped + num_finished == num) {
1914       break;
1915     }
1916     bg_cv_.Wait();
1917   }
1918   Status s;
1919   // If not resuming from bg error, and an error has caused the DB to stop,
1920   // then report the bg error to caller.
1921   if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
1922     s = error_handler_.GetBGError();
1923   }
1924   return s;
1925 }
1926 
1927 Status DBImpl::EnableAutoCompaction(
1928     const std::vector<ColumnFamilyHandle*>& column_family_handles) {
1929   Status s;
1930   for (auto cf_ptr : column_family_handles) {
1931     Status status =
1932         this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
1933     if (!status.ok()) {
1934       s = status;
1935     }
1936   }
1937 
1938   return s;
1939 }
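
// Illustrative sketch (hypothetical bulk-load workflow; not part of this
// implementation): open with auto compactions disabled, ingest, then re-enable.
//
//   ColumnFamilyOptions cf_opts;
//   cf_opts.disable_auto_compactions = true;     // e.g. while bulk loading
//   // ... write data ...
//   Status s = db->EnableAutoCompaction({cfh});  // equivalent to SetOptions above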
1940 
1941 void DBImpl::DisableManualCompaction() {
1942   manual_compaction_paused_.store(true, std::memory_order_release);
1943 }
1944 
1945 void DBImpl::EnableManualCompaction() {
1946   manual_compaction_paused_.store(false, std::memory_order_release);
1947 }
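
// Illustrative sketch (not part of this implementation): a caller that needs
// background resources quickly can interrupt in-flight manual compactions and
// allow them again later.
//
//   db->DisableManualCompaction();  // running CompactRange() calls finish with
//                                   // Status::Incomplete(kManualCompactionPaused)
//   // ... latency-sensitive work ...
//   db->EnableManualCompaction();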
1948 
1949 void DBImpl::MaybeScheduleFlushOrCompaction() {
1950   mutex_.AssertHeld();
1951   if (!opened_successfully_) {
1952     // Compaction may introduce data race to DB open
1953     return;
1954   }
1955   if (bg_work_paused_ > 0) {
1956     // we paused the background work
1957     return;
1958   } else if (error_handler_.IsBGWorkStopped() &&
1959              !error_handler_.IsRecoveryInProgress()) {
1960     // There has been a hard error and this call is not part of the recovery
1961     // sequence. Bail out here so we don't get into an endless loop of
1962     // scheduling BG work which will again call this function
1963     return;
1964   } else if (shutting_down_.load(std::memory_order_acquire)) {
1965     // DB is being deleted; no more background compactions
1966     return;
1967   }
1968   auto bg_job_limits = GetBGJobLimits();
1969   bool is_flush_pool_empty =
1970       env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
1971   while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
1972          bg_flush_scheduled_ < bg_job_limits.max_flushes) {
1973     bg_flush_scheduled_++;
1974     FlushThreadArg* fta = new FlushThreadArg;
1975     fta->db_ = this;
1976     fta->thread_pri_ = Env::Priority::HIGH;
1977     env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
1978                    &DBImpl::UnscheduleFlushCallback);
1979     --unscheduled_flushes_;
1980     TEST_SYNC_POINT_CALLBACK(
1981         "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
1982         &unscheduled_flushes_);
1983   }
1984 
1985   // special case -- if high-pri (flush) thread pool is empty, then schedule
1986   // flushes in low-pri (compaction) thread pool.
1987   if (is_flush_pool_empty) {
1988     while (unscheduled_flushes_ > 0 &&
1989            bg_flush_scheduled_ + bg_compaction_scheduled_ <
1990                bg_job_limits.max_flushes) {
1991       bg_flush_scheduled_++;
1992       FlushThreadArg* fta = new FlushThreadArg;
1993       fta->db_ = this;
1994       fta->thread_pri_ = Env::Priority::LOW;
1995       env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
1996                      &DBImpl::UnscheduleFlushCallback);
1997       --unscheduled_flushes_;
1998     }
1999   }
2000 
2001   if (bg_compaction_paused_ > 0) {
2002     // we paused the background compaction
2003     return;
2004   } else if (error_handler_.IsBGWorkStopped()) {
2005     // Compaction is not part of the recovery sequence from a hard error. We
2006     // might get here because recovery might do a flush and install a new
2007     // super version, which will try to schedule pending compactions. Bail
2008     // out here and let the higher level recovery handle compactions
2009     return;
2010   }
2011 
2012   if (HasExclusiveManualCompaction()) {
2013     // only manual compactions are allowed to run. don't schedule automatic
2014     // compactions
2015     TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
2016     return;
2017   }
2018 
2019   while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
2020          unscheduled_compactions_ > 0) {
2021     CompactionArg* ca = new CompactionArg;
2022     ca->db = this;
2023     ca->prepicked_compaction = nullptr;
2024     bg_compaction_scheduled_++;
2025     unscheduled_compactions_--;
2026     env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
2027                    &DBImpl::UnscheduleCompactionCallback);
2028   }
2029 }
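
// Illustrative configuration sketch (hypothetical values, not part of this
// implementation): the pools consulted above are sized via DBOptions and Env.
//
//   Options options;
//   options.max_background_jobs = 8;  // split into flush/compaction limits below
//   options.env->SetBackgroundThreads(2, Env::Priority::HIGH);  // flush pool
//   options.env->SetBackgroundThreads(6, Env::Priority::LOW);   // compaction pool
//   // If the HIGH pool has 0 threads, flushes fall back to the LOW pool
//   // (the special case handled above).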
2030 
2031 DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
2032   mutex_.AssertHeld();
2033   return GetBGJobLimits(immutable_db_options_.max_background_flushes,
2034                         mutable_db_options_.max_background_compactions,
2035                         mutable_db_options_.max_background_jobs,
2036                         write_controller_.NeedSpeedupCompaction());
2037 }
2038 
2039 DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
2040                                            int max_background_compactions,
2041                                            int max_background_jobs,
2042                                            bool parallelize_compactions) {
2043   BGJobLimits res;
2044   if (max_background_flushes == -1 && max_background_compactions == -1) {
2045     // for our first stab at implementing max_background_jobs, simply allocate a
2046     // quarter of the threads to flushes.
2047     res.max_flushes = std::max(1, max_background_jobs / 4);
2048     res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
2049   } else {
2050     // compatibility code in case users haven't migrated to max_background_jobs,
2051     // which automatically computes flush/compaction limits
2052     res.max_flushes = std::max(1, max_background_flushes);
2053     res.max_compactions = std::max(1, max_background_compactions);
2054   }
2055   if (!parallelize_compactions) {
2056     // throttle background compactions until we deem necessary
2057     res.max_compactions = 1;
2058   }
2059   return res;
2060 }
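
// Worked example of the split above (with the defaults
// max_background_flushes == -1 and max_background_compactions == -1):
// for max_background_jobs == 8, max_flushes = std::max(1, 8 / 4) = 2 and
// max_compactions = std::max(1, 8 - 2) = 6; when parallelize_compactions is
// false, the compaction limit is clamped to 1.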
2061 
2062 void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
2063   assert(!cfd->queued_for_compaction());
2064   cfd->Ref();
2065   compaction_queue_.push_back(cfd);
2066   cfd->set_queued_for_compaction(true);
2067 }
2068 
2069 ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
2070   assert(!compaction_queue_.empty());
2071   auto cfd = *compaction_queue_.begin();
2072   compaction_queue_.pop_front();
2073   assert(cfd->queued_for_compaction());
2074   cfd->set_queued_for_compaction(false);
2075   return cfd;
2076 }
2077 
2078 DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
2079   assert(!flush_queue_.empty());
2080   FlushRequest flush_req = flush_queue_.front();
2081   flush_queue_.pop_front();
2082   // TODO: need to unset flush reason?
2083   return flush_req;
2084 }
2085 
2086 ColumnFamilyData* DBImpl::PickCompactionFromQueue(
2087     std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
2088   assert(!compaction_queue_.empty());
2089   assert(*token == nullptr);
2090   autovector<ColumnFamilyData*> throttled_candidates;
2091   ColumnFamilyData* cfd = nullptr;
2092   while (!compaction_queue_.empty()) {
2093     auto first_cfd = *compaction_queue_.begin();
2094     compaction_queue_.pop_front();
2095     assert(first_cfd->queued_for_compaction());
2096     if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
2097       throttled_candidates.push_back(first_cfd);
2098       continue;
2099     }
2100     cfd = first_cfd;
2101     cfd->set_queued_for_compaction(false);
2102     break;
2103   }
2104   // Add throttled compaction candidates back to queue in the original order.
2105   for (auto iter = throttled_candidates.rbegin();
2106        iter != throttled_candidates.rend(); ++iter) {
2107     compaction_queue_.push_front(*iter);
2108   }
2109   return cfd;
2110 }
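
// Illustrative sketch (not part of this implementation): the per-CF limiter
// consulted by RequestCompactionToken() is configured by the user, assuming
// the public factory in rocksdb/concurrent_task_limiter.h:
//
//   std::shared_ptr<ConcurrentTaskLimiter> limiter(
//       NewConcurrentTaskLimiter("shared_compaction_limiter", 4 /* limit */));
//   cf_options.compaction_thread_limiter = limiter;  // share across CFs to cap
//                                                    // concurrent compactions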
2111 
2112 void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
2113                                   FlushReason flush_reason) {
2114   if (flush_req.empty()) {
2115     return;
2116   }
2117   for (auto& iter : flush_req) {
2118     ColumnFamilyData* cfd = iter.first;
2119     cfd->Ref();
2120     cfd->SetFlushReason(flush_reason);
2121   }
2122   ++unscheduled_flushes_;
2123   flush_queue_.push_back(flush_req);
2124 }
2125 
2126 void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
2127   if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
2128     AddToCompactionQueue(cfd);
2129     ++unscheduled_compactions_;
2130   }
2131 }
2132 
2133 void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2134                                   FileType type, uint64_t number, int job_id) {
2135   mutex_.AssertHeld();
2136   PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
2137   purge_files_.insert({{number, std::move(file_info)}});
2138 }
2139 
2140 void DBImpl::BGWorkFlush(void* arg) {
2141   FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
2142   delete reinterpret_cast<FlushThreadArg*>(arg);
2143 
2144   IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
2145   TEST_SYNC_POINT("DBImpl::BGWorkFlush");
2146   static_cast_with_check<DBImpl, DB>(fta.db_)->BackgroundCallFlush(
2147       fta.thread_pri_);
2148   TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
2149 }
2150 
2151 void DBImpl::BGWorkCompaction(void* arg) {
2152   CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2153   delete reinterpret_cast<CompactionArg*>(arg);
2154   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
2155   TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
2156   auto prepicked_compaction =
2157       static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
2158   static_cast_with_check<DBImpl, DB>(ca.db)->BackgroundCallCompaction(
2159       prepicked_compaction, Env::Priority::LOW);
2160   delete prepicked_compaction;
2161 }
2162 
2163 void DBImpl::BGWorkBottomCompaction(void* arg) {
2164   CompactionArg ca = *(static_cast<CompactionArg*>(arg));
2165   delete static_cast<CompactionArg*>(arg);
2166   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
2167   TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
2168   auto* prepicked_compaction = ca.prepicked_compaction;
2169   assert(prepicked_compaction && prepicked_compaction->compaction &&
2170          !prepicked_compaction->manual_compaction_state);
2171   ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
2172   delete prepicked_compaction;
2173 }
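
// Illustrative sketch (hypothetical configuration, not part of this
// implementation): the BOTTOM pool used here is empty by default; giving it
// threads lets bottommost-level compactions run without occupying the LOW pool.
//
//   options.env->SetBackgroundThreads(1, Env::Priority::BOTTOM);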
2174 
2175 void DBImpl::BGWorkPurge(void* db) {
2176   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
2177   TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
2178   reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
2179   TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
2180 }
2181 
2182 void DBImpl::UnscheduleCompactionCallback(void* arg) {
2183   CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2184   delete reinterpret_cast<CompactionArg*>(arg);
2185   if (ca.prepicked_compaction != nullptr) {
2186     if (ca.prepicked_compaction->compaction != nullptr) {
2187       delete ca.prepicked_compaction->compaction;
2188     }
2189     delete ca.prepicked_compaction;
2190   }
2191   TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
2192 }
2193 
2194 void DBImpl::UnscheduleFlushCallback(void* arg) {
2195   delete reinterpret_cast<FlushThreadArg*>(arg);
2196   TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
2197 }
2198 
2199 Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
2200                                LogBuffer* log_buffer, FlushReason* reason,
2201                                Env::Priority thread_pri) {
2202   mutex_.AssertHeld();
2203 
2204   Status status;
2205   *reason = FlushReason::kOthers;
2206   // If BG work is stopped due to an error, but a recovery is in progress,
2207   // that means this flush is part of the recovery. So allow it to go through
2208   if (!error_handler_.IsBGWorkStopped()) {
2209     if (shutting_down_.load(std::memory_order_acquire)) {
2210       status = Status::ShutdownInProgress();
2211     }
2212   } else if (!error_handler_.IsRecoveryInProgress()) {
2213     status = error_handler_.GetBGError();
2214   }
2215 
2216   if (!status.ok()) {
2217     return status;
2218   }
2219 
2220   autovector<BGFlushArg> bg_flush_args;
2221   std::vector<SuperVersionContext>& superversion_contexts =
2222       job_context->superversion_contexts;
2223   autovector<ColumnFamilyData*> column_families_not_to_flush;
2224   while (!flush_queue_.empty()) {
2225     // This cfd is already referenced
2226     const FlushRequest& flush_req = PopFirstFromFlushQueue();
2227     superversion_contexts.clear();
2228     superversion_contexts.reserve(flush_req.size());
2229 
2230     for (const auto& iter : flush_req) {
2231       ColumnFamilyData* cfd = iter.first;
2232       if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
2233         // can't flush this CF, try next one
2234         column_families_not_to_flush.push_back(cfd);
2235         continue;
2236       }
2237       superversion_contexts.emplace_back(SuperVersionContext(true));
2238       bg_flush_args.emplace_back(cfd, iter.second,
2239                                  &(superversion_contexts.back()));
2240     }
2241     if (!bg_flush_args.empty()) {
2242       break;
2243     }
2244   }
2245 
2246   if (!bg_flush_args.empty()) {
2247     auto bg_job_limits = GetBGJobLimits();
2248     for (const auto& arg : bg_flush_args) {
2249       ColumnFamilyData* cfd = arg.cfd_;
2250       ROCKS_LOG_BUFFER(
2251           log_buffer,
2252           "Calling FlushMemTableToOutputFile with column "
2253           "family [%s], flush slots available %d, compaction slots available "
2254           "%d, "
2255           "flush slots scheduled %d, compaction slots scheduled %d",
2256           cfd->GetName().c_str(), bg_job_limits.max_flushes,
2257           bg_job_limits.max_compactions, bg_flush_scheduled_,
2258           bg_compaction_scheduled_);
2259     }
2260     status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
2261                                          job_context, log_buffer, thread_pri);
2262     TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
2263     // All the CFDs in the FlushReq must have the same flush reason, so just
2264     // grab the first one
2265     *reason = bg_flush_args[0].cfd_->GetFlushReason();
2266     for (auto& arg : bg_flush_args) {
2267       ColumnFamilyData* cfd = arg.cfd_;
2268       if (cfd->UnrefAndTryDelete()) {
2269         arg.cfd_ = nullptr;
2270       }
2271     }
2272   }
2273   for (auto cfd : column_families_not_to_flush) {
2274     cfd->UnrefAndTryDelete();
2275   }
2276   return status;
2277 }
2278 
2279 void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
2280   bool made_progress = false;
2281   JobContext job_context(next_job_id_.fetch_add(1), true);
2282 
2283   TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
2284 
2285   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2286                        immutable_db_options_.info_log.get());
2287   {
2288     InstrumentedMutexLock l(&mutex_);
2289     assert(bg_flush_scheduled_);
2290     num_running_flushes_++;
2291 
2292     std::unique_ptr<std::list<uint64_t>::iterator>
2293         pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2294             CaptureCurrentFileNumberInPendingOutputs()));
2295     FlushReason reason;
2296 
2297     Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
2298                                &reason, thread_pri);
2299     if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
2300         reason != FlushReason::kErrorRecovery) {
2301       // Wait a little bit before retrying background flush in
2302       // case this is an environmental problem and we do not want to
2303       // chew up resources for failed flushes for the duration of
2304       // the problem.
2305       uint64_t error_cnt =
2306           default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2307       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2308       mutex_.Unlock();
2309       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2310                       "Waiting after background flush error: %s"
2311                       "Accumulated background error counts: %" PRIu64,
2312                       s.ToString().c_str(), error_cnt);
2313       log_buffer.FlushBufferToLog();
2314       LogFlush(immutable_db_options_.info_log);
2315       env_->SleepForMicroseconds(1000000);
2316       mutex_.Lock();
2317     }
2318 
2319     TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
2320     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2321 
2322     // If flush failed, we want to delete all temporary files that we might have
2323     // created. Thus, we force full scan in FindObsoleteFiles()
2324     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2325                                         !s.IsColumnFamilyDropped());
2326     // delete unnecessary files if any, this is done outside the mutex
2327     if (job_context.HaveSomethingToClean() ||
2328         job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2329       mutex_.Unlock();
2330       TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
2331       // Have to flush the info logs before bg_flush_scheduled_--
2332       // because if bg_flush_scheduled_ becomes 0 and the lock is
2333       // released, the destructor of DB can kick in and destroy all the
2334       // state of the DB, so info_log might not be available after that point.
2335       // The same applies to accessing other state that the DB owns.
2336       log_buffer.FlushBufferToLog();
2337       if (job_context.HaveSomethingToDelete()) {
2338         PurgeObsoleteFiles(job_context);
2339       }
2340       job_context.Clean();
2341       mutex_.Lock();
2342     }
2343     TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
2344 
2345     assert(num_running_flushes_ > 0);
2346     num_running_flushes_--;
2347     bg_flush_scheduled_--;
2348     // See if there's more work to be done
2349     MaybeScheduleFlushOrCompaction();
2350     atomic_flush_install_cv_.SignalAll();
2351     bg_cv_.SignalAll();
2352     // IMPORTANT: there should be no code after calling SignalAll. This call may
2353     // signal the DB destructor that it's OK to proceed with destruction. In
2354     // that case, all DB variables will be deallocated and referencing them
2355     // will cause trouble.
2356   }
2357 }
2358 
2359 void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2360                                       Env::Priority bg_thread_pri) {
2361   bool made_progress = false;
2362   JobContext job_context(next_job_id_.fetch_add(1), true);
2363   TEST_SYNC_POINT("BackgroundCallCompaction:0");
2364   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2365                        immutable_db_options_.info_log.get());
2366   {
2367     InstrumentedMutexLock l(&mutex_);
2368 
2369     // This call will unlock/lock the mutex to wait for current running
2370     // IngestExternalFile() calls to finish.
2371     WaitForIngestFile();
2372 
2373     num_running_compactions_++;
2374 
2375     std::unique_ptr<std::list<uint64_t>::iterator>
2376         pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2377             CaptureCurrentFileNumberInPendingOutputs()));
2378 
2379     assert((bg_thread_pri == Env::Priority::BOTTOM &&
2380             bg_bottom_compaction_scheduled_) ||
2381            (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
2382     Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
2383                                     prepicked_compaction, bg_thread_pri);
2384     TEST_SYNC_POINT("BackgroundCallCompaction:1");
2385     if (s.IsBusy()) {
2386       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2387       mutex_.Unlock();
2388       env_->SleepForMicroseconds(10000);  // prevent hot loop
2389       mutex_.Lock();
2390     } else if (!s.ok() && !s.IsShutdownInProgress() &&
2391                !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
2392       // Wait a little bit before retrying background compaction in
2393       // case this is an environmental problem and we do not want to
2394       // chew up resources for failed compactions for the duration of
2395       // the problem.
2396       uint64_t error_cnt =
2397           default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2398       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2399       mutex_.Unlock();
2400       log_buffer.FlushBufferToLog();
2401       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2402                       "Waiting after background compaction error: %s, "
2403                       "Accumulated background error counts: %" PRIu64,
2404                       s.ToString().c_str(), error_cnt);
2405       LogFlush(immutable_db_options_.info_log);
2406       env_->SleepForMicroseconds(1000000);
2407       mutex_.Lock();
2408     } else if (s.IsManualCompactionPaused()) {
2409       ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
2410       assert(m);
2411       ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
2412                        m->cfd->GetName().c_str(), job_context.job_id);
2413     }
2414 
2415     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2416 
2417     // If compaction failed, we want to delete all temporary files that we might
2418     // have created (they might not be all recorded in job_context in case of a
2419     // failure). Thus, we force full scan in FindObsoleteFiles()
2420     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2421                                         !s.IsManualCompactionPaused() &&
2422                                         !s.IsColumnFamilyDropped());
2423     TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
2424 
2425     // delete unnecessary files if any, this is done outside the mutex
2426     if (job_context.HaveSomethingToClean() ||
2427         job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2428       mutex_.Unlock();
2429       // Have to flush the info logs before bg_compaction_scheduled_--
2430       // because if bg_compaction_scheduled_ becomes 0 and the lock is
2431       // released, the destructor of DB can kick in and destroy all the
2432       // state of the DB, so info_log might not be available after that point.
2433       // The same applies to accessing other state that the DB owns.
2434       log_buffer.FlushBufferToLog();
2435       if (job_context.HaveSomethingToDelete()) {
2436         PurgeObsoleteFiles(job_context);
2437         TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
2438       }
2439       job_context.Clean();
2440       mutex_.Lock();
2441     }
2442 
2443     assert(num_running_compactions_ > 0);
2444     num_running_compactions_--;
2445     if (bg_thread_pri == Env::Priority::LOW) {
2446       bg_compaction_scheduled_--;
2447     } else {
2448       assert(bg_thread_pri == Env::Priority::BOTTOM);
2449       bg_bottom_compaction_scheduled_--;
2450     }
2451 
2452     versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
2453 
2454     // See if there's more work to be done
2455     MaybeScheduleFlushOrCompaction();
2456     if (made_progress ||
2457         (bg_compaction_scheduled_ == 0 &&
2458          bg_bottom_compaction_scheduled_ == 0) ||
2459         HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
2460       // signal if
2461       // * made_progress -- need to wakeup DelayWrite
2462       // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
2463       // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
2464       // If none of this is true, there is no need to signal since nobody is
2465       // waiting for it
2466       bg_cv_.SignalAll();
2467     }
2468     // IMPORTANT: there should be no code after calling SignalAll. This call may
2469     // signal the DB destructor that it's OK to proceed with destruction. In
2470     // that case, all DB variables will be deallocated and referencing them
2471     // will cause trouble.
2472   }
2473 }
2474 
2475 Status DBImpl::BackgroundCompaction(bool* made_progress,
2476                                     JobContext* job_context,
2477                                     LogBuffer* log_buffer,
2478                                     PrepickedCompaction* prepicked_compaction,
2479                                     Env::Priority thread_pri) {
2480   ManualCompactionState* manual_compaction =
2481       prepicked_compaction == nullptr
2482           ? nullptr
2483           : prepicked_compaction->manual_compaction_state;
2484   *made_progress = false;
2485   mutex_.AssertHeld();
2486   TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
2487 
2488   bool is_manual = (manual_compaction != nullptr);
2489   std::unique_ptr<Compaction> c;
2490   if (prepicked_compaction != nullptr &&
2491       prepicked_compaction->compaction != nullptr) {
2492     c.reset(prepicked_compaction->compaction);
2493   }
2494   bool is_prepicked = is_manual || c;
2495 
2496   // (manual_compaction->in_progress == false);
2497   bool trivial_move_disallowed =
2498       is_manual && manual_compaction->disallow_trivial_move;
2499 
2500   CompactionJobStats compaction_job_stats;
2501   Status status;
2502   if (!error_handler_.IsBGWorkStopped()) {
2503     if (shutting_down_.load(std::memory_order_acquire)) {
2504       status = Status::ShutdownInProgress();
2505     } else if (is_manual &&
2506                manual_compaction_paused_.load(std::memory_order_acquire)) {
2507       status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
2508     }
2509   } else {
2510     status = error_handler_.GetBGError();
2511     // If we get here, it means a hard error happened after this compaction
2512     // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
2513     // a chance to execute. Since we didn't pop a cfd from the compaction
2514     // queue, increment unscheduled_compactions_
2515     unscheduled_compactions_++;
2516   }
2517 
2518   if (!status.ok()) {
2519     if (is_manual) {
2520       manual_compaction->status = status;
2521       manual_compaction->done = true;
2522       manual_compaction->in_progress = false;
2523       manual_compaction = nullptr;
2524     }
2525     if (c) {
2526       c->ReleaseCompactionFiles(status);
2527       c.reset();
2528     }
2529     return status;
2530   }
2531 
2532   if (is_manual) {
2533     // another thread cannot pick up the same work
2534     manual_compaction->in_progress = true;
2535   }
2536 
2537   std::unique_ptr<TaskLimiterToken> task_token;
2538 
2539   // InternalKey manual_end_storage;
2540   // InternalKey* manual_end = &manual_end_storage;
2541   bool sfm_reserved_compact_space = false;
2542   if (is_manual) {
2543     ManualCompactionState* m = manual_compaction;
2544     assert(m->in_progress);
2545     if (!c) {
2546       m->done = true;
2547       m->manual_end = nullptr;
2548       ROCKS_LOG_BUFFER(log_buffer,
2549                        "[%s] Manual compaction from level-%d from %s .. "
2550                        "%s; nothing to do\n",
2551                        m->cfd->GetName().c_str(), m->input_level,
2552                        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2553                        (m->end ? m->end->DebugString().c_str() : "(end)"));
2554     } else {
2555       // First check if we have enough room to do the compaction
2556       bool enough_room = EnoughRoomForCompaction(
2557           m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2558 
2559       if (!enough_room) {
2560         // Then don't do the compaction
2561         c->ReleaseCompactionFiles(status);
2562         c.reset();
2563         // m's vars will get set properly at the end of this function,
2564         // as long as status == CompactionTooLarge
2565         status = Status::CompactionTooLarge();
2566       } else {
2567         ROCKS_LOG_BUFFER(
2568             log_buffer,
2569             "[%s] Manual compaction from level-%d to level-%d from %s .. "
2570             "%s; will stop at %s\n",
2571             m->cfd->GetName().c_str(), m->input_level, c->output_level(),
2572             (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2573             (m->end ? m->end->DebugString().c_str() : "(end)"),
2574             ((m->done || m->manual_end == nullptr)
2575                  ? "(end)"
2576                  : m->manual_end->DebugString().c_str()));
2577       }
2578     }
2579   } else if (!is_prepicked && !compaction_queue_.empty()) {
2580     if (HasExclusiveManualCompaction()) {
2581       // Can't compact right now, but try again later
2582       TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
2583 
2584       // Stay in the compaction queue.
2585       unscheduled_compactions_++;
2586 
2587       return Status::OK();
2588     }
2589 
2590     auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
2591     if (cfd == nullptr) {
2592       // Can't find any executable task in the compaction queue.
2593       // All tasks have been throttled by the compaction thread limiter.
2594       ++unscheduled_compactions_;
2595       return Status::Busy();
2596     }
2597 
2598     // We unreference here because the following code will take a Ref() on
2599     // this cfd if it is going to use it (the Compaction class holds a
2600     // reference).
2601     // This all happens under the mutex, so we don't have to worry about
2602     // the cfd being deleted by somebody else in the meantime.
2603     if (cfd->UnrefAndTryDelete()) {
2604       // This was the last reference of the column family, so no need to
2605       // compact.
2606       return Status::OK();
2607     }
2608 
2609     // Pick up the latest mutable CF Options and use them throughout the
2610     // compaction job.
2611     // Compaction makes a copy of the latest MutableCFOptions. It should be
2612     // used throughout the compaction procedure to ensure consistency. It
2613     // will eventually be installed into the SuperVersion.
2614     auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2615     if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
2616       // NOTE: try to avoid an unnecessary copy of MutableCFOptions if
2617       // compaction is not necessary. The mutex must be held until we make
2618       // the copy in the following code.
2619       TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
2620       c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
2621       TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
2622 
2623       if (c != nullptr) {
2624         bool enough_room = EnoughRoomForCompaction(
2625             cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2626 
2627         if (!enough_room) {
2628           // Then don't do the compaction
2629           c->ReleaseCompactionFiles(status);
2630           c->column_family_data()
2631               ->current()
2632               ->storage_info()
2633               ->ComputeCompactionScore(*(c->immutable_cf_options()),
2634                                        *(c->mutable_cf_options()));
2635           AddToCompactionQueue(cfd);
2636           ++unscheduled_compactions_;
2637 
2638           c.reset();
2639           // Don't need to sleep here, because BackgroundCallCompaction
2640           // will sleep if !s.ok()
2641           status = Status::CompactionTooLarge();
2642         } else {
2643           // update statistics
2644           RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
2645                             c->inputs(0)->size());
2646           // There are three things that can change the compaction score:
2647           // 1) When a flush or compaction finishes. This case is covered by
2648           // InstallSuperVersionAndScheduleWork.
2649           // 2) When MutableCFOptions changes. This case is also covered by
2650           // InstallSuperVersionAndScheduleWork, because this is when the new
2651           // options take effect.
2652           // 3) When we pick a new compaction, we "remove" the files being
2653           // compacted from the calculation, which then influences the
2654           // compaction score. Here we check whether we need another compaction
2655           // even without the files that are currently being compacted. If we
2656           // do, we might be able to execute it in parallel, so we add it to
2657           // the queue and schedule a new thread (see the sketch below).
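          // Illustrative note: whether the extra compaction actually runs in
          // parallel depends on the background job budget, e.g. (assumed
          // standard option, not part of this file):
          //
          //   options.max_background_jobs = 4;  // shared by flushes/compactions
          //
          // otherwise the queued compaction simply waits for a free thread.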
2658           if (cfd->NeedsCompaction()) {
2659             // Yes, we need more compactions!
2660             AddToCompactionQueue(cfd);
2661             ++unscheduled_compactions_;
2662             MaybeScheduleFlushOrCompaction();
2663           }
2664         }
2665       }
2666     }
2667   }
2668 
2669   IOStatus io_s;
2670   if (!c) {
2671     // Nothing to do
2672     ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
2673   } else if (c->deletion_compaction()) {
2674     // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete an
2675     // old file if there is a live snapshot pointing to it.
2676     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2677                              c->column_family_data());
2678     assert(c->num_input_files(1) == 0);
2679     assert(c->level() == 0);
2680     assert(c->column_family_data()->ioptions()->compaction_style ==
2681            kCompactionStyleFIFO);
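    // Hedged sketch (illustrative, not part of the build) of the FIFO
    // configuration under which these deletion compactions occur:
    //
    //   rocksdb::Options options;
    //   options.compaction_style = rocksdb::kCompactionStyleFIFO;
    //   options.compaction_options_fifo.max_table_files_size = 10ull << 30;
    //
    // Once the total size of L0 files exceeds the budget, the oldest files are
    // dropped by a compaction like this one instead of being rewritten.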
2682 
2683     compaction_job_stats.num_input_files = c->num_input_files(0);
2684 
2685     NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2686                             compaction_job_stats, job_context->job_id);
2687 
2688     for (const auto& f : *c->inputs(0)) {
2689       c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
2690     }
2691     versions_->SetIOStatusOK();
2692     status = versions_->LogAndApply(c->column_family_data(),
2693                                     *c->mutable_cf_options(), c->edit(),
2694                                     &mutex_, directories_.GetDbDir());
2695     io_s = versions_->io_status();
2696     InstallSuperVersionAndScheduleWork(c->column_family_data(),
2697                                        &job_context->superversion_contexts[0],
2698                                        *c->mutable_cf_options());
2699     ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
2700                      c->column_family_data()->GetName().c_str(),
2701                      c->num_input_files(0));
2702     *made_progress = true;
2703     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2704                              c->column_family_data());
2705   } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
2706     TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
2707     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2708                              c->column_family_data());
2709     // Instrument for event update
2710     // TODO(yhchiang): add op details for showing trivial-move.
2711     ThreadStatusUtil::SetColumnFamily(
2712         c->column_family_data(), c->column_family_data()->ioptions()->env,
2713         immutable_db_options_.enable_thread_tracking);
2714     ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
2715 
2716     compaction_job_stats.num_input_files = c->num_input_files(0);
2717 
2718     NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2719                             compaction_job_stats, job_context->job_id);
2720 
2721     // Move files to next level
2722     int32_t moved_files = 0;
2723     int64_t moved_bytes = 0;
2724     for (unsigned int l = 0; l < c->num_input_levels(); l++) {
2725       if (c->level(l) == c->output_level()) {
2726         continue;
2727       }
2728       for (size_t i = 0; i < c->num_input_files(l); i++) {
2729         FileMetaData* f = c->input(l, i);
2730         c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
2731         c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
2732                            f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
2733                            f->largest, f->fd.smallest_seqno,
2734                            f->fd.largest_seqno, f->marked_for_compaction,
2735                            f->oldest_blob_file_number, f->oldest_ancester_time,
2736                            f->file_creation_time, f->file_checksum,
2737                            f->file_checksum_func_name);
2738 
2739         ROCKS_LOG_BUFFER(
2740             log_buffer,
2741             "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
2742             c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
2743             c->output_level(), f->fd.GetFileSize());
2744         ++moved_files;
2745         moved_bytes += f->fd.GetFileSize();
2746       }
2747     }
2748 
2749     versions_->SetIOStatusOK();
2750     status = versions_->LogAndApply(c->column_family_data(),
2751                                     *c->mutable_cf_options(), c->edit(),
2752                                     &mutex_, directories_.GetDbDir());
2753     io_s = versions_->io_status();
2754     // Use latest MutableCFOptions
2755     InstallSuperVersionAndScheduleWork(c->column_family_data(),
2756                                        &job_context->superversion_contexts[0],
2757                                        *c->mutable_cf_options());
2758 
2759     VersionStorageInfo::LevelSummaryStorage tmp;
2760     c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
2761                                                              moved_bytes);
2762     {
2763       event_logger_.LogToBuffer(log_buffer)
2764           << "job" << job_context->job_id << "event"
2765           << "trivial_move"
2766           << "destination_level" << c->output_level() << "files" << moved_files
2767           << "total_files_size" << moved_bytes;
2768     }
2769     ROCKS_LOG_BUFFER(
2770         log_buffer,
2771         "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
2772         c->column_family_data()->GetName().c_str(), moved_files,
2773         c->output_level(), moved_bytes, status.ToString().c_str(),
2774         c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
2775     *made_progress = true;
2776 
2777     // Clear the thread-status instrumentation.
2778     ThreadStatusUtil::ResetThreadStatus();
2779     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2780                              c->column_family_data());
2781   } else if (!is_prepicked && c->output_level() > 0 &&
2782              c->output_level() ==
2783                  c->column_family_data()
2784                      ->current()
2785                      ->storage_info()
2786                      ->MaxOutputLevel(
2787                          immutable_db_options_.allow_ingest_behind) &&
2788              env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
2789     // Forward compactions involving the last level to the bottom pool, if it
2790     // exists, so that compactions unlikely to contribute to write stalls can
2791     // be delayed or deprioritized (see the sketch below).
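    // Illustrative sketch, assuming the default Env: the bottom pool exists
    // only if the user reserved threads for it, e.g.:
    //
    //   options.env->SetBackgroundThreads(2, rocksdb::Env::Priority::BOTTOM);
    //
    // With zero BOTTOM threads, the GetBackgroundThreads() check below is 0 and
    // the compaction stays in the regular LOW-priority pool.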
2792     TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
2793     CompactionArg* ca = new CompactionArg;
2794     ca->db = this;
2795     ca->prepicked_compaction = new PrepickedCompaction;
2796     ca->prepicked_compaction->compaction = c.release();
2797     ca->prepicked_compaction->manual_compaction_state = nullptr;
2798     // Transfer the requested token so it doesn't need to be requested again.
2799     ca->prepicked_compaction->task_token = std::move(task_token);
2800     ++bg_bottom_compaction_scheduled_;
2801     env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
2802                    this, &DBImpl::UnscheduleCompactionCallback);
2803   } else {
2804     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2805                              c->column_family_data());
2806     int output_level __attribute__((__unused__));
2807     output_level = c->output_level();
2808     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
2809                              &output_level);
2810     std::vector<SequenceNumber> snapshot_seqs;
2811     SequenceNumber earliest_write_conflict_snapshot;
2812     SnapshotChecker* snapshot_checker;
2813     GetSnapshotContext(job_context, &snapshot_seqs,
2814                        &earliest_write_conflict_snapshot, &snapshot_checker);
2815     assert(is_snapshot_supported_ || snapshots_.empty());
2816     CompactionJob compaction_job(
2817         job_context->job_id, c.get(), immutable_db_options_,
2818         file_options_for_compaction_, versions_.get(), &shutting_down_,
2819         preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
2820         GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
2821         &mutex_, &error_handler_, snapshot_seqs,
2822         earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
2823         &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
2824         c->mutable_cf_options()->report_bg_io_stats, dbname_,
2825         &compaction_job_stats, thread_pri,
2826         is_manual ? &manual_compaction_paused_ : nullptr);
2827     compaction_job.Prepare();
2828 
2829     NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2830                             compaction_job_stats, job_context->job_id);
2831 
2832     mutex_.Unlock();
2833     TEST_SYNC_POINT_CALLBACK(
2834         "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
2835     compaction_job.Run();
2836     TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
2837     mutex_.Lock();
2838 
2839     status = compaction_job.Install(*c->mutable_cf_options());
2840     io_s = compaction_job.io_status();
2841     if (status.ok()) {
2842       InstallSuperVersionAndScheduleWork(c->column_family_data(),
2843                                          &job_context->superversion_contexts[0],
2844                                          *c->mutable_cf_options());
2845     }
2846     *made_progress = true;
2847     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2848                              c->column_family_data());
2849   }
2850 
2851   if (status.ok() && !io_s.ok()) {
2852     status = io_s;
2853   }
2854 
2855   if (c != nullptr) {
2856     c->ReleaseCompactionFiles(status);
2857     *made_progress = true;
2858 
2859 #ifndef ROCKSDB_LITE
2860     // Need to make sure SstFileManager does its bookkeeping
2861     auto sfm = static_cast<SstFileManagerImpl*>(
2862         immutable_db_options_.sst_file_manager.get());
2863     if (sfm && sfm_reserved_compact_space) {
2864       sfm->OnCompactionCompletion(c.get());
2865     }
2866 #endif  // ROCKSDB_LITE
2867 
2868     NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
2869                                 compaction_job_stats, job_context->job_id);
2870   }
2871 
2872   if (status.ok() || status.IsCompactionTooLarge() ||
2873       status.IsManualCompactionPaused()) {
2874     // Done
2875   } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
2876     // Ignore compaction errors found during shutting down
2877   } else {
2878     ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
2879                    status.ToString().c_str());
2880     if (!io_s.ok()) {
2881       error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
2882     } else {
2883       error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
2884     }
2885     if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
2886       // Put this cfd back in the compaction queue so we can retry after some
2887       // time
2888       auto cfd = c->column_family_data();
2889       assert(cfd != nullptr);
2890       // Since this compaction failed, we need to recompute the score so it
2891       // takes the original input files into account
2892       c->column_family_data()
2893           ->current()
2894           ->storage_info()
2895           ->ComputeCompactionScore(*(c->immutable_cf_options()),
2896                                    *(c->mutable_cf_options()));
2897       if (!cfd->queued_for_compaction()) {
2898         AddToCompactionQueue(cfd);
2899         ++unscheduled_compactions_;
2900       }
2901     }
2902   }
2903   // this will unref its input_version and column_family_data
2904   c.reset();
2905 
2906   if (is_manual) {
2907     ManualCompactionState* m = manual_compaction;
2908     if (!status.ok()) {
2909       m->status = status;
2910       m->done = true;
2911     }
2912     // For universal compaction:
2913     //   Because universal compaction always happens at level 0, one
2914     //   compaction will pick up all overlapping files. No files will be
2915     //   filtered out due to size limits and left for a successive compaction,
2916     //   so we can safely conclude the current compaction.
2917     //
2918     //   Also note that if we did not stop here, the current compaction would
2919     //   write a new file back to level 0, which would be used by a successive
2920     //   compaction. Hence the manual compaction would never finish.
2921     //
2922     // Stop the compaction if manual_end points to nullptr -- this means
2923     // that we compacted the whole range. manual_end should always point
2924     // to nullptr in the case of universal compaction.
2925     if (m->manual_end == nullptr) {
2926       m->done = true;
2927     }
2928     if (!m->done) {
2929       // We only compacted part of the requested range.  Update *m
2930       // to the range that is left to be compacted.
2931       // Universal and FIFO compactions should always compact the whole range
2932       assert(m->cfd->ioptions()->compaction_style !=
2933                  kCompactionStyleUniversal ||
2934              m->cfd->ioptions()->num_levels > 1);
2935       assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
2936       m->tmp_storage = *m->manual_end;
2937       m->begin = &m->tmp_storage;
2938       m->incomplete = true;
2939     }
2940     m->in_progress = false;  // not being processed anymore
2941   }
2942   TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
2943   return status;
2944 }
2945 
2946 bool DBImpl::HasPendingManualCompaction() {
2947   return (!manual_compaction_dequeue_.empty());
2948 }
2949 
2950 void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
2951   manual_compaction_dequeue_.push_back(m);
2952 }
2953 
2954 void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
2955   // Remove from queue
2956   std::deque<ManualCompactionState*>::iterator it =
2957       manual_compaction_dequeue_.begin();
2958   while (it != manual_compaction_dequeue_.end()) {
2959     if (m == (*it)) {
2960       it = manual_compaction_dequeue_.erase(it);
2961       return;
2962     }
2963     ++it;
2964   }
2965   assert(false);
2966   return;
2967 }
2968 
2969 bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
2970   if (num_running_ingest_file_ > 0) {
2971     // We need to wait for other IngestExternalFile() calls to finish
2972     // before running a manual compaction.
2973     return true;
2974   }
2975   if (m->exclusive) {
2976     return (bg_bottom_compaction_scheduled_ > 0 ||
2977             bg_compaction_scheduled_ > 0);
2978   }
2979   std::deque<ManualCompactionState*>::iterator it =
2980       manual_compaction_dequeue_.begin();
2981   bool seen = false;
2982   while (it != manual_compaction_dequeue_.end()) {
2983     if (m == (*it)) {
2984       ++it;
2985       seen = true;
2986       continue;
2987     } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
2988       // The other manual compaction *it conflicts with m if it
2989       // overlaps with m, is ahead of m in the queue, and is not yet
2990       // in progress.
2991       return true;
2992     }
2993     ++it;
2994   }
2995   return false;
2996 }
2997 
2998 bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
2999   // Check the queue for a manual compaction that would block this cfd.
3000   std::deque<ManualCompactionState*>::iterator it =
3001       manual_compaction_dequeue_.begin();
3002   while (it != manual_compaction_dequeue_.end()) {
3003     if ((*it)->exclusive) {
3004       return true;
3005     }
3006     if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
3007       // Automatic compaction is only allowed once the manual compaction
3008       // for this CF is already in progress (or done).
3009       return true;
3010     }
3011     ++it;
3012   }
3013   return false;
3014 }
3015 
3016 bool DBImpl::HasExclusiveManualCompaction() {
3017   // Check the queue for an exclusive manual compaction.
3018   std::deque<ManualCompactionState*>::iterator it =
3019       manual_compaction_dequeue_.begin();
3020   while (it != manual_compaction_dequeue_.end()) {
3021     if ((*it)->exclusive) {
3022       return true;
3023     }
3024     ++it;
3025   }
3026   return false;
3027 }
3028 
3029 bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
3030   if ((m->exclusive) || (m1->exclusive)) {
3031     return true;
3032   }
3033   if (m->cfd != m1->cfd) {
3034     return false;
3035   }
3036   return true;
3037 }
3038 
3039 #ifndef ROCKSDB_LITE
3040 void DBImpl::BuildCompactionJobInfo(
3041     const ColumnFamilyData* cfd, Compaction* c, const Status& st,
3042     const CompactionJobStats& compaction_job_stats, const int job_id,
3043     const Version* current, CompactionJobInfo* compaction_job_info) const {
3044   assert(compaction_job_info != nullptr);
3045   compaction_job_info->cf_id = cfd->GetID();
3046   compaction_job_info->cf_name = cfd->GetName();
3047   compaction_job_info->status = st;
3048   compaction_job_info->thread_id = env_->GetThreadID();
3049   compaction_job_info->job_id = job_id;
3050   compaction_job_info->base_input_level = c->start_level();
3051   compaction_job_info->output_level = c->output_level();
3052   compaction_job_info->stats = compaction_job_stats;
3053   compaction_job_info->table_properties = c->GetOutputTableProperties();
3054   compaction_job_info->compaction_reason = c->compaction_reason();
3055   compaction_job_info->compression = c->output_compression();
3056   for (size_t i = 0; i < c->num_input_levels(); ++i) {
3057     for (const auto fmd : *c->inputs(i)) {
3058       const FileDescriptor& desc = fmd->fd;
3059       const uint64_t file_number = desc.GetNumber();
3060       auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
3061                               desc.GetPathId());
3062       compaction_job_info->input_files.push_back(fn);
3063       compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
3064           static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
3065       if (compaction_job_info->table_properties.count(fn) == 0) {
3066         std::shared_ptr<const TableProperties> tp;
3067         auto s = current->GetTableProperties(&tp, fmd, &fn);
3068         if (s.ok()) {
3069           compaction_job_info->table_properties[fn] = tp;
3070         }
3071       }
3072     }
3073   }
3074   for (const auto& newf : c->edit()->GetNewFiles()) {
3075     const FileMetaData& meta = newf.second;
3076     const FileDescriptor& desc = meta.fd;
3077     const uint64_t file_number = desc.GetNumber();
3078     compaction_job_info->output_files.push_back(TableFileName(
3079         c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
3080     compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
3081         newf.first, file_number, meta.oldest_blob_file_number});
3082   }
3083 }
3084 #endif
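// Hedged usage sketch: the CompactionJobInfo assembled above is what a user's
// EventListener receives. Illustrative listener (not part of the build):
//
//   class MyListener : public rocksdb::EventListener {
//    public:
//     void OnCompactionCompleted(rocksdb::DB* /*db*/,
//                                const rocksdb::CompactionJobInfo& ci) override {
//       // e.g. inspect ci.cf_name, ci.base_input_level, ci.output_level,
//       // ci.status, ci.stats.total_output_bytes, ci.input_files.size()
//     }
//   };
//   // Registered via: options.listeners.push_back(std::make_shared<MyListener>());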
3085 
3086 // SuperVersionContext gets created and destructed outside of the lock --
3087 // we use this conveniently to:
3088 // * malloc one SuperVersion() outside of the lock -- new_superversion
3089 // * delete SuperVersion()s outside of the lock -- superversions_to_free
3090 //
3091 // However, if InstallSuperVersionAndScheduleWork() gets called twice with the
3092 // same sv_context, we can't reuse the SuperVersion() that was malloced,
3093 // because the first call already used it. In that rare case, we take the hit
3094 // and create a new SuperVersion() inside the mutex. We do the same thing
3095 // for superversions_to_free.
3096 // (See the illustrative sketch below.)
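// Minimal sketch of the intended call pattern (illustrative; assumes the
// SuperVersionContext helper's constructor flag and Clean() behave as used
// elsewhere in this file):
//
//   SuperVersionContext sv_context(/*create_superversion=*/true);  // alloc
//                                                                  // outside lock
//   {
//     InstrumentedMutexLock l(&mutex_);
//     InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
//   }
//   sv_context.Clean();  // frees retired SuperVersions outside the lock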
3097 
3098 void DBImpl::InstallSuperVersionAndScheduleWork(
3099     ColumnFamilyData* cfd, SuperVersionContext* sv_context,
3100     const MutableCFOptions& mutable_cf_options) {
3101   mutex_.AssertHeld();
3102 
3103   // Update max_total_in_memory_state_
3104   size_t old_memtable_size = 0;
3105   auto* old_sv = cfd->GetSuperVersion();
3106   if (old_sv) {
3107     old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
3108                         old_sv->mutable_cf_options.max_write_buffer_number;
3109   }
3110 
3111   // This branch is rarely taken.
3112   if (UNLIKELY(sv_context->new_superversion == nullptr)) {
3113     sv_context->NewSuperVersion();
3114   }
3115   cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
3116 
3117   // There may be a small data race here. The snapshot that triggers the
3118   // bottommost compaction may already have been released at this point. But
3119   // assuming newer snapshots are created and released frequently, the
3120   // compaction will be triggered soon anyway.
3121   bottommost_files_mark_threshold_ = kMaxSequenceNumber;
3122   for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
3123     bottommost_files_mark_threshold_ = std::min(
3124         bottommost_files_mark_threshold_,
3125         my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
3126   }
3127 
3128   // Whenever we install a new SuperVersion, we might need to issue new
3129   // flushes or compactions.
3130   SchedulePendingCompaction(cfd);
3131   MaybeScheduleFlushOrCompaction();
3132 
3133   // Update max_total_in_memory_state_
3134   max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
3135                                mutable_cf_options.write_buffer_size *
3136                                    mutable_cf_options.max_write_buffer_number;
3137 }
3138 
3139 // ShouldPurge is called by FindObsoleteFiles when doing a full scan,
3140 // and the db mutex (mutex_) should already be held.
3141 // Note that the current implementation of FindObsoleteFiles with
3142 // full_scan=true can issue I/O requests to obtain the list of files in
3143 // directories, e.g. env_->GetChildren(), while holding the db mutex.
3144 bool DBImpl::ShouldPurge(uint64_t file_number) const {
3145   return files_grabbed_for_purge_.find(file_number) ==
3146              files_grabbed_for_purge_.end() &&
3147          purge_files_.find(file_number) == purge_files_.end();
3148 }
3149 
3150 // MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
3151 // (mutex_) should already be held.
3152 void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
3153   files_grabbed_for_purge_.insert(file_number);
3154 }
3155 
3156 void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
3157   InstrumentedMutexLock l(&mutex_);
3158   // snapshot_checker_ should only be set once. If we need to set it multiple
3159   // times, we need to make sure the old one is not deleted while it is still
3160   // being used by a compaction job.
3161   assert(!snapshot_checker_);
3162   snapshot_checker_.reset(snapshot_checker);
3163 }
3164 
3165 void DBImpl::GetSnapshotContext(
3166     JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
3167     SequenceNumber* earliest_write_conflict_snapshot,
3168     SnapshotChecker** snapshot_checker_ptr) {
3169   mutex_.AssertHeld();
3170   assert(job_context != nullptr);
3171   assert(snapshot_seqs != nullptr);
3172   assert(earliest_write_conflict_snapshot != nullptr);
3173   assert(snapshot_checker_ptr != nullptr);
3174 
3175   *snapshot_checker_ptr = snapshot_checker_.get();
3176   if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
3177     *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
3178   }
3179   if (*snapshot_checker_ptr != nullptr) {
3180     // If a snapshot_checker is used, the flush/compaction may keep values
3181     // that are not visible to snapshots taken after the flush/compaction
3182     // job starts. Take a snapshot here so that it appears in snapshot_seqs
3183     // and forces the compaction iterator to consider such
3184     // snapshots.
3185     const Snapshot* job_snapshot =
3186         GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
3187     job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
3188   }
3189   *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
3190 }
3191 }  // namespace ROCKSDB_NAMESPACE
3192