1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/column_family.h"
11 
12 #include <algorithm>
13 #include <cinttypes>
14 #include <limits>
15 #include <string>
16 #include <vector>
17 
18 #include "db/compaction/compaction_picker.h"
19 #include "db/compaction/compaction_picker_fifo.h"
20 #include "db/compaction/compaction_picker_level.h"
21 #include "db/compaction/compaction_picker_universal.h"
22 #include "db/db_impl/db_impl.h"
23 #include "db/internal_stats.h"
24 #include "db/job_context.h"
25 #include "db/range_del_aggregator.h"
26 #include "db/table_properties_collector.h"
27 #include "db/version_set.h"
28 #include "db/write_controller.h"
29 #include "file/sst_file_manager_impl.h"
30 #include "memtable/hash_skiplist_rep.h"
31 #include "monitoring/thread_status_util.h"
32 #include "options/options_helper.h"
33 #include "port/port.h"
34 #include "table/block_based/block_based_table_factory.h"
35 #include "table/merging_iterator.h"
36 #include "util/autovector.h"
37 #include "util/compression.h"
38 
39 namespace ROCKSDB_NAMESPACE {
40 
41 ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
42     ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
43     : cfd_(column_family_data), db_(db), mutex_(mutex) {
44   if (cfd_ != nullptr) {
45     cfd_->Ref();
46   }
47 }
48 
49 ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
50   if (cfd_ != nullptr) {
51 #ifndef ROCKSDB_LITE
52     for (auto& listener : cfd_->ioptions()->listeners) {
53       listener->OnColumnFamilyHandleDeletionStarted(this);
54     }
55 #endif  // ROCKSDB_LITE
56     // Job id == 0 means that this is not our background process, but rather
57     // a user thread.
58     // Need to hold some shared pointers owned by the initial_cf_options
59     // before the final cleanup finishes.
60     ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
61     JobContext job_context(0);
62     mutex_->Lock();
63     bool dropped = cfd_->IsDropped();
64     if (cfd_->UnrefAndTryDelete()) {
65       if (dropped) {
66         db_->FindObsoleteFiles(&job_context, false, true);
67       }
68     }
69     mutex_->Unlock();
70     if (job_context.HaveSomethingToDelete()) {
71       bool defer_purge =
72           db_->immutable_db_options().avoid_unnecessary_blocking_io;
73       db_->PurgeObsoleteFiles(job_context, defer_purge);
74       if (defer_purge) {
75         mutex_->Lock();
76         db_->SchedulePurge();
77         mutex_->Unlock();
78       }
79     }
80     job_context.Clean();
81   }
82 }
83 
84 uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
85 
86 const std::string& ColumnFamilyHandleImpl::GetName() const {
87   return cfd()->GetName();
88 }
89 
90 Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
91 #ifndef ROCKSDB_LITE
92   // accessing mutable cf-options requires db mutex.
93   InstrumentedMutexLock l(mutex_);
94   *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
95   return Status::OK();
96 #else
97   (void)desc;
98   return Status::NotSupported();
99 #endif  // !ROCKSDB_LITE
100 }
101 
102 const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
103   return cfd()->user_comparator();
104 }
105 
106 void GetIntTblPropCollectorFactory(
107     const ImmutableCFOptions& ioptions,
108     std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
109         int_tbl_prop_collector_factories) {
110   auto& collector_factories = ioptions.table_properties_collector_factories;
111   for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
112        ++i) {
113     assert(collector_factories[i]);
114     int_tbl_prop_collector_factories->emplace_back(
115         new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
116   }
117 }
118 
119 Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
120   if (!cf_options.compression_per_level.empty()) {
121     for (size_t level = 0; level < cf_options.compression_per_level.size();
122          ++level) {
123       if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
124         return Status::InvalidArgument(
125             "Compression type " +
126             CompressionTypeToString(cf_options.compression_per_level[level]) +
127             " is not linked with the binary.");
128       }
129     }
130   } else {
131     if (!CompressionTypeSupported(cf_options.compression)) {
132       return Status::InvalidArgument(
133           "Compression type " +
134           CompressionTypeToString(cf_options.compression) +
135           " is not linked with the binary.");
136     }
137   }
138   if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
139     if (!ZSTD_TrainDictionarySupported()) {
140       return Status::InvalidArgument(
141           "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
142           "is not linked with the binary.");
143     }
144     if (cf_options.compression_opts.max_dict_bytes == 0) {
145       return Status::InvalidArgument(
146           "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
147           "should be nonzero if we're using zstd's dictionary generator.");
148     }
149   }
150   return Status::OK();
151 }
152 
153 Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
154   if (cf_options.inplace_update_support) {
155     return Status::InvalidArgument(
156         "In-place memtable updates (inplace_update_support) is not compatible "
157         "with concurrent writes (allow_concurrent_memtable_write)");
158   }
159   if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
160     return Status::InvalidArgument(
161         "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
162   }
163   return Status::OK();
164 }
165 
166 Status CheckCFPathsSupported(const DBOptions& db_options,
167                              const ColumnFamilyOptions& cf_options) {
168   // More than one cf_paths are supported only in universal
169   // and level compaction styles. This function also checks the case
170   // in which cf_paths is not specified, which results in db_paths
171   // being used.
172   if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
173       (cf_options.compaction_style != kCompactionStyleLevel)) {
174     if (cf_options.cf_paths.size() > 1) {
175       return Status::NotSupported(
176           "More than one CF paths are only supported in "
177           "universal and level compaction styles. ");
178     } else if (cf_options.cf_paths.empty() &&
179                db_options.db_paths.size() > 1) {
180       return Status::NotSupported(
181           "More than one DB paths are only supported in "
182           "universal and level compaction styles. ");
183     }
184   }
185   return Status::OK();
186 }
187 
188 namespace {
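// Sentinel values meaning "not explicitly set by the user"; SanitizeOptions()
// below replaces them with adjusted defaults.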
189 const uint64_t kDefaultTtl = 0xfffffffffffffffe;
190 const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
191 };  // namespace
192 
193 ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
194                                     const ColumnFamilyOptions& src) {
195   ColumnFamilyOptions result = src;
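  // Clamp write_buffer_size to [64 KB, 4 GB] on 32-bit platforms and to
  // [64 KB, 64 GB] on 64-bit platforms.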
196   size_t clamp_max = std::conditional<
197       sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
198       std::integral_constant<uint64_t, 64ull << 30>>::type::value;
199   ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
200   // if user sets arena_block_size, we trust the user to use this value.
201   // Otherwise, calculate a proper value from write_buffer_size.
202   if (result.arena_block_size <= 0) {
203     result.arena_block_size = result.write_buffer_size / 8;
204 
205     // Align up to 4k
206     const size_t align = 4 * 1024;
207     result.arena_block_size =
208         ((result.arena_block_size + align - 1) / align) * align;
209   }
210   result.min_write_buffer_number_to_merge =
211       std::min(result.min_write_buffer_number_to_merge,
212                result.max_write_buffer_number - 1);
213   if (result.min_write_buffer_number_to_merge < 1) {
214     result.min_write_buffer_number_to_merge = 1;
215   }
216 
217   if (result.num_levels < 1) {
218     result.num_levels = 1;
219   }
220   if (result.compaction_style == kCompactionStyleLevel &&
221       result.num_levels < 2) {
222     result.num_levels = 2;
223   }
224 
225   if (result.compaction_style == kCompactionStyleUniversal &&
226       db_options.allow_ingest_behind && result.num_levels < 3) {
227     result.num_levels = 3;
228   }
229 
230   if (result.max_write_buffer_number < 2) {
231     result.max_write_buffer_number = 2;
232   }
233   // fall back to max_write_buffer_number_to_maintain if
234   // max_write_buffer_size_to_maintain is not set
235   if (result.max_write_buffer_size_to_maintain < 0) {
236     result.max_write_buffer_size_to_maintain =
237         result.max_write_buffer_number *
238         static_cast<int64_t>(result.write_buffer_size);
239   } else if (result.max_write_buffer_size_to_maintain == 0 &&
240              result.max_write_buffer_number_to_maintain < 0) {
241     result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
242   }
243   // bloom filter size shouldn't exceed 1/4 of memtable size.
244   if (result.memtable_prefix_bloom_size_ratio > 0.25) {
245     result.memtable_prefix_bloom_size_ratio = 0.25;
246   } else if (result.memtable_prefix_bloom_size_ratio < 0) {
247     result.memtable_prefix_bloom_size_ratio = 0;
248   }
249 
250   if (!result.prefix_extractor) {
251     assert(result.memtable_factory);
252     Slice name = result.memtable_factory->Name();
253     if (name.compare("HashSkipListRepFactory") == 0 ||
254         name.compare("HashLinkListRepFactory") == 0) {
255       result.memtable_factory = std::make_shared<SkipListFactory>();
256     }
257   }
258 
259   if (result.compaction_style == kCompactionStyleFIFO) {
260     result.num_levels = 1;
261     // since we delete level0 files in FIFO compaction when there are too many
262     // of them, these options don't really mean anything
263     result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
264     result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
265   }
266 
267   if (result.max_bytes_for_level_multiplier <= 0) {
268     result.max_bytes_for_level_multiplier = 1;
269   }
270 
271   if (result.level0_file_num_compaction_trigger == 0) {
272     ROCKS_LOG_WARN(db_options.info_log.get(),
273                    "level0_file_num_compaction_trigger cannot be 0");
274     result.level0_file_num_compaction_trigger = 1;
275   }
276 
277   if (result.level0_stop_writes_trigger <
278           result.level0_slowdown_writes_trigger ||
279       result.level0_slowdown_writes_trigger <
280           result.level0_file_num_compaction_trigger) {
281     ROCKS_LOG_WARN(db_options.info_log.get(),
282                    "This condition must be satisfied: "
283                    "level0_stop_writes_trigger(%d) >= "
284                    "level0_slowdown_writes_trigger(%d) >= "
285                    "level0_file_num_compaction_trigger(%d)",
286                    result.level0_stop_writes_trigger,
287                    result.level0_slowdown_writes_trigger,
288                    result.level0_file_num_compaction_trigger);
289     if (result.level0_slowdown_writes_trigger <
290         result.level0_file_num_compaction_trigger) {
291       result.level0_slowdown_writes_trigger =
292           result.level0_file_num_compaction_trigger;
293     }
294     if (result.level0_stop_writes_trigger <
295         result.level0_slowdown_writes_trigger) {
296       result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
297     }
298     ROCKS_LOG_WARN(db_options.info_log.get(),
299                    "Adjust the value to "
300                    "level0_stop_writes_trigger(%d)"
301                    "level0_slowdown_writes_trigger(%d)"
302                    "level0_file_num_compaction_trigger(%d)",
303                    result.level0_stop_writes_trigger,
304                    result.level0_slowdown_writes_trigger,
305                    result.level0_file_num_compaction_trigger);
306   }
307 
308   if (result.soft_pending_compaction_bytes_limit == 0) {
309     result.soft_pending_compaction_bytes_limit =
310         result.hard_pending_compaction_bytes_limit;
311   } else if (result.hard_pending_compaction_bytes_limit > 0 &&
312              result.soft_pending_compaction_bytes_limit >
313                  result.hard_pending_compaction_bytes_limit) {
314     result.soft_pending_compaction_bytes_limit =
315         result.hard_pending_compaction_bytes_limit;
316   }
317 
318 #ifndef ROCKSDB_LITE
319   // When the DB is stopped, it's possible that there are some .trash files
320   // that were not deleted yet. When we open the DB we will find these .trash
321   // files and schedule them to be deleted (or delete them immediately if no
322   // SstFileManager was used).
323   auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
324   for (size_t i = 0; i < result.cf_paths.size(); i++) {
325     DeleteScheduler::CleanupDirectory(db_options.env, sfm, result.cf_paths[i].path);
326   }
327 #endif
328 
329   if (result.cf_paths.empty()) {
330     result.cf_paths = db_options.db_paths;
331   }
332 
333   if (result.level_compaction_dynamic_level_bytes) {
334     if (result.compaction_style != kCompactionStyleLevel ||
335         result.cf_paths.size() > 1U) {
336       // 1. level_compaction_dynamic_level_bytes only makes sense for
337       //    level-based compaction.
338       // 2. we don't yet know how to make this feature and multiple
339       //    DB paths work together.
340       result.level_compaction_dynamic_level_bytes = false;
341     }
342   }
343 
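  // By default, cap a compaction's total input size at 25x the target file
  // size.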
344   if (result.max_compaction_bytes == 0) {
345     result.max_compaction_bytes = result.target_file_size_base * 25;
346   }
347 
348   bool is_block_based_table =
349       (result.table_factory->Name() == BlockBasedTableFactory().Name());
350 
351   const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
352   if (result.ttl == kDefaultTtl) {
353     if (is_block_based_table &&
354         result.compaction_style != kCompactionStyleFIFO) {
355       result.ttl = kAdjustedTtl;
356     } else {
357       result.ttl = 0;
358     }
359   }
360 
361   const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;
362 
363   // Turn on periodic compactions and set them to occur once every 30 days if
364   // compaction filters are used and periodic_compaction_seconds is set to the
365   // default value.
366   if (result.compaction_style != kCompactionStyleFIFO) {
367     if ((result.compaction_filter != nullptr ||
368          result.compaction_filter_factory != nullptr) &&
369         result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
370         is_block_based_table) {
371       result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
372     }
373   } else {
374     // result.compaction_style == kCompactionStyleFIFO
375     if (result.ttl == 0) {
376       if (is_block_based_table) {
377         if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
378           result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
379         }
380         result.ttl = result.periodic_compaction_seconds;
381       }
382     } else if (result.periodic_compaction_seconds != 0) {
383       result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
384     }
385   }
386 
387   // TTL compactions would work similar to Periodic Compactions in Universal in
388   // most of the cases. So, if ttl is set, execute the periodic compaction
389   // codepath.
390   if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
391     if (result.periodic_compaction_seconds != 0) {
392       result.periodic_compaction_seconds =
393           std::min(result.ttl, result.periodic_compaction_seconds);
394     } else {
395       result.periodic_compaction_seconds = result.ttl;
396     }
397   }
398 
399   if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
400     result.periodic_compaction_seconds = 0;
401   }
402 
403   return result;
404 }
405 
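// Sentinel values stored in each thread's local SuperVersion slot: kSVInUse
// marks a slot whose cached SuperVersion is currently borrowed by that thread,
// while kSVObsolete marks a slot that must be refreshed from super_version_
// before it can be used again.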
406 int SuperVersion::dummy = 0;
407 void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
408 void* const SuperVersion::kSVObsolete = nullptr;
409 
410 SuperVersion::~SuperVersion() {
411   for (auto td : to_delete) {
412     delete td;
413   }
414 }
415 
416 SuperVersion* SuperVersion::Ref() {
417   refs.fetch_add(1, std::memory_order_relaxed);
418   return this;
419 }
420 
421 bool SuperVersion::Unref() {
422   // fetch_sub returns the previous value of ref
423   uint32_t previous_refs = refs.fetch_sub(1);
424   assert(previous_refs > 0);
425   return previous_refs == 1;
426 }
427 
428 void SuperVersion::Cleanup() {
429   assert(refs.load(std::memory_order_relaxed) == 0);
430   imm->Unref(&to_delete);
431   MemTable* m = mem->Unref();
432   if (m != nullptr) {
433     auto* memory_usage = current->cfd()->imm()->current_memory_usage();
434     assert(*memory_usage >= m->ApproximateMemoryUsage());
435     *memory_usage -= m->ApproximateMemoryUsage();
436     to_delete.push_back(m);
437   }
438   current->Unref();
439   if (cfd->Unref()) {
440     delete cfd;
441   }
442 }
443 
444 void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
445                         MemTableListVersion* new_imm, Version* new_current) {
446   cfd = new_cfd;
447   mem = new_mem;
448   imm = new_imm;
449   current = new_current;
450   cfd->Ref();
451   mem->Ref();
452   imm->Ref();
453   current->Ref();
454   refs.store(1, std::memory_order_relaxed);
455 }
456 
457 namespace {
458 void SuperVersionUnrefHandle(void* ptr) {
459   // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
460   // destroyed. When the former happens, the thread shouldn't see kSVInUse.
461   // When the latter happens, we are in ~ColumnFamilyData(), so no get should
462   // happen either.
463   SuperVersion* sv = static_cast<SuperVersion*>(ptr);
464   bool was_last_ref __attribute__((__unused__));
465   was_last_ref = sv->Unref();
466   // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
467   // This is important because we can't do SuperVersion cleanup here.
468   // That would require locking DB mutex, which would deadlock because
469   // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
470   assert(!was_last_ref);
471 }
472 }  // anonymous namespace
473 
474 std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
475   std::vector<std::string> paths;
476   paths.reserve(ioptions_.cf_paths.size());
477   for (const DbPath& db_path : ioptions_.cf_paths) {
478     paths.emplace_back(db_path.path);
479   }
480   return paths;
481 }
482 
483 const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;
484 
485 ColumnFamilyData::ColumnFamilyData(
486     uint32_t id, const std::string& name, Version* _dummy_versions,
487     Cache* _table_cache, WriteBufferManager* write_buffer_manager,
488     const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
489     const FileOptions& file_options, ColumnFamilySet* column_family_set,
490     BlockCacheTracer* const block_cache_tracer)
491     : id_(id),
492       name_(name),
493       dummy_versions_(_dummy_versions),
494       current_(nullptr),
495       refs_(0),
496       initialized_(false),
497       dropped_(false),
498       internal_comparator_(cf_options.comparator),
499       initial_cf_options_(SanitizeOptions(db_options, cf_options)),
500       ioptions_(db_options, initial_cf_options_),
501       mutable_cf_options_(initial_cf_options_),
502       is_delete_range_supported_(
503           cf_options.table_factory->IsDeleteRangeSupported()),
504       write_buffer_manager_(write_buffer_manager),
505       mem_(nullptr),
506       imm_(ioptions_.min_write_buffer_number_to_merge,
507            ioptions_.max_write_buffer_number_to_maintain,
508            ioptions_.max_write_buffer_size_to_maintain),
509       super_version_(nullptr),
510       super_version_number_(0),
511       local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
512       next_(nullptr),
513       prev_(nullptr),
514       log_number_(0),
515       flush_reason_(FlushReason::kOthers),
516       column_family_set_(column_family_set),
517       queued_for_flush_(false),
518       queued_for_compaction_(false),
519       prev_compaction_needed_bytes_(0),
520       allow_2pc_(db_options.allow_2pc),
521       last_memtable_id_(0),
522       db_paths_registered_(false) {
523   if (id_ != kDummyColumnFamilyDataId) {
524     // TODO(cc): RegisterDbPaths can be expensive, considering moving it
525     // outside of this constructor which might be called with db mutex held.
526     // TODO(cc): considering using ioptions_.fs, currently some tests rely on
527     // EnvWrapper, that's the main reason why we use env here.
528     Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
529     if (s.ok()) {
530       db_paths_registered_ = true;
531     } else {
532       ROCKS_LOG_ERROR(
533           ioptions_.info_log,
534           "Failed to register data paths of column family (id: %d, name: %s)",
535           id_, name_.c_str());
536     }
537   }
538   Ref();
539 
540   // Convert user defined table properties collector factories to internal ones.
541   GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);
542 
543   // if _dummy_versions is nullptr, then this is a dummy column family.
544   if (_dummy_versions != nullptr) {
545     internal_stats_.reset(
546         new InternalStats(ioptions_.num_levels, db_options.env, this));
547     table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
548                                       block_cache_tracer));
549     if (ioptions_.compaction_style == kCompactionStyleLevel) {
550       compaction_picker_.reset(
551           new LevelCompactionPicker(ioptions_, &internal_comparator_));
552 #ifndef ROCKSDB_LITE
553     } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
554       compaction_picker_.reset(
555           new UniversalCompactionPicker(ioptions_, &internal_comparator_));
556     } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
557       compaction_picker_.reset(
558           new FIFOCompactionPicker(ioptions_, &internal_comparator_));
559     } else if (ioptions_.compaction_style == kCompactionStyleNone) {
560       compaction_picker_.reset(new NullCompactionPicker(
561           ioptions_, &internal_comparator_));
562       ROCKS_LOG_WARN(ioptions_.info_log,
563                      "Column family %s does not use any background compaction. "
564                      "Compactions can only be done via CompactFiles\n",
565                      GetName().c_str());
566 #endif  // !ROCKSDB_LITE
567     } else {
568       ROCKS_LOG_ERROR(ioptions_.info_log,
569                       "Unable to recognize the specified compaction style %d. "
570                       "Column family %s will use kCompactionStyleLevel.\n",
571                       ioptions_.compaction_style, GetName().c_str());
572       compaction_picker_.reset(
573           new LevelCompactionPicker(ioptions_, &internal_comparator_));
574     }
575 
576     if (column_family_set_->NumberOfColumnFamilies() < 10) {
577       ROCKS_LOG_INFO(ioptions_.info_log,
578                      "--------------- Options for column family [%s]:\n",
579                      name.c_str());
580       initial_cf_options_.Dump(ioptions_.info_log);
581     } else {
582       ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
583     }
584   }
585 
586   RecalculateWriteStallConditions(mutable_cf_options_);
587 }
588 
589 // DB mutex held
590 ColumnFamilyData::~ColumnFamilyData() {
591   assert(refs_.load(std::memory_order_relaxed) == 0);
592   // remove from linked list
593   auto prev = prev_;
594   auto next = next_;
595   prev->next_ = next;
596   next->prev_ = prev;
597 
598   if (!dropped_ && column_family_set_ != nullptr) {
599     // If it's dropped, it's already removed from column family set
600     // If column_family_set_ == nullptr, this is dummy CFD and not in
601     // ColumnFamilySet
602     column_family_set_->RemoveColumnFamily(this);
603   }
604 
605   if (current_ != nullptr) {
606     current_->Unref();
607   }
608 
609   // It would be wrong if this ColumnFamilyData is in flush_queue_ or
610   // compaction_queue_ and we destroyed it
611   assert(!queued_for_flush_);
612   assert(!queued_for_compaction_);
613   assert(super_version_ == nullptr);
614 
615   if (dummy_versions_ != nullptr) {
616     // List must be empty
617     assert(dummy_versions_->TEST_Next() == dummy_versions_);
618     bool deleted __attribute__((__unused__));
619     deleted = dummy_versions_->Unref();
620     assert(deleted);
621   }
622 
623   if (mem_ != nullptr) {
624     delete mem_->Unref();
625   }
626   autovector<MemTable*> to_delete;
627   imm_.current()->Unref(&to_delete);
628   for (MemTable* m : to_delete) {
629     delete m;
630   }
631 
632   if (db_paths_registered_) {
633     // TODO(cc): considering using ioptions_.fs, currently some tests rely on
634     // EnvWrapper, that's the main reason why we use env here.
635     Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
636     if (!s.ok()) {
637       ROCKS_LOG_ERROR(
638           ioptions_.info_log,
639           "Failed to unregister data paths of column family (id: %d, name: %s)",
640           id_, name_.c_str());
641     }
642   }
643 }
644 
645 bool ColumnFamilyData::UnrefAndTryDelete() {
646   int old_refs = refs_.fetch_sub(1);
647   assert(old_refs > 0);
648 
649   if (old_refs == 1) {
650     assert(super_version_ == nullptr);
651     delete this;
652     return true;
653   }
654 
655   if (old_refs == 2 && super_version_ != nullptr) {
656     // Only the super_version_ holds me
657     SuperVersion* sv = super_version_;
658     super_version_ = nullptr;
659     // Release SuperVersion reference kept in ThreadLocalPtr.
660     // This must be done outside of mutex_ since unref handler can lock mutex.
661     sv->db_mutex->Unlock();
662     local_sv_.reset();
663     sv->db_mutex->Lock();
664 
665     if (sv->Unref()) {
666       // May delete this ColumnFamilyData after calling Cleanup()
667       sv->Cleanup();
668       delete sv;
669       return true;
670     }
671   }
672   return false;
673 }
674 
675 void ColumnFamilyData::SetDropped() {
676   // can't drop default CF
677   assert(id_ != 0);
678   dropped_ = true;
679   write_controller_token_.reset();
680 
681   // remove from column_family_set
682   column_family_set_->RemoveColumnFamily(this);
683 }
684 
685 ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
686   return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
687 }
688 
689 uint64_t ColumnFamilyData::OldestLogToKeep() {
690   auto current_log = GetLogNumber();
691 
692   if (allow_2pc_) {
693     autovector<MemTable*> empty_list;
694     auto imm_prep_log =
695         imm()->PrecomputeMinLogContainingPrepSection(empty_list);
696     auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
697 
698     if (imm_prep_log > 0 && imm_prep_log < current_log) {
699       current_log = imm_prep_log;
700     }
701 
702     if (mem_prep_log > 0 && mem_prep_log < current_log) {
703       current_log = mem_prep_log;
704     }
705   }
706 
707   return current_log;
708 }
709 
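// Write-rate multipliers used by SetupDelay() and the stall logic below:
// kIncSlowdownRatio shrinks the delayed write rate while compaction debt keeps
// growing, kDecSlowdownRatio restores it as the debt is paid down,
// kNearStopSlowdownRatio applies a harsher cut near a stop condition, and
// kDelayRecoverSlowdownRatio is the reward applied once delays are lifted.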
710 const double kIncSlowdownRatio = 0.8;
711 const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
712 const double kNearStopSlowdownRatio = 0.6;
713 const double kDelayRecoverSlowdownRatio = 1.4;
714 
715 namespace {
716 // If penalize_stop is true, we further reduce slowdown rate.
717 std::unique_ptr<WriteControllerToken> SetupDelay(
718     WriteController* write_controller, uint64_t compaction_needed_bytes,
719     uint64_t prev_compaction_need_bytes, bool penalize_stop,
720     bool auto_compactions_disabled) {
721   const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.
722 
723   uint64_t max_write_rate = write_controller->max_delayed_write_rate();
724   uint64_t write_rate = write_controller->delayed_write_rate();
725 
726   if (auto_compactions_disabled) {
727     // When auto compaction is disabled, always use the value user gave.
728     write_rate = max_write_rate;
729   } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
730     // If user gives rate less than kMinWriteRate, don't adjust it.
731     //
732     // If already delayed, need to adjust based on previous compaction debt.
733     // When two or more column families require delay, we always
734     // increase or reduce the write rate based on information for one single
735     // column family. It is likely to be OK but we can improve if there is a
736     // problem.
737     // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
738     // is only available in level-based compaction
739     //
740     // If the compaction debt stays the same as previously, we also further slow
741     // down. It usually means a mem table is full. It's mainly for the case
742     // where both of flush and compaction are much slower than the speed we
743     // insert to mem tables, so we need to actively slow down before we get
744     // feedback signal from compaction and flushes to avoid the full stop
745     // because of hitting the max write buffer number.
746     //
747     // If the DB just fell into the stop condition, we need to further reduce
748     // the write rate to avoid the stop condition.
749     if (penalize_stop) {
750       // Penalize the near stop or stop condition by more aggressive slowdown.
751       // This is to provide the long term slowdown increase signal.
752       // The penalty is more than the reward of recovering to the normal
753       // condition.
754       write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
755                                          kNearStopSlowdownRatio);
756       if (write_rate < kMinWriteRate) {
757         write_rate = kMinWriteRate;
758       }
759     } else if (prev_compaction_need_bytes > 0 &&
760                prev_compaction_need_bytes <= compaction_needed_bytes) {
761       write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
762                                          kIncSlowdownRatio);
763       if (write_rate < kMinWriteRate) {
764         write_rate = kMinWriteRate;
765       }
766     } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
767       // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
768       // compaction debt. But we'll never speed up to faster than the write rate
769       // given by users.
770       write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
771                                          kDecSlowdownRatio);
772       if (write_rate > max_write_rate) {
773         write_rate = max_write_rate;
774       }
775     }
776   }
777   return write_controller->GetDelayToken(write_rate);
778 }
779 
780 int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
781                                     int level0_slowdown_writes_trigger) {
782   // SanitizeOptions() ensures it.
783   assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
784 
785   if (level0_file_num_compaction_trigger < 0) {
786     return std::numeric_limits<int>::max();
787   }
788 
789   const int64_t twice_level0_trigger =
790       static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
791 
792   const int64_t one_fourth_trigger_slowdown =
793       static_cast<int64_t>(level0_file_num_compaction_trigger) +
794       ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
795        4);
796 
797   assert(twice_level0_trigger >= 0);
798   assert(one_fourth_trigger_slowdown >= 0);
799 
800   // 1/4 of the way between the L0 compaction trigger threshold and the
801   // slowdown condition.
802   // Or twice the compaction trigger, if that is smaller.
803   int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
804   if (res >= port::kMaxInt32) {
805     return port::kMaxInt32;
806   } else {
807     // res fits in int
808     return static_cast<int>(res);
809   }
810 }
811 }  // namespace
812 
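// Evaluated in order of severity: the stop conditions (memtable count, L0 file
// count, pending compaction bytes) are checked before the corresponding delay
// conditions, so the most severe applicable state is returned.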
813 std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
814 ColumnFamilyData::GetWriteStallConditionAndCause(
815     int num_unflushed_memtables, int num_l0_files,
816     uint64_t num_compaction_needed_bytes,
817     const MutableCFOptions& mutable_cf_options) {
818   if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
819     return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
820   } else if (!mutable_cf_options.disable_auto_compactions &&
821              num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
822     return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
823   } else if (!mutable_cf_options.disable_auto_compactions &&
824              mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
825              num_compaction_needed_bytes >=
826                  mutable_cf_options.hard_pending_compaction_bytes_limit) {
827     return {WriteStallCondition::kStopped,
828             WriteStallCause::kPendingCompactionBytes};
829   } else if (mutable_cf_options.max_write_buffer_number > 3 &&
830              num_unflushed_memtables >=
831                  mutable_cf_options.max_write_buffer_number - 1) {
832     return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
833   } else if (!mutable_cf_options.disable_auto_compactions &&
834              mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
835              num_l0_files >=
836                  mutable_cf_options.level0_slowdown_writes_trigger) {
837     return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
838   } else if (!mutable_cf_options.disable_auto_compactions &&
839              mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
840              num_compaction_needed_bytes >=
841                  mutable_cf_options.soft_pending_compaction_bytes_limit) {
842     return {WriteStallCondition::kDelayed,
843             WriteStallCause::kPendingCompactionBytes};
844   }
845   return {WriteStallCondition::kNormal, WriteStallCause::kNone};
846 }
847 
848 WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
849       const MutableCFOptions& mutable_cf_options) {
850   auto write_stall_condition = WriteStallCondition::kNormal;
851   if (current_ != nullptr) {
852     auto* vstorage = current_->storage_info();
853     auto write_controller = column_family_set_->write_controller_;
854     uint64_t compaction_needed_bytes =
855         vstorage->estimated_compaction_needed_bytes();
856 
857     auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
858         imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
859         vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
860     write_stall_condition = write_stall_condition_and_cause.first;
861     auto write_stall_cause = write_stall_condition_and_cause.second;
862 
863     bool was_stopped = write_controller->IsStopped();
864     bool needed_delay = write_controller->NeedsDelay();
865 
866     if (write_stall_condition == WriteStallCondition::kStopped &&
867         write_stall_cause == WriteStallCause::kMemtableLimit) {
868       write_controller_token_ = write_controller->GetStopToken();
869       internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
870       ROCKS_LOG_WARN(
871           ioptions_.info_log,
872           "[%s] Stopping writes because we have %d immutable memtables "
873           "(waiting for flush), max_write_buffer_number is set to %d",
874           name_.c_str(), imm()->NumNotFlushed(),
875           mutable_cf_options.max_write_buffer_number);
876     } else if (write_stall_condition == WriteStallCondition::kStopped &&
877                write_stall_cause == WriteStallCause::kL0FileCountLimit) {
878       write_controller_token_ = write_controller->GetStopToken();
879       internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
880       if (compaction_picker_->IsLevel0CompactionInProgress()) {
881         internal_stats_->AddCFStats(
882             InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
883       }
884       ROCKS_LOG_WARN(ioptions_.info_log,
885                      "[%s] Stopping writes because we have %d level-0 files",
886                      name_.c_str(), vstorage->l0_delay_trigger_count());
887     } else if (write_stall_condition == WriteStallCondition::kStopped &&
888                write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
889       write_controller_token_ = write_controller->GetStopToken();
890       internal_stats_->AddCFStats(
891           InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
892       ROCKS_LOG_WARN(
893           ioptions_.info_log,
894           "[%s] Stopping writes because of estimated pending compaction "
895           "bytes %" PRIu64,
896           name_.c_str(), compaction_needed_bytes);
897     } else if (write_stall_condition == WriteStallCondition::kDelayed &&
898                write_stall_cause == WriteStallCause::kMemtableLimit) {
899       write_controller_token_ =
900           SetupDelay(write_controller, compaction_needed_bytes,
901                      prev_compaction_needed_bytes_, was_stopped,
902                      mutable_cf_options.disable_auto_compactions);
903       internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
904       ROCKS_LOG_WARN(
905           ioptions_.info_log,
906           "[%s] Stalling writes because we have %d immutable memtables "
907           "(waiting for flush), max_write_buffer_number is set to %d "
908           "rate %" PRIu64,
909           name_.c_str(), imm()->NumNotFlushed(),
910           mutable_cf_options.max_write_buffer_number,
911           write_controller->delayed_write_rate());
912     } else if (write_stall_condition == WriteStallCondition::kDelayed &&
913                write_stall_cause == WriteStallCause::kL0FileCountLimit) {
914       // L0 is within two files of the stop trigger.
915       bool near_stop = vstorage->l0_delay_trigger_count() >=
916                        mutable_cf_options.level0_stop_writes_trigger - 2;
917       write_controller_token_ =
918           SetupDelay(write_controller, compaction_needed_bytes,
919                      prev_compaction_needed_bytes_, was_stopped || near_stop,
920                      mutable_cf_options.disable_auto_compactions);
921       internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
922                                   1);
923       if (compaction_picker_->IsLevel0CompactionInProgress()) {
924         internal_stats_->AddCFStats(
925             InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
926       }
927       ROCKS_LOG_WARN(ioptions_.info_log,
928                      "[%s] Stalling writes because we have %d level-0 files "
929                      "rate %" PRIu64,
930                      name_.c_str(), vstorage->l0_delay_trigger_count(),
931                      write_controller->delayed_write_rate());
932     } else if (write_stall_condition == WriteStallCondition::kDelayed &&
933                write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
934       // If the distance to the hard limit is less than 1/4 of the gap between
935       // the soft and hard bytes limits, we consider it near-stop and speed up
936       // the slowdown.
937       bool near_stop =
938           mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
939           (compaction_needed_bytes -
940            mutable_cf_options.soft_pending_compaction_bytes_limit) >
941               3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
942                    mutable_cf_options.soft_pending_compaction_bytes_limit) /
943                   4;
944 
945       write_controller_token_ =
946           SetupDelay(write_controller, compaction_needed_bytes,
947                      prev_compaction_needed_bytes_, was_stopped || near_stop,
948                      mutable_cf_options.disable_auto_compactions);
949       internal_stats_->AddCFStats(
950           InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
951       ROCKS_LOG_WARN(
952           ioptions_.info_log,
953           "[%s] Stalling writes because of estimated pending compaction "
954           "bytes %" PRIu64 " rate %" PRIu64,
955           name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
956           write_controller->delayed_write_rate());
957     } else {
958       assert(write_stall_condition == WriteStallCondition::kNormal);
959       if (vstorage->l0_delay_trigger_count() >=
960           GetL0ThresholdSpeedupCompaction(
961               mutable_cf_options.level0_file_num_compaction_trigger,
962               mutable_cf_options.level0_slowdown_writes_trigger)) {
963         write_controller_token_ =
964             write_controller->GetCompactionPressureToken();
965         ROCKS_LOG_INFO(
966             ioptions_.info_log,
967             "[%s] Increasing compaction threads because we have %d level-0 "
968             "files ",
969             name_.c_str(), vstorage->l0_delay_trigger_count());
970       } else if (vstorage->estimated_compaction_needed_bytes() >=
971                  mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
972         // Increase compaction threads if bytes needed for compaction exceeds
973         // 1/4 of threshold for slowing down.
974         // If soft pending compaction byte limit is not set, always speed up
975         // compaction.
976         write_controller_token_ =
977             write_controller->GetCompactionPressureToken();
978         if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
979           ROCKS_LOG_INFO(
980               ioptions_.info_log,
981               "[%s] Increasing compaction threads because of estimated pending "
982               "compaction "
983               "bytes %" PRIu64,
984               name_.c_str(), vstorage->estimated_compaction_needed_bytes());
985         }
986       } else {
987         write_controller_token_.reset();
988       }
989       // If the DB recovers from delay conditions, we reward it by raising the
990       // write rate by double the slowdown ratio. This is to balance the long
991       // term slowdown increase signal.
992       if (needed_delay) {
993         uint64_t write_rate = write_controller->delayed_write_rate();
994         write_controller->set_delayed_write_rate(static_cast<uint64_t>(
995             static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
996         // Set the low pri limit to be 1/4 the delayed write rate.
997         // Note we don't reset this value even after the delay condition is released.
998         // Low-pri rate will continue to apply if there is a compaction
999         // pressure.
1000         write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
1001                                                                     4);
1002       }
1003     }
1004     prev_compaction_needed_bytes_ = compaction_needed_bytes;
1005   }
1006   return write_stall_condition;
1007 }
1008 
1009 const FileOptions* ColumnFamilyData::soptions() const {
1010   return &(column_family_set_->file_options_);
1011 }
1012 
1013 void ColumnFamilyData::SetCurrent(Version* current_version) {
1014   current_ = current_version;
1015 }
1016 
1017 uint64_t ColumnFamilyData::GetNumLiveVersions() const {
1018   return VersionSet::GetNumLiveVersions(dummy_versions_);
1019 }
1020 
1021 uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
1022   return VersionSet::GetTotalSstFilesSize(dummy_versions_);
1023 }
1024 
1025 uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
1026   return current_->GetSstFilesSize();
1027 }
1028 
1029 MemTable* ColumnFamilyData::ConstructNewMemtable(
1030     const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
1031   return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
1032                       write_buffer_manager_, earliest_seq, id_);
1033 }
1034 
1035 void ColumnFamilyData::CreateNewMemtable(
1036     const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
1037   if (mem_ != nullptr) {
1038     delete mem_->Unref();
1039   }
1040   SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
1041   mem_->Ref();
1042 }
1043 
1044 bool ColumnFamilyData::NeedsCompaction() const {
1045   return compaction_picker_->NeedsCompaction(current_->storage_info());
1046 }
1047 
1048 Compaction* ColumnFamilyData::PickCompaction(
1049     const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
1050   SequenceNumber earliest_mem_seqno =
1051       std::min(mem_->GetEarliestSequenceNumber(),
1052                imm_.current()->GetEarliestSequenceNumber(false));
1053   auto* result = compaction_picker_->PickCompaction(
1054       GetName(), mutable_options, current_->storage_info(), log_buffer,
1055       earliest_mem_seqno);
1056   if (result != nullptr) {
1057     result->SetInputVersion(current_);
1058   }
1059   return result;
1060 }
1061 
1062 bool ColumnFamilyData::RangeOverlapWithCompaction(
1063     const Slice& smallest_user_key, const Slice& largest_user_key,
1064     int level) const {
1065   return compaction_picker_->RangeOverlapWithCompaction(
1066       smallest_user_key, largest_user_key, level);
1067 }
1068 
1069 Status ColumnFamilyData::RangesOverlapWithMemtables(
1070     const autovector<Range>& ranges, SuperVersion* super_version,
1071     bool* overlap) {
1072   assert(overlap != nullptr);
1073   *overlap = false;
1074   // Create an InternalIterator over all unflushed memtables
1075   Arena arena;
1076   ReadOptions read_opts;
1077   read_opts.total_order_seek = true;
1078   MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
1079   merge_iter_builder.AddIterator(
1080       super_version->mem->NewIterator(read_opts, &arena));
1081   super_version->imm->AddIterators(read_opts, &merge_iter_builder);
1082   ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
1083 
1084   auto read_seq = super_version->current->version_set()->LastSequence();
1085   ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
1086   auto* active_range_del_iter =
1087       super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
1088   range_del_agg.AddTombstones(
1089       std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
1090   super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
1091                                                  &range_del_agg);
1092 
1093   Status status;
1094   for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
1095     auto* vstorage = super_version->current->storage_info();
1096     auto* ucmp = vstorage->InternalComparator()->user_comparator();
1097     InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
1098                             kValueTypeForSeek);
1099     memtable_iter->Seek(range_start.Encode());
1100     status = memtable_iter->status();
1101     ParsedInternalKey seek_result;
1102     if (status.ok()) {
1103       if (memtable_iter->Valid() &&
1104           !ParseInternalKey(memtable_iter->key(), &seek_result)) {
1105         status = Status::Corruption("DB has corrupted keys");
1106       }
1107     }
1108     if (status.ok()) {
1109       if (memtable_iter->Valid() &&
1110           ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
1111         *overlap = true;
1112       } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
1113                                                  ranges[i].limit)) {
1114         *overlap = true;
1115       }
1116     }
1117   }
1118   return status;
1119 }
1120 
1121 const int ColumnFamilyData::kCompactAllLevels = -1;
1122 const int ColumnFamilyData::kCompactToBaseLevel = -2;
1123 
1124 Compaction* ColumnFamilyData::CompactRange(
1125     const MutableCFOptions& mutable_cf_options, int input_level,
1126     int output_level, const CompactRangeOptions& compact_range_options,
1127     const InternalKey* begin, const InternalKey* end,
1128     InternalKey** compaction_end, bool* conflict,
1129     uint64_t max_file_num_to_ignore) {
1130   auto* result = compaction_picker_->CompactRange(
1131       GetName(), mutable_cf_options, current_->storage_info(), input_level,
1132       output_level, compact_range_options, begin, end, compaction_end, conflict,
1133       max_file_num_to_ignore);
1134   if (result != nullptr) {
1135     result->SetInputVersion(current_);
1136   }
1137   return result;
1138 }
1139 
1140 SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
1141   SuperVersion* sv = GetThreadLocalSuperVersion(db);
1142   sv->Ref();
1143   if (!ReturnThreadLocalSuperVersion(sv)) {
1144     // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
1145     // when the thread-local pointer was populated. So, the Ref() earlier in
1146     // this function still prevents the returned SuperVersion* from being
1147     // deleted out from under the caller.
1148     sv->Unref();
1149   }
1150   return sv;
1151 }
1152 
1153 SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
1154   // The SuperVersion is cached in thread local storage to avoid acquiring
1155   // mutex when SuperVersion does not change since the last use. When a new
1156   // SuperVersion is installed, the compaction or flush thread cleans up
1157   // cached SuperVersion in all existing thread local storage. To avoid
1158   // acquiring mutex for this operation, we use atomic Swap() on the thread
1159   // local pointer to guarantee exclusive access. If the thread local pointer
1160   // is being used while a new SuperVersion is installed, the cached
1161   // SuperVersion can become stale. In that case, the background thread would
1162   // have swapped in kSVObsolete. We re-check the value when returning the
1163   // SuperVersion back to thread local, with an atomic compare and swap.
1164   // The superversion will need to be released if detected to be stale.
1165   void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
1166   // Invariant:
1167   // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
1168   // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
1169   // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
1170   // (if no Scrape happens).
1171   assert(ptr != SuperVersion::kSVInUse);
1172   SuperVersion* sv = static_cast<SuperVersion*>(ptr);
1173   if (sv == SuperVersion::kSVObsolete ||
1174       sv->version_number != super_version_number_.load()) {
1175     RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
1176     SuperVersion* sv_to_delete = nullptr;
1177 
1178     if (sv && sv->Unref()) {
1179       RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
1180       db->mutex()->Lock();
1181       // NOTE: underlying resources held by superversion (sst files) might
1182       // not be released until the next background job.
1183       sv->Cleanup();
1184       if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
1185         db->AddSuperVersionsToFreeQueue(sv);
1186         db->SchedulePurge();
1187       } else {
1188         sv_to_delete = sv;
1189       }
1190     } else {
1191       db->mutex()->Lock();
1192     }
1193     sv = super_version_->Ref();
1194     db->mutex()->Unlock();
1195 
1196     delete sv_to_delete;
1197   }
1198   assert(sv != nullptr);
1199   return sv;
1200 }
1201 
1202 bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
1203   assert(sv != nullptr);
1204   // Put the SuperVersion back
1205   void* expected = SuperVersion::kSVInUse;
1206   if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
1207     // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
1208     // storage has not been altered and no Scrape has happened. The
1209     // SuperVersion is still current.
1210     return true;
1211   } else {
1212     // ThreadLocal scrape happened in the process of this GetImpl call (after
1213     // thread local Swap() at the beginning and before CompareAndSwap()).
1214     // This means the SuperVersion it holds is obsolete.
1215     assert(expected == SuperVersion::kSVObsolete);
1216   }
1217   return false;
1218 }
1219 
1220 void ColumnFamilyData::InstallSuperVersion(
1221     SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
1222   db_mutex->AssertHeld();
1223   return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
1224 }
1225 
1226 void ColumnFamilyData::InstallSuperVersion(
1227     SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
1228     const MutableCFOptions& mutable_cf_options) {
1229   SuperVersion* new_superversion = sv_context->new_superversion.release();
1230   new_superversion->db_mutex = db_mutex;
1231   new_superversion->mutable_cf_options = mutable_cf_options;
1232   new_superversion->Init(this, mem_, imm_.current(), current_);
1233   SuperVersion* old_superversion = super_version_;
1234   super_version_ = new_superversion;
1235   ++super_version_number_;
1236   super_version_->version_number = super_version_number_;
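  // GetThreadLocalSuperVersion() compares a cached entry's version_number
  // against super_version_number_, so bumping the counter above is what makes
  // any still-cached thread local SuperVersions register as stale.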
1237   super_version_->write_stall_condition =
1238       RecalculateWriteStallConditions(mutable_cf_options);
1239 
1240   if (old_superversion != nullptr) {
1241     // Reset SuperVersions cached in thread local storage.
1242     // This should be done before old_superversion->Unref(). That's to ensure
1243     // that local_sv_ never holds the last reference to SuperVersion, since
1244     // it has no means to safely do SuperVersion cleanup.
1245     ResetThreadLocalSuperVersions();
1246 
1247     if (old_superversion->mutable_cf_options.write_buffer_size !=
1248         mutable_cf_options.write_buffer_size) {
1249       mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
1250     }
1251     if (old_superversion->write_stall_condition !=
1252         new_superversion->write_stall_condition) {
1253       sv_context->PushWriteStallNotification(
1254           old_superversion->write_stall_condition,
1255           new_superversion->write_stall_condition, GetName(), ioptions());
1256     }
1257     if (old_superversion->Unref()) {
1258       old_superversion->Cleanup();
1259       sv_context->superversions_to_free.push_back(old_superversion);
1260     }
1261   }
1262 }
1263 
1264 void ColumnFamilyData::ResetThreadLocalSuperVersions() {
1265   autovector<void*> sv_ptrs;
1266   local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
1267   for (auto ptr : sv_ptrs) {
1268     assert(ptr);
1269     if (ptr == SuperVersion::kSVInUse) {
1270       continue;
1271     }
1272     auto sv = static_cast<SuperVersion*>(ptr);
1273     bool was_last_ref __attribute__((__unused__));
1274     was_last_ref = sv->Unref();
1275     // sv couldn't have been the last reference because
1276     // ResetThreadLocalSuperVersions() is called before
1277     // unref'ing super_version_.
1278     assert(!was_last_ref);
1279   }
1280 }
1281 
1282 Status ColumnFamilyData::ValidateOptions(
1283     const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
1284   Status s;
1285   s = CheckCompressionSupported(cf_options);
1286   if (s.ok() && db_options.allow_concurrent_memtable_write) {
1287     s = CheckConcurrentWritesSupported(cf_options);
1288   }
1289   if (s.ok() && db_options.unordered_write &&
1290       cf_options.max_successive_merges != 0) {
1291     s = Status::InvalidArgument(
1292         "max_successive_merges > 0 is incompatible with unordered_write");
1293   }
1294   if (s.ok()) {
1295     s = CheckCFPathsSupported(db_options, cf_options);
1296   }
1297   if (!s.ok()) {
1298     return s;
1299   }
1300 
1301   if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
1302     if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
1303       return Status::NotSupported(
1304           "TTL is only supported in Block-Based Table format. ");
1305     }
1306   }
1307 
1308   if (cf_options.periodic_compaction_seconds > 0 &&
1309       cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
1310     if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
1311       return Status::NotSupported(
1312           "Periodic Compaction is only supported in "
1313           "Block-Based Table format. ");
1314     }
1315   }
1316   return s;
1317 }
1318 
1319 #ifndef ROCKSDB_LITE
1320 Status ColumnFamilyData::SetOptions(
1321     const DBOptions& db_options,
1322     const std::unordered_map<std::string, std::string>& options_map) {
1323   MutableCFOptions new_mutable_cf_options;
1324   Status s =
1325       GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
1326                                    ioptions_.info_log, &new_mutable_cf_options);
1327   if (s.ok()) {
1328     ColumnFamilyOptions cf_options =
1329         BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
1330     s = ValidateOptions(db_options, cf_options);
1331   }
1332   if (s.ok()) {
1333     mutable_cf_options_ = new_mutable_cf_options;
1334     mutable_cf_options_.RefreshDerivedOptions(ioptions_);
1335   }
1336   return s;
1337 }
1338 #endif  // ROCKSDB_LITE
1339 
1340 // REQUIRES: DB mutex held
1341 Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
1342   if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
1343     return Env::WLTH_NOT_SET;
1344   }
1345   if (level == 0) {
1346     return Env::WLTH_MEDIUM;
1347   }
1348   int base_level = current_->storage_info()->base_level();
1349 
1350   // L1: medium, L2: long, ... (assuming base_level == 1)
1351   if (level - base_level >= 2) {
1352     return Env::WLTH_EXTREME;
1353   } else if (level < base_level) {
1354     // There is no restriction that prevents the level passed in from being
1355     // smaller than base_level.
1356     return Env::WLTH_MEDIUM;
1357   }
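  // At this point 0 <= level - base_level <= 1, so the hint below is
  // WLTH_MEDIUM or WLTH_LONG. For example, with base_level == 1: L0 and L1
  // get medium, L2 gets long, and L3 and beyond get extreme.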
1358   return static_cast<Env::WriteLifeTimeHint>(level - base_level +
1359                             static_cast<int>(Env::WLTH_MEDIUM));
1360 }
1361 
1362 Status ColumnFamilyData::AddDirectories(
1363     std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
1364   Status s;
1365   assert(created_dirs != nullptr);
1366   assert(data_dirs_.empty());
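  // cf_paths may overlap with directories already created for other column
  // families; in that case reuse the shared FSDirectory from *created_dirs
  // instead of creating a new handle.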
1367   for (auto& p : ioptions_.cf_paths) {
1368     auto existing_dir = created_dirs->find(p.path);
1369 
1370     if (existing_dir == created_dirs->end()) {
1371       std::unique_ptr<FSDirectory> path_directory;
1372       s = DBImpl::CreateAndNewDirectory(ioptions_.fs, p.path, &path_directory);
1373       if (!s.ok()) {
1374         return s;
1375       }
1376       assert(path_directory != nullptr);
1377       data_dirs_.emplace_back(path_directory.release());
1378       (*created_dirs)[p.path] = data_dirs_.back();
1379     } else {
1380       data_dirs_.emplace_back(existing_dir->second);
1381     }
1382   }
1383   assert(data_dirs_.size() == ioptions_.cf_paths.size());
1384   return s;
1385 }
1386 
1387 FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
1388   if (data_dirs_.empty()) {
1389     return nullptr;
1390   }
1391 
1392   assert(path_id < data_dirs_.size());
1393   return data_dirs_[path_id].get();
1394 }
1395 
1396 ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
1397                                  const ImmutableDBOptions* db_options,
1398                                  const FileOptions& file_options,
1399                                  Cache* table_cache,
1400                                  WriteBufferManager* _write_buffer_manager,
1401                                  WriteController* _write_controller,
1402                                  BlockCacheTracer* const block_cache_tracer)
1403     : max_column_family_(0),
1404       dummy_cfd_(new ColumnFamilyData(
1405           ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
1406           nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr,
1407           block_cache_tracer)),
1408       default_cfd_cache_(nullptr),
1409       db_name_(dbname),
1410       db_options_(db_options),
1411       file_options_(file_options),
1412       table_cache_(table_cache),
1413       write_buffer_manager_(_write_buffer_manager),
1414       write_controller_(_write_controller),
1415       block_cache_tracer_(block_cache_tracer) {
1416   // initialize the circular linked list: the sentinel points to itself
1417   dummy_cfd_->prev_ = dummy_cfd_;
1418   dummy_cfd_->next_ = dummy_cfd_;
1419 }
1420 
1421 ColumnFamilySet::~ColumnFamilySet() {
1422   while (column_family_data_.size() > 0) {
1423     // cfd destructor will delete itself from column_family_data_
1424     auto cfd = column_family_data_.begin()->second;
1425     bool last_ref __attribute__((__unused__));
1426     last_ref = cfd->UnrefAndTryDelete();
1427     assert(last_ref);
1428   }
1429   bool dummy_last_ref __attribute__((__unused__));
1430   dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
1431   assert(dummy_last_ref);
1432 }
1433 
1434 ColumnFamilyData* ColumnFamilySet::GetDefault() const {
1435   assert(default_cfd_cache_ != nullptr);
1436   return default_cfd_cache_;
1437 }
1438 
1439 ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
1440   auto cfd_iter = column_family_data_.find(id);
1441   if (cfd_iter != column_family_data_.end()) {
1442     return cfd_iter->second;
1443   } else {
1444     return nullptr;
1445   }
1446 }
1447 
1448 ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
1449     const {
1450   auto cfd_iter = column_families_.find(name);
1451   if (cfd_iter != column_families_.end()) {
1452     auto cfd = GetColumnFamily(cfd_iter->second);
1453     assert(cfd != nullptr);
1454     return cfd;
1455   } else {
1456     return nullptr;
1457   }
1458 }
1459 
1460 uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
1461   return ++max_column_family_;
1462 }
1463 
1464 uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
1465 
1466 void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
1467   max_column_family_ = std::max(new_max_column_family, max_column_family_);
1468 }
1469 
1470 size_t ColumnFamilySet::NumberOfColumnFamilies() const {
1471   return column_families_.size();
1472 }
1473 
1474 // under a DB mutex AND write thread
1475 ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
1476     const std::string& name, uint32_t id, Version* dummy_versions,
1477     const ColumnFamilyOptions& options) {
1478   assert(column_families_.find(name) == column_families_.end());
1479   ColumnFamilyData* new_cfd = new ColumnFamilyData(
1480       id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
1481       *db_options_, file_options_, this, block_cache_tracer_);
1482   column_families_.insert({name, id});
1483   column_family_data_.insert({id, new_cfd});
1484   max_column_family_ = std::max(max_column_family_, id);
1485   // add to linked list
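  // new_cfd is linked in just before dummy_cfd_, i.e. at the tail of the
  // circular doubly-linked list for which dummy_cfd_ acts as the sentinel.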
1486   new_cfd->next_ = dummy_cfd_;
1487   auto prev = dummy_cfd_->prev_;
1488   new_cfd->prev_ = prev;
1489   prev->next_ = new_cfd;
1490   dummy_cfd_->prev_ = new_cfd;
1491   if (id == 0) {
1492     default_cfd_cache_ = new_cfd;
1493   }
1494   return new_cfd;
1495 }
1496 
1497 // REQUIRES: DB mutex held
1498 void ColumnFamilySet::FreeDeadColumnFamilies() {
1499   autovector<ColumnFamilyData*> to_delete;
1500   for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
1501     if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
1502       to_delete.push_back(cfd);
1503     }
1504   }
1505   for (auto cfd : to_delete) {
1506     // this is very rare, so it's not a problem that we do it under a mutex
1507     delete cfd;
1508   }
1509 }
1510 
1511 // under a DB mutex AND from a write thread
1512 void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1513   auto cfd_iter = column_family_data_.find(cfd->GetID());
1514   assert(cfd_iter != column_family_data_.end());
1515   column_family_data_.erase(cfd_iter);
1516   column_families_.erase(cfd->GetName());
1517 }
1518 
1519 // under a DB mutex OR from a write thread
1520 bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
1521   if (column_family_id == 0) {
1522     // optimization for common case
1523     current_ = column_family_set_->GetDefault();
1524   } else {
1525     current_ = column_family_set_->GetColumnFamily(column_family_id);
1526   }
1527   handle_.SetCFD(current_);
1528   return current_ != nullptr;
1529 }
1530 
1531 uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
1532   assert(current_ != nullptr);
1533   return current_->GetLogNumber();
1534 }
1535 
1536 MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
1537   assert(current_ != nullptr);
1538   return current_->mem();
1539 }
1540 
1541 ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
1542   assert(current_ != nullptr);
1543   return &handle_;
1544 }
1545 
1546 uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
1547   uint32_t column_family_id = 0;
1548   if (column_family != nullptr) {
1549     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1550     column_family_id = cfh->GetID();
1551   }
1552   return column_family_id;
1553 }
1554 
1555 const Comparator* GetColumnFamilyUserComparator(
1556     ColumnFamilyHandle* column_family) {
1557   if (column_family != nullptr) {
1558     return column_family->GetComparator();
1559   }
1560   return nullptr;
1561 }
1562 
1563 }  // namespace ROCKSDB_NAMESPACE
1564