1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/column_family.h"
11
12 #include <algorithm>
13 #include <cinttypes>
14 #include <limits>
15 #include <string>
16 #include <vector>
17
18 #include "db/compaction/compaction_picker.h"
19 #include "db/compaction/compaction_picker_fifo.h"
20 #include "db/compaction/compaction_picker_level.h"
21 #include "db/compaction/compaction_picker_universal.h"
22 #include "db/db_impl/db_impl.h"
23 #include "db/internal_stats.h"
24 #include "db/job_context.h"
25 #include "db/range_del_aggregator.h"
26 #include "db/table_properties_collector.h"
27 #include "db/version_set.h"
28 #include "db/write_controller.h"
29 #include "file/sst_file_manager_impl.h"
30 #include "memtable/hash_skiplist_rep.h"
31 #include "monitoring/thread_status_util.h"
32 #include "options/options_helper.h"
33 #include "port/port.h"
34 #include "table/block_based/block_based_table_factory.h"
35 #include "table/merging_iterator.h"
36 #include "util/autovector.h"
37 #include "util/compression.h"
38
39 namespace ROCKSDB_NAMESPACE {
40
41 ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
42 ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
43 : cfd_(column_family_data), db_(db), mutex_(mutex) {
44 if (cfd_ != nullptr) {
45 cfd_->Ref();
46 }
47 }
48
49 ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
50 if (cfd_ != nullptr) {
51 #ifndef ROCKSDB_LITE
52 for (auto& listener : cfd_->ioptions()->listeners) {
53 listener->OnColumnFamilyHandleDeletionStarted(this);
54 }
55 #endif // ROCKSDB_LITE
56 // Job id == 0 means that this is not our background process, but rather
57 // a user thread.
58 // Need to hold some shared pointers owned by the initial_cf_options
59 // before the final cleanup finishes.
60 ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
61 JobContext job_context(0);
62 mutex_->Lock();
63 bool dropped = cfd_->IsDropped();
64 if (cfd_->UnrefAndTryDelete()) {
65 if (dropped) {
66 db_->FindObsoleteFiles(&job_context, false, true);
67 }
68 }
69 mutex_->Unlock();
70 if (job_context.HaveSomethingToDelete()) {
71 bool defer_purge =
72 db_->immutable_db_options().avoid_unnecessary_blocking_io;
73 db_->PurgeObsoleteFiles(job_context, defer_purge);
74 if (defer_purge) {
75 mutex_->Lock();
76 db_->SchedulePurge();
77 mutex_->Unlock();
78 }
79 }
80 job_context.Clean();
81 }
82 }
83
84 uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
85
86 const std::string& ColumnFamilyHandleImpl::GetName() const {
87 return cfd()->GetName();
88 }
89
90 Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
91 #ifndef ROCKSDB_LITE
92 // accessing mutable cf-options requires db mutex.
93 InstrumentedMutexLock l(mutex_);
94 *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
95 return Status::OK();
96 #else
97 (void)desc;
98 return Status::NotSupported();
99 #endif // !ROCKSDB_LITE
100 }
101
102 const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
103 return cfd()->user_comparator();
104 }
105
106 void GetIntTblPropCollectorFactory(
107 const ImmutableCFOptions& ioptions,
108 std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
109 int_tbl_prop_collector_factories) {
110 auto& collector_factories = ioptions.table_properties_collector_factories;
111 for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
112 ++i) {
113 assert(collector_factories[i]);
114 int_tbl_prop_collector_factories->emplace_back(
115 new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
116 }
117 }
118
119 Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
120 if (!cf_options.compression_per_level.empty()) {
121 for (size_t level = 0; level < cf_options.compression_per_level.size();
122 ++level) {
123 if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
124 return Status::InvalidArgument(
125 "Compression type " +
126 CompressionTypeToString(cf_options.compression_per_level[level]) +
127 " is not linked with the binary.");
128 }
129 }
130 } else {
131 if (!CompressionTypeSupported(cf_options.compression)) {
132 return Status::InvalidArgument(
133 "Compression type " +
134 CompressionTypeToString(cf_options.compression) +
135 " is not linked with the binary.");
136 }
137 }
138 if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
139 if (!ZSTD_TrainDictionarySupported()) {
140 return Status::InvalidArgument(
141 "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
142 "is not linked with the binary.");
143 }
144 if (cf_options.compression_opts.max_dict_bytes == 0) {
145 return Status::InvalidArgument(
146 "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
147 "should be nonzero if we're using zstd's dictionary generator.");
148 }
149 }
150 return Status::OK();
151 }
152
153 Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
154 if (cf_options.inplace_update_support) {
155 return Status::InvalidArgument(
156 "In-place memtable updates (inplace_update_support) is not compatible "
157 "with concurrent writes (allow_concurrent_memtable_write)");
158 }
159 if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
160 return Status::InvalidArgument(
161 "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
162 }
163 return Status::OK();
164 }
165
166 Status CheckCFPathsSupported(const DBOptions& db_options,
167 const ColumnFamilyOptions& cf_options) {
168 // More than one entry in cf_paths is supported only in universal
169 // and level compaction styles. This function also checks the case
170 // in which cf_paths is not specified, which results in db_paths
171 // being used.
172 if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
173 (cf_options.compaction_style != kCompactionStyleLevel)) {
174 if (cf_options.cf_paths.size() > 1) {
175 return Status::NotSupported(
176 "More than one CF paths are only supported in "
177 "universal and level compaction styles. ");
178 } else if (cf_options.cf_paths.empty() &&
179 db_options.db_paths.size() > 1) {
180 return Status::NotSupported(
181 "More than one DB paths are only supported in "
182 "universal and level compaction styles. ");
183 }
184 }
185 return Status::OK();
186 }
187
188 namespace {
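// These constants appear to act as sentinels meaning "the user left ttl /
// periodic_compaction_seconds at its default"; SanitizeOptions() below
// compares against them and substitutes concrete values.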
189 const uint64_t kDefaultTtl = 0xfffffffffffffffe;
190 const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
191 }; // namespace
192
193 ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
194 const ColumnFamilyOptions& src) {
195 ColumnFamilyOptions result = src;
196 size_t clamp_max = std::conditional<
197 sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
198 std::integral_constant<uint64_t, 64ull << 30>>::type::value;
199 ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
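// For example, on a 64-bit build (sizeof(size_t) == 8) the clamp above keeps
// write_buffer_size in [64 KB, 64 GB]; on a 32-bit build the upper bound is
// 0xffffffff so the value still fits in size_t.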
200 // If the user sets arena_block_size, we trust them to use this value. Otherwise,
201 // calculate a proper value from write_buffer_size.
202 if (result.arena_block_size <= 0) {
203 result.arena_block_size = result.write_buffer_size / 8;
204
205 // Align up to 4k
206 const size_t align = 4 * 1024;
207 result.arena_block_size =
208 ((result.arena_block_size + align - 1) / align) * align;
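// e.g. write_buffer_size = 64 MB gives arena_block_size = 8 MB (already a
// multiple of 4 KB); a 1 MB write buffer gives 128 KB.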
209 }
210 result.min_write_buffer_number_to_merge =
211 std::min(result.min_write_buffer_number_to_merge,
212 result.max_write_buffer_number - 1);
213 if (result.min_write_buffer_number_to_merge < 1) {
214 result.min_write_buffer_number_to_merge = 1;
215 }
216
217 if (result.num_levels < 1) {
218 result.num_levels = 1;
219 }
220 if (result.compaction_style == kCompactionStyleLevel &&
221 result.num_levels < 2) {
222 result.num_levels = 2;
223 }
224
225 if (result.compaction_style == kCompactionStyleUniversal &&
226 db_options.allow_ingest_behind && result.num_levels < 3) {
227 result.num_levels = 3;
228 }
229
230 if (result.max_write_buffer_number < 2) {
231 result.max_write_buffer_number = 2;
232 }
233 // Fall back to max_write_buffer_number_to_maintain if
234 // max_write_buffer_size_to_maintain is not set
235 if (result.max_write_buffer_size_to_maintain < 0) {
236 result.max_write_buffer_size_to_maintain =
237 result.max_write_buffer_number *
238 static_cast<int64_t>(result.write_buffer_size);
239 } else if (result.max_write_buffer_size_to_maintain == 0 &&
240 result.max_write_buffer_number_to_maintain < 0) {
241 result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
242 }
243 // bloom filter size shouldn't exceed 1/4 of memtable size.
244 if (result.memtable_prefix_bloom_size_ratio > 0.25) {
245 result.memtable_prefix_bloom_size_ratio = 0.25;
246 } else if (result.memtable_prefix_bloom_size_ratio < 0) {
247 result.memtable_prefix_bloom_size_ratio = 0;
248 }
249
250 if (!result.prefix_extractor) {
251 assert(result.memtable_factory);
252 Slice name = result.memtable_factory->Name();
253 if (name.compare("HashSkipListRepFactory") == 0 ||
254 name.compare("HashLinkListRepFactory") == 0) {
255 result.memtable_factory = std::make_shared<SkipListFactory>();
256 }
257 }
258
259 if (result.compaction_style == kCompactionStyleFIFO) {
260 result.num_levels = 1;
261 // since we delete level0 files in FIFO compaction when there are too many
262 // of them, these options don't really mean anything
263 result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
264 result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
265 }
266
267 if (result.max_bytes_for_level_multiplier <= 0) {
268 result.max_bytes_for_level_multiplier = 1;
269 }
270
271 if (result.level0_file_num_compaction_trigger == 0) {
272 ROCKS_LOG_WARN(db_options.info_log.get(),
273 "level0_file_num_compaction_trigger cannot be 0");
274 result.level0_file_num_compaction_trigger = 1;
275 }
276
277 if (result.level0_stop_writes_trigger <
278 result.level0_slowdown_writes_trigger ||
279 result.level0_slowdown_writes_trigger <
280 result.level0_file_num_compaction_trigger) {
281 ROCKS_LOG_WARN(db_options.info_log.get(),
282 "This condition must be satisfied: "
283 "level0_stop_writes_trigger(%d) >= "
284 "level0_slowdown_writes_trigger(%d) >= "
285 "level0_file_num_compaction_trigger(%d)",
286 result.level0_stop_writes_trigger,
287 result.level0_slowdown_writes_trigger,
288 result.level0_file_num_compaction_trigger);
289 if (result.level0_slowdown_writes_trigger <
290 result.level0_file_num_compaction_trigger) {
291 result.level0_slowdown_writes_trigger =
292 result.level0_file_num_compaction_trigger;
293 }
294 if (result.level0_stop_writes_trigger <
295 result.level0_slowdown_writes_trigger) {
296 result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
297 }
298 ROCKS_LOG_WARN(db_options.info_log.get(),
299 "Adjust the value to "
300 "level0_stop_writes_trigger(%d)"
301 "level0_slowdown_writes_trigger(%d)"
302 "level0_file_num_compaction_trigger(%d)",
303 result.level0_stop_writes_trigger,
304 result.level0_slowdown_writes_trigger,
305 result.level0_file_num_compaction_trigger);
306 }
307
308 if (result.soft_pending_compaction_bytes_limit == 0) {
309 result.soft_pending_compaction_bytes_limit =
310 result.hard_pending_compaction_bytes_limit;
311 } else if (result.hard_pending_compaction_bytes_limit > 0 &&
312 result.soft_pending_compaction_bytes_limit >
313 result.hard_pending_compaction_bytes_limit) {
314 result.soft_pending_compaction_bytes_limit =
315 result.hard_pending_compaction_bytes_limit;
316 }
317
318 #ifndef ROCKSDB_LITE
319 // When the DB is stopped, it's possible that there are some .trash files
320 // that were not deleted yet. When we open the DB we will find these .trash
321 // files and schedule them to be deleted (or delete them immediately if
322 // SstFileManager was not used).
323 auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
324 for (size_t i = 0; i < result.cf_paths.size(); i++) {
325 DeleteScheduler::CleanupDirectory(db_options.env, sfm, result.cf_paths[i].path);
326 }
327 #endif
328
329 if (result.cf_paths.empty()) {
330 result.cf_paths = db_options.db_paths;
331 }
332
333 if (result.level_compaction_dynamic_level_bytes) {
334 if (result.compaction_style != kCompactionStyleLevel ||
335 result.cf_paths.size() > 1U) {
336 // 1. level_compaction_dynamic_level_bytes only makes sense for
337 // level-based compaction.
338 // 2. we don't yet know how to make this feature and multiple
339 // DB paths work together.
340 result.level_compaction_dynamic_level_bytes = false;
341 }
342 }
343
344 if (result.max_compaction_bytes == 0) {
345 result.max_compaction_bytes = result.target_file_size_base * 25;
346 }
347
348 bool is_block_based_table =
349 (result.table_factory->Name() == BlockBasedTableFactory().Name());
350
351 const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
352 if (result.ttl == kDefaultTtl) {
353 if (is_block_based_table &&
354 result.compaction_style != kCompactionStyleFIFO) {
355 result.ttl = kAdjustedTtl;
356 } else {
357 result.ttl = 0;
358 }
359 }
360
361 const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;
362
363 // Turn on periodic compactions and set them to occur once every 30 days if
364 // compaction filters are used and periodic_compaction_seconds is set to the
365 // default value.
366 if (result.compaction_style != kCompactionStyleFIFO) {
367 if ((result.compaction_filter != nullptr ||
368 result.compaction_filter_factory != nullptr) &&
369 result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
370 is_block_based_table) {
371 result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
372 }
373 } else {
374 // result.compaction_style == kCompactionStyleFIFO
375 if (result.ttl == 0) {
376 if (is_block_based_table) {
377 if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
378 result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
379 }
380 result.ttl = result.periodic_compaction_seconds;
381 }
382 } else if (result.periodic_compaction_seconds != 0) {
383 result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
384 }
385 }
386
387 // TTL compactions would work similarly to Periodic Compactions in Universal in
388 // most cases. So, if ttl is set, execute the periodic compaction
389 // codepath.
390 if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
391 if (result.periodic_compaction_seconds != 0) {
392 result.periodic_compaction_seconds =
393 std::min(result.ttl, result.periodic_compaction_seconds);
394 } else {
395 result.periodic_compaction_seconds = result.ttl;
396 }
397 }
398
399 if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
400 result.periodic_compaction_seconds = 0;
401 }
402
403 return result;
404 }
405
406 int SuperVersion::dummy = 0;
407 void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
408 void* const SuperVersion::kSVObsolete = nullptr;
409
410 SuperVersion::~SuperVersion() {
411 for (auto td : to_delete) {
412 delete td;
413 }
414 }
415
416 SuperVersion* SuperVersion::Ref() {
417 refs.fetch_add(1, std::memory_order_relaxed);
418 return this;
419 }
420
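// Unref() only decrements the count; when it returns true the caller holds
// the last reference and is responsible for calling Cleanup() and deleting
// the object (see InstallSuperVersion() and UnrefAndTryDelete() below).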
421 bool SuperVersion::Unref() {
422 // fetch_sub returns the previous value of ref
423 uint32_t previous_refs = refs.fetch_sub(1);
424 assert(previous_refs > 0);
425 return previous_refs == 1;
426 }
427
428 void SuperVersion::Cleanup() {
429 assert(refs.load(std::memory_order_relaxed) == 0);
430 imm->Unref(&to_delete);
431 MemTable* m = mem->Unref();
432 if (m != nullptr) {
433 auto* memory_usage = current->cfd()->imm()->current_memory_usage();
434 assert(*memory_usage >= m->ApproximateMemoryUsage());
435 *memory_usage -= m->ApproximateMemoryUsage();
436 to_delete.push_back(m);
437 }
438 current->Unref();
439 if (cfd->Unref()) {
440 delete cfd;
441 }
442 }
443
444 void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
445 MemTableListVersion* new_imm, Version* new_current) {
446 cfd = new_cfd;
447 mem = new_mem;
448 imm = new_imm;
449 current = new_current;
450 cfd->Ref();
451 mem->Ref();
452 imm->Ref();
453 current->Ref();
454 refs.store(1, std::memory_order_relaxed);
455 }
456
457 namespace {
458 void SuperVersionUnrefHandle(void* ptr) {
459 // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
460 // destroyed. When the former happens, the thread shouldn't see kSVInUse.
461 // When the latter happens, we are in ~ColumnFamilyData(), and no get should
462 // happen either.
463 SuperVersion* sv = static_cast<SuperVersion*>(ptr);
464 bool was_last_ref __attribute__((__unused__));
465 was_last_ref = sv->Unref();
466 // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
467 // This is important because we can't do SuperVersion cleanup here.
468 // That would require locking DB mutex, which would deadlock because
469 // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
470 assert(!was_last_ref);
471 }
472 } // anonymous namespace
473
474 std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
475 std::vector<std::string> paths;
476 paths.reserve(ioptions_.cf_paths.size());
477 for (const DbPath& db_path : ioptions_.cf_paths) {
478 paths.emplace_back(db_path.path);
479 }
480 return paths;
481 }
482
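// Reserved id for the dummy ColumnFamilyData that acts as the head of
// ColumnFamilySet's circular linked list (see the ColumnFamilySet
// constructor below); it never backs a real column family.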
483 const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;
484
485 ColumnFamilyData::ColumnFamilyData(
486 uint32_t id, const std::string& name, Version* _dummy_versions,
487 Cache* _table_cache, WriteBufferManager* write_buffer_manager,
488 const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
489 const FileOptions& file_options, ColumnFamilySet* column_family_set,
490 BlockCacheTracer* const block_cache_tracer)
491 : id_(id),
492 name_(name),
493 dummy_versions_(_dummy_versions),
494 current_(nullptr),
495 refs_(0),
496 initialized_(false),
497 dropped_(false),
498 internal_comparator_(cf_options.comparator),
499 initial_cf_options_(SanitizeOptions(db_options, cf_options)),
500 ioptions_(db_options, initial_cf_options_),
501 mutable_cf_options_(initial_cf_options_),
502 is_delete_range_supported_(
503 cf_options.table_factory->IsDeleteRangeSupported()),
504 write_buffer_manager_(write_buffer_manager),
505 mem_(nullptr),
506 imm_(ioptions_.min_write_buffer_number_to_merge,
507 ioptions_.max_write_buffer_number_to_maintain,
508 ioptions_.max_write_buffer_size_to_maintain),
509 super_version_(nullptr),
510 super_version_number_(0),
511 local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
512 next_(nullptr),
513 prev_(nullptr),
514 log_number_(0),
515 flush_reason_(FlushReason::kOthers),
516 column_family_set_(column_family_set),
517 queued_for_flush_(false),
518 queued_for_compaction_(false),
519 prev_compaction_needed_bytes_(0),
520 allow_2pc_(db_options.allow_2pc),
521 last_memtable_id_(0),
522 db_paths_registered_(false) {
523 if (id_ != kDummyColumnFamilyDataId) {
524 // TODO(cc): RegisterDbPaths can be expensive; consider moving it
525 // outside of this constructor, which might be called with the db mutex held.
526 // TODO(cc): consider using ioptions_.fs; currently some tests rely on
527 // EnvWrapper, which is the main reason we use env here.
528 Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
529 if (s.ok()) {
530 db_paths_registered_ = true;
531 } else {
532 ROCKS_LOG_ERROR(
533 ioptions_.info_log,
534 "Failed to register data paths of column family (id: %d, name: %s)",
535 id_, name_.c_str());
536 }
537 }
538 Ref();
539
540 // Convert user defined table properties collector factories to internal ones.
541 GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);
542
543 // if _dummy_versions is nullptr, then this is a dummy column family.
544 if (_dummy_versions != nullptr) {
545 internal_stats_.reset(
546 new InternalStats(ioptions_.num_levels, db_options.env, this));
547 table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
548 block_cache_tracer));
549 if (ioptions_.compaction_style == kCompactionStyleLevel) {
550 compaction_picker_.reset(
551 new LevelCompactionPicker(ioptions_, &internal_comparator_));
552 #ifndef ROCKSDB_LITE
553 } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
554 compaction_picker_.reset(
555 new UniversalCompactionPicker(ioptions_, &internal_comparator_));
556 } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
557 compaction_picker_.reset(
558 new FIFOCompactionPicker(ioptions_, &internal_comparator_));
559 } else if (ioptions_.compaction_style == kCompactionStyleNone) {
560 compaction_picker_.reset(new NullCompactionPicker(
561 ioptions_, &internal_comparator_));
562 ROCKS_LOG_WARN(ioptions_.info_log,
563 "Column family %s does not use any background compaction. "
564 "Compactions can only be done via CompactFiles\n",
565 GetName().c_str());
566 #endif // !ROCKSDB_LITE
567 } else {
568 ROCKS_LOG_ERROR(ioptions_.info_log,
569 "Unable to recognize the specified compaction style %d. "
570 "Column family %s will use kCompactionStyleLevel.\n",
571 ioptions_.compaction_style, GetName().c_str());
572 compaction_picker_.reset(
573 new LevelCompactionPicker(ioptions_, &internal_comparator_));
574 }
575
576 if (column_family_set_->NumberOfColumnFamilies() < 10) {
577 ROCKS_LOG_INFO(ioptions_.info_log,
578 "--------------- Options for column family [%s]:\n",
579 name.c_str());
580 initial_cf_options_.Dump(ioptions_.info_log);
581 } else {
582 ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
583 }
584 }
585
586 RecalculateWriteStallConditions(mutable_cf_options_);
587 }
588
589 // DB mutex held
590 ColumnFamilyData::~ColumnFamilyData() {
591 assert(refs_.load(std::memory_order_relaxed) == 0);
592 // remove from linked list
593 auto prev = prev_;
594 auto next = next_;
595 prev->next_ = next;
596 next->prev_ = prev;
597
598 if (!dropped_ && column_family_set_ != nullptr) {
599 // If it's dropped, it's already removed from column family set
600 // If column_family_set_ == nullptr, this is dummy CFD and not in
601 // ColumnFamilySet
602 column_family_set_->RemoveColumnFamily(this);
603 }
604
605 if (current_ != nullptr) {
606 current_->Unref();
607 }
608
609 // It would be wrong to destroy this ColumnFamilyData while it is still in
610 // flush_queue_ or compaction_queue_.
611 assert(!queued_for_flush_);
612 assert(!queued_for_compaction_);
613 assert(super_version_ == nullptr);
614
615 if (dummy_versions_ != nullptr) {
616 // List must be empty
617 assert(dummy_versions_->TEST_Next() == dummy_versions_);
618 bool deleted __attribute__((__unused__));
619 deleted = dummy_versions_->Unref();
620 assert(deleted);
621 }
622
623 if (mem_ != nullptr) {
624 delete mem_->Unref();
625 }
626 autovector<MemTable*> to_delete;
627 imm_.current()->Unref(&to_delete);
628 for (MemTable* m : to_delete) {
629 delete m;
630 }
631
632 if (db_paths_registered_) {
633 // TODO(cc): consider using ioptions_.fs; currently some tests rely on
634 // EnvWrapper, which is the main reason we use env here.
635 Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
636 if (!s.ok()) {
637 ROCKS_LOG_ERROR(
638 ioptions_.info_log,
639 "Failed to unregister data paths of column family (id: %d, name: %s)",
640 id_, name_.c_str());
641 }
642 }
643 }
644
645 bool ColumnFamilyData::UnrefAndTryDelete() {
646 int old_refs = refs_.fetch_sub(1);
647 assert(old_refs > 0);
648
649 if (old_refs == 1) {
650 assert(super_version_ == nullptr);
651 delete this;
652 return true;
653 }
654
655 if (old_refs == 2 && super_version_ != nullptr) {
656 // Only the super_version_ holds me
657 SuperVersion* sv = super_version_;
658 super_version_ = nullptr;
659 // Release SuperVersion reference kept in ThreadLocalPtr.
660 // This must be done outside of mutex_ since unref handler can lock mutex.
661 sv->db_mutex->Unlock();
662 local_sv_.reset();
663 sv->db_mutex->Lock();
664
665 if (sv->Unref()) {
666 // May delete this ColumnFamilyData after calling Cleanup()
667 sv->Cleanup();
668 delete sv;
669 return true;
670 }
671 }
672 return false;
673 }
674
675 void ColumnFamilyData::SetDropped() {
676 // can't drop default CF
677 assert(id_ != 0);
678 dropped_ = true;
679 write_controller_token_.reset();
680
681 // remove from column_family_set
682 column_family_set_->RemoveColumnFamily(this);
683 }
684
685 ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
686 return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
687 }
688
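// Returns the number of the oldest WAL that must still be kept for this
// column family: the CF's current log, lowered to the minimum log that still
// contains a pending prepared 2PC section when two-phase commit is enabled.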
689 uint64_t ColumnFamilyData::OldestLogToKeep() {
690 auto current_log = GetLogNumber();
691
692 if (allow_2pc_) {
693 autovector<MemTable*> empty_list;
694 auto imm_prep_log =
695 imm()->PrecomputeMinLogContainingPrepSection(empty_list);
696 auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
697
698 if (imm_prep_log > 0 && imm_prep_log < current_log) {
699 current_log = imm_prep_log;
700 }
701
702 if (mem_prep_log > 0 && mem_prep_log < current_log) {
703 current_log = mem_prep_log;
704 }
705 }
706
707 return current_log;
708 }
709
710 const double kIncSlowdownRatio = 0.8;
711 const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
712 const double kNearStopSlowdownRatio = 0.6;
713 const double kDelayRecoverSlowdownRatio = 1.4;
714
715 namespace {
716 // If penalize_stop is true, we further reduce the slowdown rate.
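// Roughly, the returned token's write rate is adjusted multiplicatively from
// the current delayed rate: cut to 60% (kNearStopSlowdownRatio) when the DB is
// at or near the stop condition, cut to 80% (kIncSlowdownRatio) while the
// compaction debt is not shrinking, and raised by 1/0.8 = 1.25x
// (kDecSlowdownRatio) once debt shrinks, floored at 16 KB/s and capped at the
// user's max_delayed_write_rate.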
717 std::unique_ptr<WriteControllerToken> SetupDelay(
718 WriteController* write_controller, uint64_t compaction_needed_bytes,
719 uint64_t prev_compaction_need_bytes, bool penalize_stop,
720 bool auto_comapctions_disabled) {
721 const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
722
723 uint64_t max_write_rate = write_controller->max_delayed_write_rate();
724 uint64_t write_rate = write_controller->delayed_write_rate();
725
726 if (auto_comapctions_disabled) {
727 // When auto compaction is disabled, always use the value the user gave.
728 write_rate = max_write_rate;
729 } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
730 // If user gives rate less than kMinWriteRate, don't adjust it.
731 //
732 // If already delayed, need to adjust based on previous compaction debt.
733 // When two or more column families require delay, we always
734 // increase or reduce the write rate based on information for one single
735 // column family. It is likely to be OK, but we can improve it if this
736 // becomes a problem.
737 // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
738 // is only available in level-based compaction
739 //
740 // If the compaction debt stays the same as previously, we also further slow
741 // down. It usually means a memtable is full. It's mainly for the case
742 // where both flush and compaction are much slower than the rate at which we
743 // insert into memtables, so we need to actively slow down before we get a
744 // feedback signal from compactions and flushes, to avoid a full stop
745 // caused by hitting the max write buffer number.
746 //
747 // If the DB just fell into the stop condition, we need to further reduce
748 // the write rate to avoid the stop condition.
749 if (penalize_stop) {
750 // Penalize the near stop or stop condition by more aggressive slowdown.
751 // This is to provide the long term slowdown increase signal.
752 // The penalty is more than the reward of recovering to the normal
753 // condition.
754 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
755 kNearStopSlowdownRatio);
756 if (write_rate < kMinWriteRate) {
757 write_rate = kMinWriteRate;
758 }
759 } else if (prev_compaction_need_bytes > 0 &&
760 prev_compaction_need_bytes <= compaction_needed_bytes) {
761 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
762 kIncSlowdownRatio);
763 if (write_rate < kMinWriteRate) {
764 write_rate = kMinWriteRate;
765 }
766 } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
767 // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
768 // down compaction debt, but we'll never speed up to faster than the write
769 // rate given by the user.
770 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
771 kDecSlowdownRatio);
772 if (write_rate > max_write_rate) {
773 write_rate = max_write_rate;
774 }
775 }
776 }
777 return write_controller->GetDelayToken(write_rate);
778 }
779
780 int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
781 int level0_slowdown_writes_trigger) {
782 // SanitizeOptions() ensures it.
783 assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
784
785 if (level0_file_num_compaction_trigger < 0) {
786 return std::numeric_limits<int>::max();
787 }
788
789 const int64_t twice_level0_trigger =
790 static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
791
792 const int64_t one_fourth_trigger_slowdown =
793 static_cast<int64_t>(level0_file_num_compaction_trigger) +
794 ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
795 4);
796
797 assert(twice_level0_trigger >= 0);
798 assert(one_fourth_trigger_slowdown >= 0);
799
800 // 1/4 of the way between L0 compaction trigger threshold and slowdown
801 // condition.
802 // Or twice the compaction trigger, if that is smaller.
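// For example, with level0_file_num_compaction_trigger = 4 and
// level0_slowdown_writes_trigger = 12 this returns
// min(2 * 4, 4 + (12 - 4) / 4) = min(8, 6) = 6.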
803 int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
804 if (res >= port::kMaxInt32) {
805 return port::kMaxInt32;
806 } else {
807 // res fits in int
808 return static_cast<int>(res);
809 }
810 }
811 } // namespace
812
813 std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
814 ColumnFamilyData::GetWriteStallConditionAndCause(
815 int num_unflushed_memtables, int num_l0_files,
816 uint64_t num_compaction_needed_bytes,
817 const MutableCFOptions& mutable_cf_options) {
818 if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
819 return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
820 } else if (!mutable_cf_options.disable_auto_compactions &&
821 num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
822 return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
823 } else if (!mutable_cf_options.disable_auto_compactions &&
824 mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
825 num_compaction_needed_bytes >=
826 mutable_cf_options.hard_pending_compaction_bytes_limit) {
827 return {WriteStallCondition::kStopped,
828 WriteStallCause::kPendingCompactionBytes};
829 } else if (mutable_cf_options.max_write_buffer_number > 3 &&
830 num_unflushed_memtables >=
831 mutable_cf_options.max_write_buffer_number - 1) {
832 return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
833 } else if (!mutable_cf_options.disable_auto_compactions &&
834 mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
835 num_l0_files >=
836 mutable_cf_options.level0_slowdown_writes_trigger) {
837 return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
838 } else if (!mutable_cf_options.disable_auto_compactions &&
839 mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
840 num_compaction_needed_bytes >=
841 mutable_cf_options.soft_pending_compaction_bytes_limit) {
842 return {WriteStallCondition::kDelayed,
843 WriteStallCause::kPendingCompactionBytes};
844 }
845 return {WriteStallCondition::kNormal, WriteStallCause::kNone};
846 }
847
848 WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
849 const MutableCFOptions& mutable_cf_options) {
850 auto write_stall_condition = WriteStallCondition::kNormal;
851 if (current_ != nullptr) {
852 auto* vstorage = current_->storage_info();
853 auto write_controller = column_family_set_->write_controller_;
854 uint64_t compaction_needed_bytes =
855 vstorage->estimated_compaction_needed_bytes();
856
857 auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
858 imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
859 vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
860 write_stall_condition = write_stall_condition_and_cause.first;
861 auto write_stall_cause = write_stall_condition_and_cause.second;
862
863 bool was_stopped = write_controller->IsStopped();
864 bool needed_delay = write_controller->NeedsDelay();
865
866 if (write_stall_condition == WriteStallCondition::kStopped &&
867 write_stall_cause == WriteStallCause::kMemtableLimit) {
868 write_controller_token_ = write_controller->GetStopToken();
869 internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
870 ROCKS_LOG_WARN(
871 ioptions_.info_log,
872 "[%s] Stopping writes because we have %d immutable memtables "
873 "(waiting for flush), max_write_buffer_number is set to %d",
874 name_.c_str(), imm()->NumNotFlushed(),
875 mutable_cf_options.max_write_buffer_number);
876 } else if (write_stall_condition == WriteStallCondition::kStopped &&
877 write_stall_cause == WriteStallCause::kL0FileCountLimit) {
878 write_controller_token_ = write_controller->GetStopToken();
879 internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
880 if (compaction_picker_->IsLevel0CompactionInProgress()) {
881 internal_stats_->AddCFStats(
882 InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
883 }
884 ROCKS_LOG_WARN(ioptions_.info_log,
885 "[%s] Stopping writes because we have %d level-0 files",
886 name_.c_str(), vstorage->l0_delay_trigger_count());
887 } else if (write_stall_condition == WriteStallCondition::kStopped &&
888 write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
889 write_controller_token_ = write_controller->GetStopToken();
890 internal_stats_->AddCFStats(
891 InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
892 ROCKS_LOG_WARN(
893 ioptions_.info_log,
894 "[%s] Stopping writes because of estimated pending compaction "
895 "bytes %" PRIu64,
896 name_.c_str(), compaction_needed_bytes);
897 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
898 write_stall_cause == WriteStallCause::kMemtableLimit) {
899 write_controller_token_ =
900 SetupDelay(write_controller, compaction_needed_bytes,
901 prev_compaction_needed_bytes_, was_stopped,
902 mutable_cf_options.disable_auto_compactions);
903 internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
904 ROCKS_LOG_WARN(
905 ioptions_.info_log,
906 "[%s] Stalling writes because we have %d immutable memtables "
907 "(waiting for flush), max_write_buffer_number is set to %d "
908 "rate %" PRIu64,
909 name_.c_str(), imm()->NumNotFlushed(),
910 mutable_cf_options.max_write_buffer_number,
911 write_controller->delayed_write_rate());
912 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
913 write_stall_cause == WriteStallCause::kL0FileCountLimit) {
914 // L0 is within two files of the stop trigger.
915 bool near_stop = vstorage->l0_delay_trigger_count() >=
916 mutable_cf_options.level0_stop_writes_trigger - 2;
917 write_controller_token_ =
918 SetupDelay(write_controller, compaction_needed_bytes,
919 prev_compaction_needed_bytes_, was_stopped || near_stop,
920 mutable_cf_options.disable_auto_compactions);
921 internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
922 1);
923 if (compaction_picker_->IsLevel0CompactionInProgress()) {
924 internal_stats_->AddCFStats(
925 InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
926 }
927 ROCKS_LOG_WARN(ioptions_.info_log,
928 "[%s] Stalling writes because we have %d level-0 files "
929 "rate %" PRIu64,
930 name_.c_str(), vstorage->l0_delay_trigger_count(),
931 write_controller->delayed_write_rate());
932 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
933 write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
934 // If the distance to the hard limit is less than 1/4 of the gap between the
935 // soft and hard pending-compaction-bytes limits, we consider it near-stop
936 // and speed up the slowdown.
937 bool near_stop =
938 mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
939 (compaction_needed_bytes -
940 mutable_cf_options.soft_pending_compaction_bytes_limit) >
941 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
942 mutable_cf_options.soft_pending_compaction_bytes_limit) /
943 4;
944
945 write_controller_token_ =
946 SetupDelay(write_controller, compaction_needed_bytes,
947 prev_compaction_needed_bytes_, was_stopped || near_stop,
948 mutable_cf_options.disable_auto_compactions);
949 internal_stats_->AddCFStats(
950 InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
951 ROCKS_LOG_WARN(
952 ioptions_.info_log,
953 "[%s] Stalling writes because of estimated pending compaction "
954 "bytes %" PRIu64 " rate %" PRIu64,
955 name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
956 write_controller->delayed_write_rate());
957 } else {
958 assert(write_stall_condition == WriteStallCondition::kNormal);
959 if (vstorage->l0_delay_trigger_count() >=
960 GetL0ThresholdSpeedupCompaction(
961 mutable_cf_options.level0_file_num_compaction_trigger,
962 mutable_cf_options.level0_slowdown_writes_trigger)) {
963 write_controller_token_ =
964 write_controller->GetCompactionPressureToken();
965 ROCKS_LOG_INFO(
966 ioptions_.info_log,
967 "[%s] Increasing compaction threads because we have %d level-0 "
968 "files ",
969 name_.c_str(), vstorage->l0_delay_trigger_count());
970 } else if (vstorage->estimated_compaction_needed_bytes() >=
971 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
972 // Increase compaction threads if bytes needed for compaction exceeds
973 // 1/4 of threshold for slowing down.
974 // If soft pending compaction byte limit is not set, always speed up
975 // compaction.
976 write_controller_token_ =
977 write_controller->GetCompactionPressureToken();
978 if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
979 ROCKS_LOG_INFO(
980 ioptions_.info_log,
981 "[%s] Increasing compaction threads because of estimated pending "
982 "compaction "
983 "bytes %" PRIu64,
984 name_.c_str(), vstorage->estimated_compaction_needed_bytes());
985 }
986 } else {
987 write_controller_token_.reset();
988 }
989 // If the DB recovers from delay conditions, we reward it by raising the
990 // delayed write rate (by kDelayRecoverSlowdownRatio). This balances the
991 // long-term slowdown increase signal.
992 if (needed_delay) {
993 uint64_t write_rate = write_controller->delayed_write_rate();
994 write_controller->set_delayed_write_rate(static_cast<uint64_t>(
995 static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
996 // Set the low pri limit to be 1/4 the delayed write rate.
997 // Note we don't reset this value even after the delay condition is released.
998 // The low-pri rate will continue to apply if there is compaction
999 // pressure.
1000 write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
1001 4);
1002 }
1003 }
1004 prev_compaction_needed_bytes_ = compaction_needed_bytes;
1005 }
1006 return write_stall_condition;
1007 }
1008
1009 const FileOptions* ColumnFamilyData::soptions() const {
1010 return &(column_family_set_->file_options_);
1011 }
1012
1013 void ColumnFamilyData::SetCurrent(Version* current_version) {
1014 current_ = current_version;
1015 }
1016
1017 uint64_t ColumnFamilyData::GetNumLiveVersions() const {
1018 return VersionSet::GetNumLiveVersions(dummy_versions_);
1019 }
1020
1021 uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
1022 return VersionSet::GetTotalSstFilesSize(dummy_versions_);
1023 }
1024
1025 uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
1026 return current_->GetSstFilesSize();
1027 }
1028
1029 MemTable* ColumnFamilyData::ConstructNewMemtable(
1030 const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
1031 return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
1032 write_buffer_manager_, earliest_seq, id_);
1033 }
1034
1035 void ColumnFamilyData::CreateNewMemtable(
1036 const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
1037 if (mem_ != nullptr) {
1038 delete mem_->Unref();
1039 }
1040 SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
1041 mem_->Ref();
1042 }
1043
1044 bool ColumnFamilyData::NeedsCompaction() const {
1045 return compaction_picker_->NeedsCompaction(current_->storage_info());
1046 }
1047
1048 Compaction* ColumnFamilyData::PickCompaction(
1049 const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
1050 SequenceNumber earliest_mem_seqno =
1051 std::min(mem_->GetEarliestSequenceNumber(),
1052 imm_.current()->GetEarliestSequenceNumber(false));
1053 auto* result = compaction_picker_->PickCompaction(
1054 GetName(), mutable_options, current_->storage_info(), log_buffer,
1055 earliest_mem_seqno);
1056 if (result != nullptr) {
1057 result->SetInputVersion(current_);
1058 }
1059 return result;
1060 }
1061
1062 bool ColumnFamilyData::RangeOverlapWithCompaction(
1063 const Slice& smallest_user_key, const Slice& largest_user_key,
1064 int level) const {
1065 return compaction_picker_->RangeOverlapWithCompaction(
1066 smallest_user_key, largest_user_key, level);
1067 }
1068
1069 Status ColumnFamilyData::RangesOverlapWithMemtables(
1070 const autovector<Range>& ranges, SuperVersion* super_version,
1071 bool* overlap) {
1072 assert(overlap != nullptr);
1073 *overlap = false;
1074 // Create an InternalIterator over all unflushed memtables
1075 Arena arena;
1076 ReadOptions read_opts;
1077 read_opts.total_order_seek = true;
1078 MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
1079 merge_iter_builder.AddIterator(
1080 super_version->mem->NewIterator(read_opts, &arena));
1081 super_version->imm->AddIterators(read_opts, &merge_iter_builder);
1082 ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
1083
1084 auto read_seq = super_version->current->version_set()->LastSequence();
1085 ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
1086 auto* active_range_del_iter =
1087 super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
1088 range_del_agg.AddTombstones(
1089 std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
1090 super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
1091 &range_del_agg);
1092
1093 Status status;
1094 for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
1095 auto* vstorage = super_version->current->storage_info();
1096 auto* ucmp = vstorage->InternalComparator()->user_comparator();
1097 InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
1098 kValueTypeForSeek);
1099 memtable_iter->Seek(range_start.Encode());
1100 status = memtable_iter->status();
1101 ParsedInternalKey seek_result;
1102 if (status.ok()) {
1103 if (memtable_iter->Valid() &&
1104 !ParseInternalKey(memtable_iter->key(), &seek_result)) {
1105 status = Status::Corruption("DB has corrupted keys");
1106 }
1107 }
1108 if (status.ok()) {
1109 if (memtable_iter->Valid() &&
1110 ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
1111 *overlap = true;
1112 } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
1113 ranges[i].limit)) {
1114 *overlap = true;
1115 }
1116 }
1117 }
1118 return status;
1119 }
1120
1121 const int ColumnFamilyData::kCompactAllLevels = -1;
1122 const int ColumnFamilyData::kCompactToBaseLevel = -2;
1123
1124 Compaction* ColumnFamilyData::CompactRange(
1125 const MutableCFOptions& mutable_cf_options, int input_level,
1126 int output_level, const CompactRangeOptions& compact_range_options,
1127 const InternalKey* begin, const InternalKey* end,
1128 InternalKey** compaction_end, bool* conflict,
1129 uint64_t max_file_num_to_ignore) {
1130 auto* result = compaction_picker_->CompactRange(
1131 GetName(), mutable_cf_options, current_->storage_info(), input_level,
1132 output_level, compact_range_options, begin, end, compaction_end, conflict,
1133 max_file_num_to_ignore);
1134 if (result != nullptr) {
1135 result->SetInputVersion(current_);
1136 }
1137 return result;
1138 }
1139
1140 SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
1141 SuperVersion* sv = GetThreadLocalSuperVersion(db);
1142 sv->Ref();
1143 if (!ReturnThreadLocalSuperVersion(sv)) {
1144 // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
1145 // when the thread-local pointer was populated. So, the Ref() earlier in
1146 // this function still prevents the returned SuperVersion* from being
1147 // deleted out from under the caller.
1148 sv->Unref();
1149 }
1150 return sv;
1151 }
1152
1153 SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
1154 // The SuperVersion is cached in thread local storage to avoid acquiring
1155 // mutex when SuperVersion does not change since the last use. When a new
1156 // SuperVersion is installed, the compaction or flush thread cleans up
1157 // cached SuperVersion in all existing thread local storage. To avoid
1158 // acquiring mutex for this operation, we use atomic Swap() on the thread
1159 // local pointer to guarantee exclusive access. If the thread local pointer
1160 // is being used while a new SuperVersion is installed, the cached
1161 // SuperVersion can become stale. In that case, the background thread would
1162 // have swapped in kSVObsolete. We re-check the value when returning
1163 // SuperVersion back to thread local, with an atomic compare and swap.
1164 // The superversion will need to be released if detected to be stale.
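// A thread-local slot therefore holds one of three values: kSVObsolete
// (nullptr: nothing cached, or the cached entry was scraped away), kSVInUse
// (this thread is currently borrowing the cached SuperVersion), or a pointer
// to a cached SuperVersion that still owns one reference.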
1165 void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
1166 // Invariant:
1167 // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
1168 // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
1169 // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
1170 // (if no Scrape happens).
1171 assert(ptr != SuperVersion::kSVInUse);
1172 SuperVersion* sv = static_cast<SuperVersion*>(ptr);
1173 if (sv == SuperVersion::kSVObsolete ||
1174 sv->version_number != super_version_number_.load()) {
1175 RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
1176 SuperVersion* sv_to_delete = nullptr;
1177
1178 if (sv && sv->Unref()) {
1179 RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
1180 db->mutex()->Lock();
1181 // NOTE: underlying resources held by superversion (sst files) might
1182 // not be released until the next background job.
1183 sv->Cleanup();
1184 if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
1185 db->AddSuperVersionsToFreeQueue(sv);
1186 db->SchedulePurge();
1187 } else {
1188 sv_to_delete = sv;
1189 }
1190 } else {
1191 db->mutex()->Lock();
1192 }
1193 sv = super_version_->Ref();
1194 db->mutex()->Unlock();
1195
1196 delete sv_to_delete;
1197 }
1198 assert(sv != nullptr);
1199 return sv;
1200 }
1201
1202 bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
1203 assert(sv != nullptr);
1204 // Put the SuperVersion back
1205 void* expected = SuperVersion::kSVInUse;
1206 if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
1207 // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
1208 // storage has not been altered and no Scrape has happened. The
1209 // SuperVersion is still current.
1210 return true;
1211 } else {
1212 // ThreadLocal scrape happened in the process of this GetImpl call (after
1213 // thread local Swap() at the beginning and before CompareAndSwap()).
1214 // This means the SuperVersion it holds is obsolete.
1215 assert(expected == SuperVersion::kSVObsolete);
1216 }
1217 return false;
1218 }
1219
1220 void ColumnFamilyData::InstallSuperVersion(
1221 SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
1222 db_mutex->AssertHeld();
1223 return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
1224 }
1225
1226 void ColumnFamilyData::InstallSuperVersion(
1227 SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
1228 const MutableCFOptions& mutable_cf_options) {
1229 SuperVersion* new_superversion = sv_context->new_superversion.release();
1230 new_superversion->db_mutex = db_mutex;
1231 new_superversion->mutable_cf_options = mutable_cf_options;
1232 new_superversion->Init(this, mem_, imm_.current(), current_);
1233 SuperVersion* old_superversion = super_version_;
1234 super_version_ = new_superversion;
1235 ++super_version_number_;
1236 super_version_->version_number = super_version_number_;
1237 super_version_->write_stall_condition =
1238 RecalculateWriteStallConditions(mutable_cf_options);
1239
1240 if (old_superversion != nullptr) {
1241 // Reset SuperVersions cached in thread local storage.
1242 // This should be done before old_superversion->Unref(). That's to ensure
1243 // that local_sv_ never holds the last reference to SuperVersion, since
1244 // it has no means to safely do SuperVersion cleanup.
1245 ResetThreadLocalSuperVersions();
1246
1247 if (old_superversion->mutable_cf_options.write_buffer_size !=
1248 mutable_cf_options.write_buffer_size) {
1249 mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
1250 }
1251 if (old_superversion->write_stall_condition !=
1252 new_superversion->write_stall_condition) {
1253 sv_context->PushWriteStallNotification(
1254 old_superversion->write_stall_condition,
1255 new_superversion->write_stall_condition, GetName(), ioptions());
1256 }
1257 if (old_superversion->Unref()) {
1258 old_superversion->Cleanup();
1259 sv_context->superversions_to_free.push_back(old_superversion);
1260 }
1261 }
1262 }
1263
1264 void ColumnFamilyData::ResetThreadLocalSuperVersions() {
1265 autovector<void*> sv_ptrs;
1266 local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
1267 for (auto ptr : sv_ptrs) {
1268 assert(ptr);
1269 if (ptr == SuperVersion::kSVInUse) {
1270 continue;
1271 }
1272 auto sv = static_cast<SuperVersion*>(ptr);
1273 bool was_last_ref __attribute__((__unused__));
1274 was_last_ref = sv->Unref();
1275 // sv couldn't have been the last reference because
1276 // ResetThreadLocalSuperVersions() is called before
1277 // unref'ing super_version_.
1278 assert(!was_last_ref);
1279 }
1280 }
1281
1282 Status ColumnFamilyData::ValidateOptions(
1283 const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
1284 Status s;
1285 s = CheckCompressionSupported(cf_options);
1286 if (s.ok() && db_options.allow_concurrent_memtable_write) {
1287 s = CheckConcurrentWritesSupported(cf_options);
1288 }
1289 if (s.ok() && db_options.unordered_write &&
1290 cf_options.max_successive_merges != 0) {
1291 s = Status::InvalidArgument(
1292 "max_successive_merges > 0 is incompatible with unordered_write");
1293 }
1294 if (s.ok()) {
1295 s = CheckCFPathsSupported(db_options, cf_options);
1296 }
1297 if (!s.ok()) {
1298 return s;
1299 }
1300
1301 if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
1302 if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
1303 return Status::NotSupported(
1304 "TTL is only supported in Block-Based Table format. ");
1305 }
1306 }
1307
1308 if (cf_options.periodic_compaction_seconds > 0 &&
1309 cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
1310 if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
1311 return Status::NotSupported(
1312 "Periodic Compaction is only supported in "
1313 "Block-Based Table format. ");
1314 }
1315 }
1316 return s;
1317 }
1318
1319 #ifndef ROCKSDB_LITE
1320 Status ColumnFamilyData::SetOptions(
1321 const DBOptions& db_options,
1322 const std::unordered_map<std::string, std::string>& options_map) {
1323 MutableCFOptions new_mutable_cf_options;
1324 Status s =
1325 GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
1326 ioptions_.info_log, &new_mutable_cf_options);
1327 if (s.ok()) {
1328 ColumnFamilyOptions cf_options =
1329 BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
1330 s = ValidateOptions(db_options, cf_options);
1331 }
1332 if (s.ok()) {
1333 mutable_cf_options_ = new_mutable_cf_options;
1334 mutable_cf_options_.RefreshDerivedOptions(ioptions_);
1335 }
1336 return s;
1337 }
1338 #endif // ROCKSDB_LITE
1339
1340 // REQUIRES: DB mutex held
1341 Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
1342 if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
1343 return Env::WLTH_NOT_SET;
1344 }
1345 if (level == 0) {
1346 return Env::WLTH_MEDIUM;
1347 }
1348 int base_level = current_->storage_info()->base_level();
1349
1350 // L1: medium, L2: long, ...
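// For example, with base_level == 1 this maps L0 and L1 to WLTH_MEDIUM,
// L2 to WLTH_LONG, and L3 and beyond to WLTH_EXTREME.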
1351 if (level - base_level >= 2) {
1352 return Env::WLTH_EXTREME;
1353 } else if (level < base_level) {
1354 // There is no restriction that prevents the level passed in from being
1355 // smaller than base_level.
1356 return Env::WLTH_MEDIUM;
1357 }
1358 return static_cast<Env::WriteLifeTimeHint>(level - base_level +
1359 static_cast<int>(Env::WLTH_MEDIUM));
1360 }
1361
1362 Status ColumnFamilyData::AddDirectories(
1363 std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
1364 Status s;
1365 assert(created_dirs != nullptr);
1366 assert(data_dirs_.empty());
1367 for (auto& p : ioptions_.cf_paths) {
1368 auto existing_dir = created_dirs->find(p.path);
1369
1370 if (existing_dir == created_dirs->end()) {
1371 std::unique_ptr<FSDirectory> path_directory;
1372 s = DBImpl::CreateAndNewDirectory(ioptions_.fs, p.path, &path_directory);
1373 if (!s.ok()) {
1374 return s;
1375 }
1376 assert(path_directory != nullptr);
1377 data_dirs_.emplace_back(path_directory.release());
1378 (*created_dirs)[p.path] = data_dirs_.back();
1379 } else {
1380 data_dirs_.emplace_back(existing_dir->second);
1381 }
1382 }
1383 assert(data_dirs_.size() == ioptions_.cf_paths.size());
1384 return s;
1385 }
1386
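// Returns the directory object for the given cf_paths index, or nullptr when
// no per-column-family directories have been added.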
FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}

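// dummy_cfd_ is never handed out to users; it only anchors the circular,
// doubly-linked list that threads all live ColumnFamilyData objects together.
// The constructor initializes that list to contain just the dummy node.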
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const FileOptions& file_options,
                                 Cache* table_cache,
                                 WriteBufferManager* _write_buffer_manager,
                                 WriteController* _write_controller,
                                 BlockCacheTracer* const block_cache_tracer)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(
          ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
          nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr,
          block_cache_tracer)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      file_options_(file_options),
      table_cache_(table_cache),
      write_buffer_manager_(_write_buffer_manager),
      write_controller_(_write_controller),
      block_cache_tracer_(block_cache_tracer) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

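// Drops the remaining reference on every ColumnFamilyData; each drop is
// expected to be the last reference, so the objects (including dummy_cfd_)
// are deleted here.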
ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->UnrefAndTryDelete();
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
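// Registers the new ColumnFamilyData in both the name->id and id->cfd maps,
// links it at the tail of the circular list, and caches it as the default
// column family when id == 0.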
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, file_options_, this, block_cache_tracer_);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

// REQUIRES: DB mutex held
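// Walks the linked list and deletes every column family whose reference count
// has already dropped to zero.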
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
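// Positions current_ on the memtables of the given column family; returns
// false if the id does not correspond to a live column family.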
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

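// Returns the ID of the handle's column family, or 0 (the default column
// family) when no handle is supplied.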
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

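// Returns the user comparator of the handle's column family, or nullptr when
// no handle is supplied.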
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace ROCKSDB_NAMESPACE