// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <stdint.h>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <map>
#include <set>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/import_column_family_job.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/malloc_stats.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/in_memory_stats_history.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/get_context.h"
#include "table/merging_iterator.h"
#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "tools/sst_dump_tool_imp.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

const std::string kDefaultColumnFamilyName("default");
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
void DumpRocksDBBuildVersion(Logger* log);

CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.
  if (ioptions.compaction_style == kCompactionStyleUniversal) {
    if (mutable_cf_options.compaction_options_universal
            .compression_size_percent < 0) {
      return mutable_cf_options.compression;
    } else {
      return kNoCompression;
    }
  } else if (!ioptions.compression_per_level.empty()) {
    // For leveled compression when min_level_to_compress != 0.
    return ioptions.compression_per_level[0];
  } else {
    return mutable_cf_options.compression;
  }
}
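
// Illustrative summary of the decision above (not authoritative; derived
// from the branches in GetCompressionFlush):
//   universal compaction, compression_size_percent <  0 -> CF compression
//   universal compaction, compression_size_percent >= 0 -> kNoCompression
//   leveled, compression_per_level configured           -> level-0 setting
//   otherwise                                           -> CF compression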

namespace {
void DumpSupportInfo(Logger* logger) {
  ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
  for (auto& compression : OptionsHelper::compression_type_string_map) {
    if (compression.second != kNoCompression &&
        compression.second != kDisableCompressionOption) {
      ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
                       CompressionTypeSupported(compression.second));
    }
  }
  ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
                   crc32c::IsFastCrc32Supported().c_str());
}
}  // namespace

DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn)
    : dbname_(dbname),
      own_info_log_(options.info_log == nullptr),
      initial_db_options_(SanitizeOptions(dbname, options)),
      env_(initial_db_options_.env),
      fs_(initial_db_options_.env->GetFileSystem()),
      immutable_db_options_(initial_db_options_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.statistics.get()),
      mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      default_cf_handle_(nullptr),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      db_lock_(nullptr),
      shutting_down_(false),
      manual_compaction_paused_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      persist_stats_cf_handle_(nullptr),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(env_->NowMicros()),
      last_stats_dump_time_microsec_(0),
      next_job_id_(1),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, file_options_, seq_per_batch),
#endif  // ROCKSDB_LITE
      event_logger_(immutable_db_options_.info_log.get()),
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // last_sequence_ is always maintained by the main queue that also writes
      // to the memtable. When two_write_queues_ is disabled, the last seq in
      // the memtable is the same as the last seq published to the readers.
      // When it is enabled but seq_per_batch_ is disabled, the last seq in the
      // memtable still indicates the last published seq, since wal-only writes
      // that go to the 2nd queue do not consume a sequence number. Otherwise,
      // writes performed by the 2nd queue could change what is visible to the
      // readers. In these cases, last_seq_same_as_publish_seq_ == false and
      // the 2nd queue maintains a separate variable to indicate the last
      // published sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      own_sfm_(options.sst_file_manager == nullptr),
      preserve_deletes_(options.preserve_deletes),
      closed_(false),
      error_handler_(this, immutable_db_options_, &mutex_),
      atomic_flush_install_cv_(&mutex_) {
  // !batch_per_txn_ implies seq_per_batch_ because it is only unset for
  // WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Give a large number for setting of "infinite" open files.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
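  // For example, max_open_files == 5000 yields a table cache capacity of
  // 4990 entries, while max_open_files == -1 makes the cache effectively
  // unbounded.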
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  table_cache_ = NewLRUCache(co);

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_, &block_cache_tracer_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  DumpDBFileSummary(immutable_db_options_, dbname_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  // Always open the DB with 0 here, which means if preserve_deletes_ == true
  // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
  // is called by the client and this seqnum is advanced.
  preserve_deletes_seqnum_.store(0);
}

Status DBImpl::Resume() {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");

  InstrumentedMutexLock db_mutex(&mutex_);

  if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
    // Nothing to do
    return Status::OK();
  }

  if (error_handler_.IsRecoveryInProgress()) {
    // Don't allow a mix of manual and automatic recovery
    return Status::Busy();
  }

  mutex_.Unlock();
  Status s = error_handler_.RecoverFromBGError(true);
  mutex_.Lock();
  return s;
}
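
// Usage sketch (illustrative only, not part of the implementation): a caller
// reacting to a background error might attempt manual recovery like
//   Status s = db->Resume();
//   if (s.IsBusy()) {
//     // automatic recovery already in progress; wait and retry
//   }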

// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
//    order to avoid inadvertently causing an error and mistakenly concluding
//    that recovery failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl() {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status bg_error = error_handler_.GetBGError();
  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }
  if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
    ROCKS_LOG_INFO(
        immutable_db_options_.info_log,
        "DB resume requested but failed due to Fatal/Unrecoverable error");
    s = bg_error;
  }

  // Make sure the IO Status stored in version set is set to OK.
  if (s.ok()) {
    versions_->SetIOStatusOK();
  }

  // We cannot guarantee consistency of the WAL. So force flush Memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall write since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        cfd->Ref();
        mutex_.Unlock();
        s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
        mutex_.Lock();
        cfd->UnrefAndTryDelete();
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  if (s.ok()) {
    s = error_handler_.ClearBGError();
  }
  mutex_.Unlock();

  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();
  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, the event listener
  // would be notified and the operation causing it would have failed
  return s;
}

void DBImpl::WaitForBackgroundWork() {
  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_) {
    bg_cv_.Wait();
  }
}

// Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

  if (thread_dump_stats_ != nullptr) {
    thread_dump_stats_->cancel();
    thread_dump_stats_.reset();
  }
  if (thread_persist_stats_ != nullptr) {
    thread_persist_stats_->cancel();
    thread_persist_stats_.reset();
  }
  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
          cfd->Ref();
          mutex_.Unlock();
          FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
          mutex_.Lock();
          cfd->UnrefAndTryDelete();
        }
      }
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  WaitForBackgroundWork();
}

Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);
  int bottom_compactions_unscheduled =
      env_->UnSchedule(this, Env::Priority::BOTTOM);
  int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
  int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
  Status ret;
  mutex_.Lock();
  bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
  bg_compaction_scheduled_ -= compactions_unscheduled;
  bg_flush_scheduled_ -= flushes_unscheduled;

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      iter.first->UnrefAndTryDelete();
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds the manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recovery fails (may be due to a corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files could get deleted by accident. However, a
  // corrupted manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto l : logs_to_free_) {
    delete l;
  }
  for (auto& log : logs_) {
    uint64_t log_number = log.writer->get_log_number();
    Status s = log.ClearWriter();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          immutable_db_options_.info_log,
          "Unable to Sync WAL file %s with error -- %s",
          LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
          s.ToString().c_str());
      // Retain the first error
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when the column family
  // data list is destroyed, so leaving handles in the table cache after
  // versions_.reset() may cause issues.
  // Here we clean all unreferenced handles in the table cache.
  // Now we assume all user queries have finished, so only the version set
  // itself can possibly hold blocks from the block cache. After releasing
  // unreferenced handles here, only handles held by the version set are left,
  // and inside versions_.reset() we will release them. There, we need to make
  // sure that every time a handle is released, we erase it from the cache
  // too. By doing that, we can guarantee that after versions_.reset(), the
  // table cache is empty so the cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, a background thread
  // in SstFileManagerImpl might try to log something.
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (ret.ok()) {
      ret = s;
    }
  }

  if (ret.IsAborted()) {
    // Reserve IsAborted() for cases where the user failed to release certain
    // resources; they can release them and retry. Here, we wrap this
    // exception into something else.
    return Status::Incomplete(ret.ToString());
  }
  return ret;
}

Status DBImpl::CloseImpl() { return CloseHelper(); }

DBImpl::~DBImpl() {
  if (!closed_) {
    closed_ = true;
    CloseHelper();
  }
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || immutable_db_options_.paranoid_checks) {
    // No change needed
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                   s->ToString().c_str());
    *s = Status::OK();
  }
}

const Status DBImpl::CreateArchivalDirectory() {
  if (immutable_db_options_.wal_ttl_seconds > 0 ||
      immutable_db_options_.wal_size_limit_mb > 0) {
    std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
    return env_->CreateDirIfMissing(archivalPath);
  }
  return Status::OK();
}

void DBImpl::PrintStatistics() {
  auto dbstats = immutable_db_options_.statistics.get();
  if (dbstats) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
                   dbstats->ToString().c_str());
  }
}

void DBImpl::StartTimedTasks() {
  unsigned int stats_dump_period_sec = 0;
  unsigned int stats_persist_period_sec = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
    if (stats_dump_period_sec > 0) {
      if (!thread_dump_stats_) {
        thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
            [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
            static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond));
      }
    }
    stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
    if (stats_persist_period_sec > 0) {
      if (!thread_persist_stats_) {
        thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
            [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
            static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond));
      }
    }
  }
}

// Estimate the total size of stats_history_
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
  size_t size_total =
      sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
  if (stats_history_.size() == 0) return size_total;
  size_t size_per_slice =
      sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
  // non-empty map, stats_history_.begin() guaranteed to exist
  std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
  for (const auto& pairs : sample_slice) {
    size_per_slice +=
        pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
  }
  size_total = size_per_slice * stats_history_.size();
  return size_total;
}
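
// Worked example (illustrative figures only): with 100 tickers whose names
// average 40 bytes, one slice is estimated at roughly
//   sizeof(uint64_t) + sizeof(map) + 100 * (40 + sizeof(string) + 8)
// bytes, and the total grows linearly with the number of retained slices,
// which is what the purge loop in PersistStats() bounds.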

void DBImpl::PersistStats() {
  TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
  if (shutdown_initiated_) {
    return;
  }
  uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond;
  Statistics* statistics = immutable_db_options_.statistics.get();
  if (!statistics) {
    return;
  }
  size_t stats_history_size_limit = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
  }

  std::map<std::string, uint64_t> stats_map;
  if (!statistics->getTickerMap(&stats_map)) {
    return;
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- PERSISTING STATS -------");

  if (immutable_db_options_.persist_stats_to_disk) {
    WriteBatch batch;
    if (stats_slice_initialized_) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
                     stats_slice_.size());
      for (const auto& stat : stats_map) {
        char key[100];
        int length =
            EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
        // calculate the delta from last time
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          uint64_t delta = stat.second - stats_slice_[stat.first];
          batch.Put(persist_stats_cf_handle_, Slice(key, std::min(100, length)),
                    ToString(delta));
        }
      }
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    WriteOptions wo;
    wo.low_pri = true;
    wo.no_slowdown = true;
    wo.sync = false;
    Status s = Write(wo, &batch);
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing to persistent stats CF failed -- %s",
                     s.ToString().c_str());
    } else {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to persistent stats CF succeeded",
                     stats_slice_.size(), now_seconds);
    }
    // TODO(Zhongyi): add purging for persisted data
  } else {
    InstrumentedMutexLock l(&stats_history_mutex_);
    // calculate the delta from last time
    if (stats_slice_initialized_) {
      std::map<std::string, uint64_t> stats_delta;
      for (const auto& stat : stats_map) {
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
        }
      }
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to in-memory stats history",
                     stats_slice_.size(), now_seconds);
      stats_history_[now_seconds] = stats_delta;
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");

    // delete older stats snapshots to control memory consumption
    size_t stats_history_size = EstimateInMemoryStatsHistorySize();
    bool purge_needed = stats_history_size > stats_history_size_limit;
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
    while (purge_needed && !stats_history_.empty()) {
      stats_history_.erase(stats_history_.begin());
      purge_needed =
          EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
  }
#endif  // !ROCKSDB_LITE
}

bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
                             uint64_t* new_time,
                             std::map<std::string, uint64_t>* stats_map) {
  assert(new_time);
  assert(stats_map);
  if (!new_time || !stats_map) return false;
  // lock while searching for start_time
  {
    InstrumentedMutexLock l(&stats_history_mutex_);
    auto it = stats_history_.lower_bound(start_time);
    if (it != stats_history_.end() && it->first < end_time) {
      // make a copy of the timestamp and stats_map
      *new_time = it->first;
      *stats_map = it->second;
      return true;
    } else {
      return false;
    }
  }
}

Status DBImpl::GetStatsHistory(
    uint64_t start_time, uint64_t end_time,
    std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
  if (!stats_iterator) {
    return Status::InvalidArgument("stats_iterator not preallocated.");
  }
  if (immutable_db_options_.persist_stats_to_disk) {
    stats_iterator->reset(
        new PersistentStatsHistoryIterator(start_time, end_time, this));
  } else {
    stats_iterator->reset(
        new InMemoryStatsHistoryIterator(start_time, end_time, this));
  }
  return (*stats_iterator)->status();
}
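
// Usage sketch (illustrative only): a client can walk the retained stats
// snapshots in a [start_time, end_time) window like
//   std::unique_ptr<StatsHistoryIterator> it;
//   db->GetStatsHistory(start_time, end_time, &it);
//   for (; it->Valid(); it->Next()) {
//     uint64_t when = it->GetStatsTime();
//     const auto& tickers = it->GetStatsMap();
//   }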

void DBImpl::DumpStats() {
  TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
  const DBPropertyInfo* cf_property_info =
      GetPropertyInfo(DB::Properties::kCFStats);
  assert(cf_property_info != nullptr);
  const DBPropertyInfo* db_property_info =
      GetPropertyInfo(DB::Properties::kDBStats);
  assert(db_property_info != nullptr);

  std::string stats;
  if (shutdown_initiated_) {
    return;
  }
  {
    InstrumentedMutexLock l(&mutex_);
    default_cf_internal_stats_->GetStringProperty(
        *db_property_info, DB::Properties::kDBStats, &stats);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
      }
    }
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:2");
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- DUMPING STATS -------");
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
  if (immutable_db_options_.dump_malloc_stats) {
    stats.clear();
    DumpMallocStats(&stats);
    if (!stats.empty()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "------- Malloc STATS -------");
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
    }
  }
#endif  // !ROCKSDB_LITE

  PrintStatistics();
}

Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
                                           int max_entries_to_print,
                                           std::string* out_str) {
  auto* cfh =
      static_cast_with_check<ColumnFamilyHandleImpl, ColumnFamilyHandle>(
          column_family);
  ColumnFamilyData* cfd = cfh->cfd();

  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  Version* version = super_version->current;

  Status s =
      version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);

  CleanupSuperVersion(super_version);
  return s;
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  if (!job_context->logs_to_free.empty()) {
    for (auto l : job_context->logs_to_free) {
      AddToLogsToFreeQueue(l);
    }
    job_context->logs_to_free.clear();
  }
}

FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
  assert(cfd);
  FSDirectory* ret_dir = cfd->GetDataDir(path_id);
  if (ret_dir == nullptr) {
    return directories_.GetDataDir(path_id);
  }
  return ret_dir;
}

Status DBImpl::SetOptions(
    ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)column_family;
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    auto db_options = GetDBOptions();
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(db_options, options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score.
      VersionEdit dummy_edit;
      versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                             directories_.GetDbDir());
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
      bg_cv_.SignalAll();
    }
  }
  sv_context.Clean();

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
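
// Usage sketch (illustrative only): mutable CF options are passed as strings,
// e.g.
//   db->SetOptions(cf_handle,
//                  {{"write_buffer_size", "67108864"},
//                   {"disable_auto_compactions", "true"}});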

Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status;
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }
    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(immutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         /* parallelize_compactions */ true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          immutable_db_options_.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, /* parallelize_compactions */ true);

      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;

      if (max_flushes_increased || max_compactions_increased) {
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }

        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }

        MaybeScheduleFlushOrCompaction();
      }

      if (new_options.stats_dump_period_sec !=
          mutable_db_options_.stats_dump_period_sec) {
        if (thread_dump_stats_) {
          mutex_.Unlock();
          thread_dump_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_dump_period_sec > 0) {
          thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
              [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
              static_cast<uint64_t>(new_options.stats_dump_period_sec) *
                  kMicrosInSecond));
        } else {
          thread_dump_stats_.reset();
        }
      }
      if (new_options.stats_persist_period_sec !=
          mutable_db_options_.stats_persist_period_sec) {
        if (thread_persist_stats_) {
          mutex_.Unlock();
          thread_persist_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_persist_period_sec > 0) {
          thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
              [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
              static_cast<uint64_t>(new_options.stats_persist_period_sec) *
                  kMicrosInSecond));
        } else {
          thread_persist_stats_.reset();
        }
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      mutable_db_options_ = new_options;
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->ChangeFileOptions(mutable_db_options_);
      // TODO(xiez): clarify why the optimize-for-read settings are applied to
      // the (write-oriented) compaction file options.
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      file_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
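
// Usage sketch (illustrative only): mutable DB options are likewise passed as
// strings, e.g.
//   db->SetDBOptions({{"max_background_jobs", "8"},
//                     {"stats_dump_period_sec", "600"}});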

// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(
    ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
    int level) {
  mutex_.AssertHeld();
  const auto* vstorage = cfd->current()->storage_info();
  int minimum_level = level;
  for (int i = level - 1; i > 0; --i) {
    // stop if level i is not empty
    if (vstorage->NumLevelFiles(i) > 0) break;
    // stop if level i is too small (cannot fit the level files)
    if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
      break;
    }

    minimum_level = i;
  }
  return minimum_level;
}

Status DBImpl::FlushWAL(bool sync) {
  if (manual_wal_flush_) {
    IOStatus io_s;
    {
      // We need to lock log_write_mutex_ since logs_ might change concurrently
      InstrumentedMutexLock wl(&log_write_mutex_);
      log::Writer* cur_log_writer = logs_.back().writer;
      io_s = cur_log_writer->WriteBuffer();
    }
    if (!io_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      io_s.ToString().c_str());
      // In case there is a fs error we should set it globally to prevent
      // future writes
      IOStatusCheck(io_s);
      // whether sync or not, we should abort the rest of the function upon
      // error
      return std::move(io_s);
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return std::move(io_s);
    }
  }
  if (!sync) {
    return Status::OK();
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
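
// Behavioral note (summary, not normative): with Options::manual_wal_flush ==
// true, writes accumulate in the WAL buffer until FlushWAL(false) pushes them
// to the file system; FlushWAL(true) additionally syncs, which per the code
// above amounts to flushing the buffer and then calling SyncWAL().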

Status DBImpl::SyncWAL() {
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    while (logs_.front().number <= current_log_number &&
           logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    // First check that logs are safe to sync in background.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      auto& log = *it;
      assert(!log.getting_synced);
      log.getting_synced = true;
      logs_to_sync.push_back(log.writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  IOStatus io_s;
  for (log::Writer* log : logs_to_sync) {
    io_s = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!io_s.ok()) {
      status = io_s;
      break;
    }
  }
  if (!io_s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s",
                    io_s.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent
    // future writes
    IOStatusCheck(io_s);
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  {
    InstrumentedMutexLock l(&mutex_);
    MarkLogsSynced(current_log_number, need_log_dir_sync, status);
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}

Status DBImpl::LockWAL() {
  log_write_mutex_.Lock();
  auto cur_log_writer = logs_.back().writer;
  auto status = cur_log_writer->WriteBuffer();
  if (!status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                    status.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent
    // future writes
    WriteStatusCheck(status);
  }
  return std::move(status);
}

Status DBImpl::UnlockWAL() {
  log_write_mutex_.Unlock();
  return Status::OK();
}
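
// Usage sketch (illustrative only): LockWAL()/UnlockWAL() bracket a window in
// which the WAL buffer has been flushed and log_write_mutex_ is held, so no
// new WAL writes can land, e.g.
//   db->LockWAL();
//   // inspect or copy WAL files while the WAL is quiesced
//   db->UnlockWAL();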

void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
                            const Status& status) {
  mutex_.AssertHeld();
  if (synced_dir && logfile_number_ == up_to && status.ok()) {
    log_dir_synced_ = true;
  }
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
    auto& log = *it;
    assert(log.getting_synced);
    if (status.ok() && logs_.size() > 1) {
      logs_to_free_.push_back(log.ReleaseWriter());
      // To modify logs_ both mutex_ and log_write_mutex_ must be held
      InstrumentedMutexLock l(&log_write_mutex_);
      it = logs_.erase(it);
    } else {
      log.getting_synced = false;
      ++it;
    }
  }
  assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
         (logs_.size() == 1 && !logs_[0].getting_synced));
  log_sync_cv_.SignalAll();
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}

void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
  versions_->SetLastPublishedSequence(seq);
}

bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
  if (seqnum > preserve_deletes_seqnum_.load()) {
    preserve_deletes_seqnum_.store(seqnum);
    return true;
  } else {
    return false;
  }
}

InternalIterator* DBImpl::NewInternalIterator(
    Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
    ColumnFamilyHandle* column_family) {
  ColumnFamilyData* cfd;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    cfd = cfh->cfd();
  }

  mutex_.Lock();
  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
  mutex_.Unlock();
  ReadOptions roptions;
  return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
                             sequence);
}

void DBImpl::SchedulePurge() {
  mutex_.AssertHeld();
  assert(opened_successfully_);

  // Purge operations are put into High priority queue
  bg_purge_scheduled_++;
  env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}

void DBImpl::BackgroundCallPurge() {
  mutex_.Lock();

  while (!logs_to_free_queue_.empty()) {
    assert(!logs_to_free_queue_.empty());
    log::Writer* log_writer = *(logs_to_free_queue_.begin());
    logs_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete log_writer;
    mutex_.Lock();
  }
  while (!superversions_to_free_queue_.empty()) {
    assert(!superversions_to_free_queue_.empty());
    SuperVersion* sv = superversions_to_free_queue_.front();
    superversions_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete sv;
    mutex_.Lock();
  }

  // Can't use an iterator to go over purge_files_ because inside the loop
  // we're unlocking the mutex that protects purge_files_.
  while (!purge_files_.empty()) {
    auto it = purge_files_.begin();
    // Need to make a copy of the PurgeFileInfo before unlocking the mutex.
    PurgeFileInfo purge_file = it->second;

    const std::string& fname = purge_file.fname;
    const std::string& dir_to_sync = purge_file.dir_to_sync;
    FileType type = purge_file.type;
    uint64_t number = purge_file.number;
    int job_id = purge_file.job_id;

    purge_files_.erase(it);

    mutex_.Unlock();
    DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
    mutex_.Lock();
  }

  bg_purge_scheduled_--;

  bg_cv_.SignalAll();
  // IMPORTANT: there should be no code after calling SignalAll. This call may
  // signal the DB destructor that it's OK to proceed with destruction. In
  // that case, all DB variables will be deallocated and referencing them
  // will cause trouble.
  mutex_.Unlock();
}

namespace {
struct IterState {
  IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
            bool _background_purge)
      : db(_db),
        mu(_mu),
        super_version(_super_version),
        background_purge(_background_purge) {}

  DBImpl* db;
  InstrumentedMutex* mu;
  SuperVersion* super_version;
  bool background_purge;
};

static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
  IterState* state = reinterpret_cast<IterState*>(arg1);

  if (state->super_version->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // a user thread
    JobContext job_context(0);

    state->mu->Lock();
    state->super_version->Cleanup();
    state->db->FindObsoleteFiles(&job_context, false, true);
    if (state->background_purge) {
      state->db->ScheduleBgLogWriterClose(&job_context);
      state->db->AddSuperVersionsToFreeQueue(state->super_version);
      state->db->SchedulePurge();
    }
    state->mu->Unlock();

    if (!state->background_purge) {
      delete state->super_version;
    }
    if (job_context.HaveSomethingToDelete()) {
      if (state->background_purge) {
        // PurgeObsoleteFiles here does not delete files. Instead, it adds the
        // files to be deleted to a job queue, and deletes them in a separate
        // background thread.
        state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
        state->mu->Lock();
        state->db->SchedulePurge();
        state->mu->Unlock();
      } else {
        state->db->PurgeObsoleteFiles(job_context);
      }
    }
    job_context.Clean();
  }

  delete state;
}
}  // namespace

InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              ColumnFamilyData* cfd,
                                              SuperVersion* super_version,
                                              Arena* arena,
                                              RangeDelAggregator* range_del_agg,
                                              SequenceNumber sequence) {
  InternalIterator* internal_iter;
  assert(arena != nullptr);
  assert(range_del_agg != nullptr);
  // Need to create internal iterator from the arena.
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      !read_options.total_order_seek &&
          super_version->mutable_cf_options.prefix_extractor != nullptr);
  // Collect iterator for mutable mem
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_options, arena));
  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
  Status s;
  if (!read_options.ignore_range_deletions) {
    range_del_iter.reset(
        super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  // Collect all needed child iterators for immutable memtables
  if (s.ok()) {
    super_version->imm->AddIterators(read_options, &merge_iter_builder);
    if (!read_options.ignore_range_deletions) {
      s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
                                                         range_del_agg);
    }
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
  if (s.ok()) {
    // Collect iterators for files in L0 - Ln
    if (read_options.read_tier != kMemtableTier) {
      super_version->current->AddIterators(read_options, file_options_,
                                           &merge_iter_builder, range_del_agg);
    }
    internal_iter = merge_iter_builder.Finish();
    IterState* cleanup =
        new IterState(this, &mutex_, super_version,
                      read_options.background_purge_on_iterator_cleanup ||
                          immutable_db_options_.avoid_unnecessary_blocking_io);
    internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

    return internal_iter;
  } else {
    CleanupSuperVersion(super_version);
  }
  return NewErrorInternalIterator<Slice>(s, arena);
}
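
// Structural note (summary): the merged iterator built above stacks, from
// newest data to oldest, the active memtable, the immutable memtables, and
// the L0..Ln table files, while range tombstones from each source are
// accumulated into range_del_agg so that deleted ranges are suppressed
// during the merge.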

ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}

ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
  return persist_stats_cf_handle_;
}

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  return Get(read_options, column_family, key, value, /*timestamp=*/nullptr);
}

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value, std::string* timestamp) {
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  get_impl_options.timestamp = timestamp;
  Status s = GetImpl(read_options, key, get_impl_options);
  return s;
}
1525
GetImpl(const ReadOptions & read_options,const Slice & key,GetImplOptions & get_impl_options)1526 Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
1527 GetImplOptions& get_impl_options) {
1528 assert(get_impl_options.value != nullptr ||
1529 get_impl_options.merge_operands != nullptr);
1530 PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
1531 StopWatch sw(env_, stats_, DB_GET);
1532 PERF_TIMER_GUARD(get_snapshot_time);
1533
1534 auto cfh =
1535 reinterpret_cast<ColumnFamilyHandleImpl*>(get_impl_options.column_family);
1536 auto cfd = cfh->cfd();
1537
1538 if (tracer_) {
1539 // TODO: This mutex should be removed later, to improve performance when
1540 // tracing is enabled.
1541 InstrumentedMutexLock lock(&trace_mutex_);
1542 if (tracer_) {
1543 tracer_->Get(get_impl_options.column_family, key);
1544 }
1545 }
1546
1547 // Acquire SuperVersion
1548 SuperVersion* sv = GetAndRefSuperVersion(cfd);
1549
1550 TEST_SYNC_POINT("DBImpl::GetImpl:1");
1551 TEST_SYNC_POINT("DBImpl::GetImpl:2");
1552
1553 SequenceNumber snapshot;
1554 if (read_options.snapshot != nullptr) {
1555 if (get_impl_options.callback) {
1556 // Already calculated based on read_options.snapshot
1557 snapshot = get_impl_options.callback->max_visible_seq();
1558 } else {
1559 snapshot =
1560 reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1561 }
1562 } else {
1563 // Note that the snapshot is assigned AFTER referencing the super
1564 // version because otherwise a flush happening in between may compact away
1565 // data for the snapshot, so the reader would see neither data that was
1566 // visible to the snapshot before compaction nor the newer data inserted
1567 // afterwards.
1568 snapshot = last_seq_same_as_publish_seq_
1569 ? versions_->LastSequence()
1570 : versions_->LastPublishedSequence();
1571 if (get_impl_options.callback) {
1572 // The unprep_seqs are not published for write unprepared, so it could be
1573 // that max_visible_seq is larger. Seek to the std::max of the two.
1574 // However, we still want our callback to contain the actual snapshot so
1575 // that it can do the correct visibility filtering.
1576 get_impl_options.callback->Refresh(snapshot);
1577
1578 // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
1579 // max_visible_seq = max(max_visible_seq, snapshot)
1580 //
1581 // Currently, the commented out assert is broken by
1582 // InvalidSnapshotReadCallback, but if write unprepared recovery followed
1583 // the regular transaction flow, then this special read callback would not
1584 // be needed.
1585 //
1586 // assert(callback->max_visible_seq() >= snapshot);
1587 snapshot = get_impl_options.callback->max_visible_seq();
1588 }
1589 }
1590 TEST_SYNC_POINT("DBImpl::GetImpl:3");
1591 TEST_SYNC_POINT("DBImpl::GetImpl:4");
1592
1593 // Prepare to store a list of merge operations if merge occurs.
1594 MergeContext merge_context;
1595 SequenceNumber max_covering_tombstone_seq = 0;
1596
1597 Status s;
1598 // First look in the memtable, then in the immutable memtable (if any).
1599 // s is both in/out. When in, s could either be OK or MergeInProgress.
1600 // merge_operands will contain the sequence of merges in the latter case.
1601 LookupKey lkey(key, snapshot, read_options.timestamp);
1602 PERF_TIMER_STOP(get_snapshot_time);
1603
1604 bool skip_memtable = (read_options.read_tier == kPersistedTier &&
1605 has_unpersisted_data_.load(std::memory_order_relaxed));
1606 bool done = false;
1607 const Comparator* comparator =
1608 get_impl_options.column_family->GetComparator();
1609 size_t ts_sz = comparator->timestamp_size();
1610 std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr;
1611 if (!skip_memtable) {
1612 // Get value associated with key
1613 if (get_impl_options.get_value) {
1614 if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
1615 &merge_context, &max_covering_tombstone_seq,
1616 read_options, get_impl_options.callback,
1617 get_impl_options.is_blob_index)) {
1618 done = true;
1619 get_impl_options.value->PinSelf();
1620 RecordTick(stats_, MEMTABLE_HIT);
1621 } else if ((s.ok() || s.IsMergeInProgress()) &&
1622 sv->imm->Get(lkey, get_impl_options.value->GetSelf(),
1623 timestamp, &s, &merge_context,
1624 &max_covering_tombstone_seq, read_options,
1625 get_impl_options.callback,
1626 get_impl_options.is_blob_index)) {
1627 done = true;
1628 get_impl_options.value->PinSelf();
1629 RecordTick(stats_, MEMTABLE_HIT);
1630 }
1631 } else {
1632 // Get the merge operands associated with the key. Merge operands
1633 // should not be merged; raw values should be returned to the user.
1634 if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
1635 &merge_context, &max_covering_tombstone_seq,
1636 read_options, nullptr, nullptr, false)) {
1637 done = true;
1638 RecordTick(stats_, MEMTABLE_HIT);
1639 } else if ((s.ok() || s.IsMergeInProgress()) &&
1640 sv->imm->GetMergeOperands(lkey, &s, &merge_context,
1641 &max_covering_tombstone_seq,
1642 read_options)) {
1643 done = true;
1644 RecordTick(stats_, MEMTABLE_HIT);
1645 }
1646 }
1647 if (!done && !s.ok() && !s.IsMergeInProgress()) {
1648 ReturnAndCleanupSuperVersion(cfd, sv);
1649 return s;
1650 }
1651 }
1652 if (!done) {
1653 PERF_TIMER_GUARD(get_from_output_files_time);
1654 sv->current->Get(
1655 read_options, lkey, get_impl_options.value, timestamp, &s,
1656 &merge_context, &max_covering_tombstone_seq,
1657 get_impl_options.get_value ? get_impl_options.value_found : nullptr,
1658 nullptr, nullptr,
1659 get_impl_options.get_value ? get_impl_options.callback : nullptr,
1660 get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
1661 get_impl_options.get_value);
1662 RecordTick(stats_, MEMTABLE_MISS);
1663 }
1664
1665 {
1666 PERF_TIMER_GUARD(get_post_process_time);
1667
1668 ReturnAndCleanupSuperVersion(cfd, sv);
1669
1670 RecordTick(stats_, NUMBER_KEYS_READ);
1671 size_t size = 0;
1672 if (s.ok()) {
1673 if (get_impl_options.get_value) {
1674 size = get_impl_options.value->size();
1675 } else {
1676 // Return all merge operands for get_impl_options.key
1677 *get_impl_options.number_of_operands =
1678 static_cast<int>(merge_context.GetNumOperands());
1679 if (*get_impl_options.number_of_operands >
1680 get_impl_options.get_merge_operands_options
1681 ->expected_max_number_of_operands) {
1682 s = Status::Incomplete(
1683 Status::SubCode::KMergeOperandsInsufficientCapacity);
1684 } else {
1685 for (const Slice& sl : merge_context.GetOperands()) {
1686 size += sl.size();
1687 get_impl_options.merge_operands->PinSelf(sl);
1688 get_impl_options.merge_operands++;
1689 }
1690 }
1691 }
1692 RecordTick(stats_, BYTES_READ, size);
1693 PERF_COUNTER_ADD(get_read_bytes, size);
1694 }
1695 RecordInHistogram(stats_, BYTES_PER_READ, size);
1696 }
1697 return s;
1698 }
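//
// The merge-operand branch above backs the public DB::GetMergeOperands()
// API, which returns raw, unmerged operands. A hedged sketch of a caller
// (illustrative; assumes an open DB* `db` whose default column family
// uses a merge operator, and a hypothetical key "counter_key"):
//
//   rocksdb::GetMergeOperandsOptions opts;
//   opts.expected_max_number_of_operands = 8;  // capacity of `operands`
//   rocksdb::PinnableSlice operands[8];
//   int num_operands = 0;
//   rocksdb::Status s = db->GetMergeOperands(
//       rocksdb::ReadOptions(), db->DefaultColumnFamily(), "counter_key",
//       operands, &opts, &num_operands);
//   // An Incomplete status (see above) means more operands exist than
//   // the provided capacity.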
1699
1700 std::vector<Status> DBImpl::MultiGet(
1701 const ReadOptions& read_options,
1702 const std::vector<ColumnFamilyHandle*>& column_family,
1703 const std::vector<Slice>& keys, std::vector<std::string>* values) {
1704 return MultiGet(read_options, column_family, keys, values,
1705 /*timestamps*/ nullptr);
1706 }
1707
1708 std::vector<Status> DBImpl::MultiGet(
1709 const ReadOptions& read_options,
1710 const std::vector<ColumnFamilyHandle*>& column_family,
1711 const std::vector<Slice>& keys, std::vector<std::string>* values,
1712 std::vector<std::string>* timestamps) {
1713 PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
1714 StopWatch sw(env_, stats_, DB_MULTIGET);
1715 PERF_TIMER_GUARD(get_snapshot_time);
1716
1717 SequenceNumber consistent_seqnum;
1718
1719 std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
1720 column_family.size());
1721 for (auto cf : column_family) {
1722 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
1723 auto cfd = cfh->cfd();
1724 if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
1725 multiget_cf_data.emplace(cfd->GetID(),
1726 MultiGetColumnFamilyData(cfh, nullptr));
1727 }
1728 }
1729
1730 std::function<MultiGetColumnFamilyData*(
1731 std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
1732 iter_deref_lambda =
1733 [](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
1734 cf_iter) { return &cf_iter->second; };
1735
1736 bool unref_only =
1737 MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
1738 read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
1739 &consistent_seqnum);
1740
1741 // Contain a list of merge operations if merge occurs.
1742 MergeContext merge_context;
1743
1744 // Note: this always resizes the values array
1745 size_t num_keys = keys.size();
1746 std::vector<Status> stat_list(num_keys);
1747 values->resize(num_keys);
1748 if (timestamps) {
1749 timestamps->resize(num_keys);
1750 }
1751
1752 // Keep track of bytes that we read for statistics-recording later
1753 uint64_t bytes_read = 0;
1754 PERF_TIMER_STOP(get_snapshot_time);
1755
1756 // For each of the given keys, apply the entire "get" process as follows:
1757 // First look in the memtable, then in the immutable memtable (if any).
1758 // s is both in/out. When in, s could either be OK or MergeInProgress.
1759 // merge_operands will contain the sequence of merges in the latter case.
1760 size_t num_found = 0;
1761 for (size_t i = 0; i < num_keys; ++i) {
1762 merge_context.Clear();
1763 Status& s = stat_list[i];
1764 std::string* value = &(*values)[i];
1765 std::string* timestamp = timestamps ? &(*timestamps)[i] : nullptr;
1766
1767 LookupKey lkey(keys[i], consistent_seqnum, read_options.timestamp);
1768 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
1769 SequenceNumber max_covering_tombstone_seq = 0;
1770 auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
1771 assert(mgd_iter != multiget_cf_data.end());
1772 auto mgd = mgd_iter->second;
1773 auto super_version = mgd.super_version;
1774 bool skip_memtable =
1775 (read_options.read_tier == kPersistedTier &&
1776 has_unpersisted_data_.load(std::memory_order_relaxed));
1777 bool done = false;
1778 if (!skip_memtable) {
1779 if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
1780 &max_covering_tombstone_seq, read_options)) {
1781 done = true;
1782 RecordTick(stats_, MEMTABLE_HIT);
1783 } else if (super_version->imm->Get(
1784 lkey, value, timestamp, &s, &merge_context,
1785 &max_covering_tombstone_seq, read_options)) {
1786 done = true;
1787 RecordTick(stats_, MEMTABLE_HIT);
1788 }
1789 }
1790 if (!done) {
1791 PinnableSlice pinnable_val;
1792 PERF_TIMER_GUARD(get_from_output_files_time);
1793 super_version->current->Get(read_options, lkey, &pinnable_val, timestamp,
1794 &s, &merge_context,
1795 &max_covering_tombstone_seq);
1796 value->assign(pinnable_val.data(), pinnable_val.size());
1797 RecordTick(stats_, MEMTABLE_MISS);
1798 }
1799
1800 if (s.ok()) {
1801 bytes_read += value->size();
1802 num_found++;
1803 }
1804 }
1805
1806 // Post processing (decrement reference counts and record statistics)
1807 PERF_TIMER_GUARD(get_post_process_time);
1808 autovector<SuperVersion*> superversions_to_delete;
1809
1810 for (auto mgd_iter : multiget_cf_data) {
1811 auto mgd = mgd_iter.second;
1812 if (!unref_only) {
1813 ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
1814 } else {
1815 mgd.cfd->GetSuperVersion()->Unref();
1816 }
1817 }
1818 RecordTick(stats_, NUMBER_MULTIGET_CALLS);
1819 RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
1820 RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
1821 RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
1822 RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
1823 PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
1824 PERF_TIMER_STOP(get_post_process_time);
1825
1826 return stat_list;
1827 }
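//
// Usage sketch for the vector-based MultiGet() above (illustrative;
// assumes an open DB* `db` and valid column family handles `cf0`, `cf1`):
//
//   std::vector<rocksdb::ColumnFamilyHandle*> cfs = {cf0, cf1};
//   std::vector<rocksdb::Slice> keys = {"k0", "k1"};
//   std::vector<std::string> values;
//   std::vector<rocksdb::Status> statuses =
//       db->MultiGet(rocksdb::ReadOptions(), cfs, keys, &values);
//   // statuses[i] and values[i] correspond to keys[i]; all lookups share
//   // the single sequence number chosen by MultiCFSnapshot() below.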
1828
1829 template <class T>
1830 bool DBImpl::MultiCFSnapshot(
1831 const ReadOptions& read_options, ReadCallback* callback,
1832 std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
1833 iter_deref_func,
1834 T* cf_list, SequenceNumber* snapshot) {
1835 PERF_TIMER_GUARD(get_snapshot_time);
1836
1837 bool last_try = false;
1838 if (cf_list->size() == 1) {
1839 // Fast path for a single column family. We can simply get the thread
1840 // local super version.
1841 auto cf_iter = cf_list->begin();
1842 auto node = iter_deref_func(cf_iter);
1843 node->super_version = GetAndRefSuperVersion(node->cfd);
1844 if (read_options.snapshot != nullptr) {
1845 // Note: In WritePrepared txns this is not necessary but not harmful
1846 // either. Because prep_seq > snapshot => commit_seq > snapshot so if
1847 // a snapshot is specified we should be fine with skipping seq numbers
1848 // that are greater than that.
1849 //
1850 // In WriteUnprepared, we cannot set snapshot in the lookup key because we
1851 // may skip uncommitted data that should be visible to the transaction for
1852 // reading own writes.
1853 *snapshot =
1854 static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1855 if (callback) {
1856 *snapshot = std::max(*snapshot, callback->max_visible_seq());
1857 }
1858 } else {
1859 // Since we get and reference the super version before getting
1860 // the snapshot number, without a mutex protection, it is possible
1861 // that a memtable switch happened in the middle and not all the
1862 // data for this snapshot is available. But it will contain all
1863 // the data available in the super version we have, which is also
1864 // a valid snapshot to read from.
1865 // We shouldn't get snapshot before finding and referencing the super
1866 // version because a flush happening in between may compact away data for
1867 // the snapshot, but the snapshot is earlier than the data overwriting it,
1868 // so users may see wrong results.
1869 *snapshot = last_seq_same_as_publish_seq_
1870 ? versions_->LastSequence()
1871 : versions_->LastPublishedSequence();
1872 }
1873 } else {
1874 // If we end up with the same issue of the memtable getting sealed during
1875 // two consecutive retries, it means the write rate is very high. In that
1876 // case it's probably OK to take the mutex on the third try so we can
1877 // succeed for sure.
1878 static const int num_retries = 3;
1879 for (int i = 0; i < num_retries; ++i) {
1880 last_try = (i == num_retries - 1);
1881 bool retry = false;
1882
1883 if (i > 0) {
1884 for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
1885 ++cf_iter) {
1886 auto node = iter_deref_func(cf_iter);
1887 SuperVersion* super_version = node->super_version;
1888 ColumnFamilyData* cfd = node->cfd;
1889 if (super_version != nullptr) {
1890 ReturnAndCleanupSuperVersion(cfd, super_version);
1891 }
1892 node->super_version = nullptr;
1893 }
1894 }
1895 if (read_options.snapshot == nullptr) {
1896 if (last_try) {
1897 TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
1898 // We're close to max number of retries. For the last retry,
1899 // acquire the lock so we're sure to succeed
1900 mutex_.Lock();
1901 }
1902 *snapshot = last_seq_same_as_publish_seq_
1903 ? versions_->LastSequence()
1904 : versions_->LastPublishedSequence();
1905 } else {
1906 *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
1907 ->number_;
1908 }
1909 for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
1910 ++cf_iter) {
1911 auto node = iter_deref_func(cf_iter);
1912 if (!last_try) {
1913 node->super_version = GetAndRefSuperVersion(node->cfd);
1914 } else {
1915 node->super_version = node->cfd->GetSuperVersion()->Ref();
1916 }
1917 TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
1918 if (read_options.snapshot != nullptr || last_try) {
1919 // If user passed a snapshot, then we don't care if a memtable is
1920 // sealed or compaction happens because the snapshot would ensure
1921 // that older key versions are kept around. If this is the last
1922 // retry, then we have the lock so nothing bad can happen
1923 continue;
1924 }
1925 // We could get the earliest sequence number for the whole list of
1926 // memtables, which will include immutable memtables as well, but that
1927 // might be tricky to maintain in case we decide, in future, to do
1928 // memtable compaction.
1929 if (!last_try) {
1930 SequenceNumber seq =
1931 node->super_version->mem->GetEarliestSequenceNumber();
1932 if (seq > *snapshot) {
1933 retry = true;
1934 break;
1935 }
1936 }
1937 }
1938 if (!retry) {
1939 if (last_try) {
1940 mutex_.Unlock();
1941 }
1942 break;
1943 }
1944 }
1945 }
1946
1948 PERF_TIMER_STOP(get_snapshot_time);
1949
1950 return last_try;
1951 }
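//
// The loop above is an optimistic consistency check: read a published
// sequence number, pin state without a lock, and retry if the pinned
// state may postdate that number. A standalone sketch of the same
// pattern (hypothetical code, not RocksDB API):
//
//   #include <atomic>
//
//   // Returns true once the pinned state is no newer than the sequence
//   // number read beforehand; gives up after three attempts.
//   template <typename PinFn, typename EarliestSeqFn>
//   bool PinConsistently(std::atomic<uint64_t>& published_seq,
//                        PinFn pin_state, EarliestSeqFn earliest_seq) {
//     for (int i = 0; i < 3; ++i) {
//       uint64_t snap = published_seq.load(std::memory_order_acquire);
//       pin_state();
//       if (earliest_seq() <= snap) {
//         return true;  // pinned state is consistent with `snap`
//       }
//       // Unpin and retry; the real code takes the mutex on the last try.
//     }
//     return false;
//   }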
1952
1953 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
1954 ColumnFamilyHandle** column_families, const Slice* keys,
1955 PinnableSlice* values, Status* statuses,
1956 const bool sorted_input) {
1957 return MultiGet(read_options, num_keys, column_families, keys, values,
1958 /*timestamps*/ nullptr, statuses, sorted_input);
1959 }
1960
1961 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
1962 ColumnFamilyHandle** column_families, const Slice* keys,
1963 PinnableSlice* values, std::string* timestamps,
1964 Status* statuses, const bool sorted_input) {
1965 if (num_keys == 0) {
1966 return;
1967 }
1968 autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
1969 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
1970 sorted_keys.resize(num_keys);
1971 for (size_t i = 0; i < num_keys; ++i) {
1972 key_context.emplace_back(column_families[i], keys[i], &values[i],
1973 ×tamps[i], &statuses[i]);
1974 }
1975 for (size_t i = 0; i < num_keys; ++i) {
1976 sorted_keys[i] = &key_context[i];
1977 }
1978 PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
1979
1980 autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
1981 multiget_cf_data;
1982 size_t cf_start = 0;
1983 ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
1984 for (size_t i = 0; i < num_keys; ++i) {
1985 KeyContext* key_ctx = sorted_keys[i];
1986 if (key_ctx->column_family != cf) {
1987 multiget_cf_data.emplace_back(
1988 MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
1989 cf_start = i;
1990 cf = key_ctx->column_family;
1991 }
1992 }
1996 multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
1998 std::function<MultiGetColumnFamilyData*(
1999 autovector<MultiGetColumnFamilyData,
2000 MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
2001 iter_deref_lambda =
2002 [](autovector<MultiGetColumnFamilyData,
2003 MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
2004 return &(*cf_iter);
2005 };
2006
2007 SequenceNumber consistent_seqnum;
2008 bool unref_only = MultiCFSnapshot<
2009 autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
2010 read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
2011 &consistent_seqnum);
2012
2013 for (auto cf_iter = multiget_cf_data.begin();
2014 cf_iter != multiget_cf_data.end(); ++cf_iter) {
2015 MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys,
2016 cf_iter->super_version, consistent_seqnum, nullptr, nullptr);
2017 if (!unref_only) {
2018 ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version);
2019 } else {
2020 cf_iter->cfd->GetSuperVersion()->Unref();
2021 }
2022 }
2023 }
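//
// Usage sketch for the batched MultiGet() overloads above (illustrative;
// assumes an open DB* `db`). The array form avoids per-key heap
// allocation and can exploit pre-sorted input:
//
//   rocksdb::Slice keys[2] = {"a", "b"};
//   rocksdb::PinnableSlice values[2];
//   rocksdb::Status statuses[2];
//   db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(),
//                /*num_keys=*/2, keys, values, statuses,
//                /*sorted_input=*/true);  // "a" < "b" in the default order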
2024
2025 namespace {
2026 // Order keys by CF ID, followed by key contents
2027 struct CompareKeyContext {
2028 inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
2029 ColumnFamilyHandleImpl* cfh =
2030 static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2031 uint32_t cfd_id1 = cfh->cfd()->GetID();
2032 const Comparator* comparator = cfh->cfd()->user_comparator();
2033 cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
2034 uint32_t cfd_id2 = cfh->cfd()->GetID();
2035
2036 if (cfd_id1 < cfd_id2) {
2037 return true;
2038 } else if (cfd_id1 > cfd_id2) {
2039 return false;
2040 }
2041
2042 // Both keys are from the same column family
2043 int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
2044 if (cmp < 0) {
2045 return true;
2046 }
2047 return false;
2048 }
2049 };
2050
2051 } // anonymous namespace
2052
2053 void DBImpl::PrepareMultiGetKeys(
2054 size_t num_keys, bool sorted_input,
2055 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2056 #ifndef NDEBUG
2057 if (sorted_input) {
2058 for (size_t index = 0; index < sorted_keys->size(); ++index) {
2059 if (index > 0) {
2060 KeyContext* lhs = (*sorted_keys)[index - 1];
2061 KeyContext* rhs = (*sorted_keys)[index];
2062 ColumnFamilyHandleImpl* cfh =
2063 reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2064 uint32_t cfd_id1 = cfh->cfd()->GetID();
2065 const Comparator* comparator = cfh->cfd()->user_comparator();
2066 cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
2067 uint32_t cfd_id2 = cfh->cfd()->GetID();
2068
2069 assert(cfd_id1 <= cfd_id2);
2070 if (cfd_id1 < cfd_id2) {
2071 continue;
2072 }
2073
2074 // Both keys are from the same column family
2075 int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
2076 assert(cmp <= 0);
2077 }
2079 }
2080 }
2081 #endif
2082 if (!sorted_input) {
2083 CompareKeyContext sort_comparator;
2084 std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
2085 sort_comparator);
2086 }
2087 }
2088
2089 void DBImpl::MultiGet(const ReadOptions& read_options,
2090 ColumnFamilyHandle* column_family, const size_t num_keys,
2091 const Slice* keys, PinnableSlice* values,
2092 Status* statuses, const bool sorted_input) {
2093 return MultiGet(read_options, column_family, num_keys, keys, values,
2094 /*timestamp=*/nullptr, statuses, sorted_input);
2095 }
2096
2097 void DBImpl::MultiGet(const ReadOptions& read_options,
2098 ColumnFamilyHandle* column_family, const size_t num_keys,
2099 const Slice* keys, PinnableSlice* values,
2100 std::string* timestamps, Status* statuses,
2101 const bool sorted_input) {
2102 autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
2103 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
2104 sorted_keys.resize(num_keys);
2105 for (size_t i = 0; i < num_keys; ++i) {
2106 key_context.emplace_back(column_family, keys[i], &values[i],
2107 timestamps ? ×tamps[i] : nullptr,
2108 &statuses[i]);
2109 }
2110 for (size_t i = 0; i < num_keys; ++i) {
2111 sorted_keys[i] = &key_context[i];
2112 }
2113 PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
2114 MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
2115 }
2116
2117 void DBImpl::MultiGetWithCallback(
2118 const ReadOptions& read_options, ColumnFamilyHandle* column_family,
2119 ReadCallback* callback,
2120 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2121 std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
2122 multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
2123 std::function<MultiGetColumnFamilyData*(
2124 std::array<MultiGetColumnFamilyData, 1>::iterator&)>
2125 iter_deref_lambda =
2126 [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
2127 return &(*cf_iter);
2128 };
2129
2130 size_t num_keys = sorted_keys->size();
2131 SequenceNumber consistent_seqnum;
2132 bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
2133 read_options, callback, iter_deref_lambda, &multiget_cf_data,
2134 &consistent_seqnum);
2135 #ifndef NDEBUG
2136 assert(!unref_only);
2137 #else
2138 // Silence unused variable warning
2139 (void)unref_only;
2140 #endif // NDEBUG
2141
2142 if (callback && read_options.snapshot == nullptr) {
2143 // The unprep_seqs are not published for write unprepared, so it could be
2144 // that max_visible_seq is larger. Seek to the std::max of the two.
2145 // However, we still want our callback to contain the actual snapshot so
2146 // that it can do the correct visibility filtering.
2147 callback->Refresh(consistent_seqnum);
2148
2149 // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
2150 // max_visible_seq = max(max_visible_seq, snapshot)
2151 //
2152 // Currently, the commented out assert is broken by
2153 // InvalidSnapshotReadCallback, but if write unprepared recovery followed
2154 // the regular transaction flow, then this special read callback would not
2155 // be needed.
2156 //
2157 // assert(callback->max_visible_seq() >= snapshot);
2158 consistent_seqnum = callback->max_visible_seq();
2159 }
2160
2161 MultiGetImpl(read_options, 0, num_keys, sorted_keys,
2162 multiget_cf_data[0].super_version, consistent_seqnum, nullptr,
2163 nullptr);
2164 ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
2165 multiget_cf_data[0].super_version);
2166 }
2167
2168 void DBImpl::MultiGetImpl(
2169 const ReadOptions& read_options, size_t start_key, size_t num_keys,
2170 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2171 SuperVersion* super_version, SequenceNumber snapshot,
2172 ReadCallback* callback, bool* is_blob_index) {
2173 PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
2174 StopWatch sw(env_, stats_, DB_MULTIGET);
2175
2176 // For each of the given keys, apply the entire "get" process as follows:
2177 // First look in the memtable, then in the immutable memtable (if any).
2178 // s is both in/out. When in, s could either be OK or MergeInProgress.
2179 // merge_operands will contain the sequence of merges in the latter case.
2180 size_t keys_left = num_keys;
2181 while (keys_left) {
2182 size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
2183 ? MultiGetContext::MAX_BATCH_SIZE
2184 : keys_left;
2185 MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
2186 batch_size, snapshot);
2187 MultiGetRange range = ctx.GetMultiGetRange();
2188 bool lookup_current = false;
2189
2190 keys_left -= batch_size;
2191 for (auto mget_iter = range.begin(); mget_iter != range.end();
2192 ++mget_iter) {
2193 mget_iter->merge_context.Clear();
2194 *mget_iter->s = Status::OK();
2195 }
2196
2197 bool skip_memtable =
2198 (read_options.read_tier == kPersistedTier &&
2199 has_unpersisted_data_.load(std::memory_order_relaxed));
2200 if (!skip_memtable) {
2201 super_version->mem->MultiGet(read_options, &range, callback,
2202 is_blob_index);
2203 if (!range.empty()) {
2204 super_version->imm->MultiGet(read_options, &range, callback,
2205 is_blob_index);
2206 }
2207 if (!range.empty()) {
2208 lookup_current = true;
2209 uint64_t left = range.KeysLeft();
2210 RecordTick(stats_, MEMTABLE_MISS, left);
2211 }
2212 }
2213 if (lookup_current) {
2214 PERF_TIMER_GUARD(get_from_output_files_time);
2215 super_version->current->MultiGet(read_options, &range, callback,
2216 is_blob_index);
2217 }
2218 }
2219
2220 // Post processing (decrement reference counts and record statistics)
2221 PERF_TIMER_GUARD(get_post_process_time);
2222 size_t num_found = 0;
2223 uint64_t bytes_read = 0;
2224 for (size_t i = start_key; i < start_key + num_keys; ++i) {
2225 KeyContext* key = (*sorted_keys)[i];
2226 if (key->s->ok()) {
2227 bytes_read += key->value->size();
2228 num_found++;
2229 }
2230 }
2231
2232 RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2233 RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
2234 RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
2235 RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
2236 RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
2237 PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
2238 PERF_TIMER_STOP(get_post_process_time);
2239 }
2240
2241 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
2242 const std::string& column_family,
2243 ColumnFamilyHandle** handle) {
2244 assert(handle != nullptr);
2245 Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
2246 if (s.ok()) {
2247 s = WriteOptionsFile(true /*need_mutex_lock*/,
2248 true /*need_enter_write_thread*/);
2249 }
2250 return s;
2251 }
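//
// Usage sketch for CreateColumnFamily() (illustrative; assumes an open
// DB* `db`). On success the OPTIONS file is rewritten so the new column
// family is known at the next DB::Open():
//
//   rocksdb::ColumnFamilyHandle* cf = nullptr;
//   rocksdb::Status s = db->CreateColumnFamily(
//       rocksdb::ColumnFamilyOptions(), "new_cf", &cf);
//   if (s.ok()) {
//     // ... use cf ...
//     s = db->DestroyColumnFamilyHandle(cf);  // release the handle
//   }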
2252
2253 Status DBImpl::CreateColumnFamilies(
2254 const ColumnFamilyOptions& cf_options,
2255 const std::vector<std::string>& column_family_names,
2256 std::vector<ColumnFamilyHandle*>* handles) {
2257 assert(handles != nullptr);
2258 handles->clear();
2259 size_t num_cf = column_family_names.size();
2260 Status s;
2261 bool success_once = false;
2262 for (size_t i = 0; i < num_cf; i++) {
2263 ColumnFamilyHandle* handle;
2264 s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
2265 if (!s.ok()) {
2266 break;
2267 }
2268 handles->push_back(handle);
2269 success_once = true;
2270 }
2271 if (success_once) {
2272 Status persist_options_status = WriteOptionsFile(
2273 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2274 if (s.ok() && !persist_options_status.ok()) {
2275 s = persist_options_status;
2276 }
2277 }
2278 return s;
2279 }
2280
2281 Status DBImpl::CreateColumnFamilies(
2282 const std::vector<ColumnFamilyDescriptor>& column_families,
2283 std::vector<ColumnFamilyHandle*>* handles) {
2284 assert(handles != nullptr);
2285 handles->clear();
2286 size_t num_cf = column_families.size();
2287 Status s;
2288 bool success_once = false;
2289 for (size_t i = 0; i < num_cf; i++) {
2290 ColumnFamilyHandle* handle;
2291 s = CreateColumnFamilyImpl(column_families[i].options,
2292 column_families[i].name, &handle);
2293 if (!s.ok()) {
2294 break;
2295 }
2296 handles->push_back(handle);
2297 success_once = true;
2298 }
2299 if (success_once) {
2300 Status persist_options_status = WriteOptionsFile(
2301 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2302 if (s.ok() && !persist_options_status.ok()) {
2303 s = persist_options_status;
2304 }
2305 }
2306 return s;
2307 }
2308
2309 Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
2310 const std::string& column_family_name,
2311 ColumnFamilyHandle** handle) {
2312 Status s;
2313 Status persist_options_status;
2314 *handle = nullptr;
2315
2316 DBOptions db_options =
2317 BuildDBOptions(immutable_db_options_, mutable_db_options_);
2318 s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
2319 if (s.ok()) {
2320 for (auto& cf_path : cf_options.cf_paths) {
2321 s = env_->CreateDirIfMissing(cf_path.path);
2322 if (!s.ok()) {
2323 break;
2324 }
2325 }
2326 }
2327 if (!s.ok()) {
2328 return s;
2329 }
2330
2331 SuperVersionContext sv_context(/* create_superversion */ true);
2332 {
2333 InstrumentedMutexLock l(&mutex_);
2334
2335 if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
2336 nullptr) {
2337 return Status::InvalidArgument("Column family already exists");
2338 }
2339 VersionEdit edit;
2340 edit.AddColumnFamily(column_family_name);
2341 uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
2342 edit.SetColumnFamily(new_id);
2343 edit.SetLogNumber(logfile_number_);
2344 edit.SetComparatorName(cf_options.comparator->Name());
2345
2346 // LogAndApply will both write the creation in MANIFEST and create
2347 // ColumnFamilyData object
2348 { // write thread
2349 WriteThread::Writer w;
2350 write_thread_.EnterUnbatched(&w, &mutex_);
2351 // LogAndApply will both write the creation in MANIFEST and create
2352 // ColumnFamilyData object
2353 s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
2354 &mutex_, directories_.GetDbDir(), false,
2355 &cf_options);
2356 write_thread_.ExitUnbatched(&w);
2357 }
2358 if (s.ok()) {
2359 auto* cfd =
2360 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2361 assert(cfd != nullptr);
2362 std::map<std::string, std::shared_ptr<FSDirectory>> dummy_created_dirs;
2363 s = cfd->AddDirectories(&dummy_created_dirs);
2364 }
2365 if (s.ok()) {
2366 single_column_family_mode_ = false;
2367 auto* cfd =
2368 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2369 assert(cfd != nullptr);
2370 InstallSuperVersionAndScheduleWork(cfd, &sv_context,
2371 *cfd->GetLatestMutableCFOptions());
2372
2373 if (!cfd->mem()->IsSnapshotSupported()) {
2374 is_snapshot_supported_ = false;
2375 }
2376
2377 cfd->set_initialized();
2378
2379 *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
2380 ROCKS_LOG_INFO(immutable_db_options_.info_log,
2381 "Created column family [%s] (ID %u)",
2382 column_family_name.c_str(), (unsigned)cfd->GetID());
2383 } else {
2384 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2385 "Creating column family [%s] FAILED -- %s",
2386 column_family_name.c_str(), s.ToString().c_str());
2387 }
2388 } // InstrumentedMutexLock l(&mutex_)
2389
2390 sv_context.Clean();
2391 // this is outside the mutex
2392 if (s.ok()) {
2393 NewThreadStatusCfInfo(
2394 reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
2395 }
2396 return s;
2397 }
2398
2399 Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
2400 assert(column_family != nullptr);
2401 Status s = DropColumnFamilyImpl(column_family);
2402 if (s.ok()) {
2403 s = WriteOptionsFile(true /*need_mutex_lock*/,
2404 true /*need_enter_write_thread*/);
2405 }
2406 return s;
2407 }
2408
2409 Status DBImpl::DropColumnFamilies(
2410 const std::vector<ColumnFamilyHandle*>& column_families) {
2411 Status s;
2412 bool success_once = false;
2413 for (auto* handle : column_families) {
2414 s = DropColumnFamilyImpl(handle);
2415 if (!s.ok()) {
2416 break;
2417 }
2418 success_once = true;
2419 }
2420 if (success_once) {
2421 Status persist_options_status = WriteOptionsFile(
2422 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2423 if (s.ok() && !persist_options_status.ok()) {
2424 s = persist_options_status;
2425 }
2426 }
2427 return s;
2428 }
2429
2430 Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
2431 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2432 auto cfd = cfh->cfd();
2433 if (cfd->GetID() == 0) {
2434 return Status::InvalidArgument("Can't drop default column family");
2435 }
2436
2437 bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();
2438
2439 VersionEdit edit;
2440 edit.DropColumnFamily();
2441 edit.SetColumnFamily(cfd->GetID());
2442
2443 Status s;
2444 {
2445 InstrumentedMutexLock l(&mutex_);
2446 if (cfd->IsDropped()) {
2447 s = Status::InvalidArgument("Column family already dropped!\n");
2448 }
2449 if (s.ok()) {
2450 // we drop column family from a single write thread
2451 WriteThread::Writer w;
2452 write_thread_.EnterUnbatched(&w, &mutex_);
2453 s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
2454 &mutex_);
2455 write_thread_.ExitUnbatched(&w);
2456 }
2457 if (s.ok()) {
2458 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2459 max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
2460 mutable_cf_options->max_write_buffer_number;
2461 }
2462
2463 if (!cf_support_snapshot) {
2464 // Dropped Column Family doesn't support snapshot. Need to recalculate
2465 // is_snapshot_supported_.
2466 bool new_is_snapshot_supported = true;
2467 for (auto c : *versions_->GetColumnFamilySet()) {
2468 if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
2469 new_is_snapshot_supported = false;
2470 break;
2471 }
2472 }
2473 is_snapshot_supported_ = new_is_snapshot_supported;
2474 }
2475 bg_cv_.SignalAll();
2476 }
2477
2478 if (s.ok()) {
2479 // Note that here we erase the associated cf_info of the to-be-dropped
2480 // cfd before its ref-count goes to zero to avoid having to erase cf_info
2481 // later inside db_mutex.
2482 EraseThreadStatusCfInfo(cfd);
2483 assert(cfd->IsDropped());
2484 ROCKS_LOG_INFO(immutable_db_options_.info_log,
2485 "Dropped column family with id %u\n", cfd->GetID());
2486 } else {
2487 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2488 "Dropping column family with id %u FAILED -- %s\n",
2489 cfd->GetID(), s.ToString().c_str());
2490 }
2491
2492 return s;
2493 }
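//
// Usage sketch for dropping a column family (illustrative; assumes an
// open DB* `db` and a non-default handle `cf`). Dropping only marks the
// family as dropped; the handle must still be destroyed separately:
//
//   rocksdb::Status s = db->DropColumnFamily(cf);
//   if (s.ok()) {
//     s = db->DestroyColumnFamilyHandle(cf);
//   }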
2494
2495 bool DBImpl::KeyMayExist(const ReadOptions& read_options,
2496 ColumnFamilyHandle* column_family, const Slice& key,
2497 std::string* value, std::string* timestamp,
2498 bool* value_found) {
2499 assert(value != nullptr);
2500 if (value_found != nullptr) {
2501 // falsify later if key-may-exist but can't fetch value
2502 *value_found = true;
2503 }
2504 ReadOptions roptions = read_options;
2505 roptions.read_tier = kBlockCacheTier; // read from block cache only
2506 PinnableSlice pinnable_val;
2507 GetImplOptions get_impl_options;
2508 get_impl_options.column_family = column_family;
2509 get_impl_options.value = &pinnable_val;
2510 get_impl_options.value_found = value_found;
2511 get_impl_options.timestamp = timestamp;
2512 auto s = GetImpl(roptions, key, get_impl_options);
2513 value->assign(pinnable_val.data(), pinnable_val.size());
2514
2515 // If block_cache is enabled and the index block of the table is not
2516 // present in block_cache, the return value will be Status::Incomplete.
2517 // In this case, the key may still exist in the table.
2518 return s.ok() || s.IsIncomplete();
2519 }
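//
// Usage sketch for KeyMayExist() (illustrative; assumes an open DB* `db`).
// A false return means the key definitely does not exist; true may be a
// false positive, and `value_found` reports whether the value was actually
// fetched from the block cache:
//
//   std::string value;
//   bool value_found = false;
//   bool may_exist = db->KeyMayExist(rocksdb::ReadOptions(),
//                                    db->DefaultColumnFamily(), "key1",
//                                    &value, &value_found);
//   if (may_exist && !value_found) {
//     // Fall back to a full Get() if the value is actually needed.
//   }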
2520
2521 Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
2522 ColumnFamilyHandle* column_family) {
2523 if (read_options.managed) {
2524 return NewErrorIterator(
2525 Status::NotSupported("Managed iterator is not supported anymore."));
2526 }
2527 Iterator* result = nullptr;
2528 if (read_options.read_tier == kPersistedTier) {
2529 return NewErrorIterator(Status::NotSupported(
2530 "ReadTier::kPersistedData is not yet supported in iterators."));
2531 }
2532 // if iterator wants internal keys, we can only proceed if
2533 // we can guarantee the deletes haven't been processed yet
2534 if (immutable_db_options_.preserve_deletes &&
2535 read_options.iter_start_seqnum > 0 &&
2536 read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
2537 return NewErrorIterator(Status::InvalidArgument(
2538 "Iterator requested internal keys which are too old and are not"
2539 " guaranteed to be preserved, try larger iter_start_seqnum opt."));
2540 }
2541 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2542 auto cfd = cfh->cfd();
2543 ReadCallback* read_callback = nullptr; // No read callback provided.
2544 if (read_options.tailing) {
2545 #ifdef ROCKSDB_LITE
2546 // not supported in lite version
2547 result = nullptr;
2548
2549 #else
2550 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2551 auto iter = new ForwardIterator(this, read_options, cfd, sv);
2552 result = NewDBIterator(
2553 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2554 cfd->user_comparator(), iter, kMaxSequenceNumber,
2555 sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
2556 this, cfd);
2557 #endif
2558 } else {
2559 // Note: no need to consider the special case of
2560 // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
2561 // WritePreparedTxnDB
2562 auto snapshot = read_options.snapshot != nullptr
2563 ? read_options.snapshot->GetSequenceNumber()
2564 : versions_->LastSequence();
2565 result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
2566 }
2567 return result;
2568 }
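//
// Usage sketch for NewIterator() (illustrative; assumes an open DB* `db`).
// The returned iterator pins a SuperVersion until it is deleted:
//
//   std::unique_ptr<rocksdb::Iterator> it(
//       db->NewIterator(rocksdb::ReadOptions()));
//   for (it->Seek("prefix"); it->Valid(); it->Next()) {
//     // it->key() / it->value() are valid until the iterator moves.
//   }
//   assert(it->status().ok());  // always check for iteration errors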
2569
2570 ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
2571 ColumnFamilyData* cfd,
2572 SequenceNumber snapshot,
2573 ReadCallback* read_callback,
2574 bool allow_blob,
2575 bool allow_refresh) {
2576 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2577
2578 // Try to generate a DB iterator tree in continuous memory area to be
2579 // cache friendly. Here is an example of result:
2580 // +-------------------------------+
2581 // | |
2582 // | ArenaWrappedDBIter |
2583 // | + |
2584 // | +---> Inner Iterator ------------+
2585 // | | | |
2586 // | | +-- -- -- -- -- -- -- --+ |
2587 // | +--- | Arena | |
2588 // | | | |
2589 // | Allocated Memory: | |
2590 // | | +-------------------+ |
2591 // | | | DBIter | <---+
2592 // | | + |
2593 // | | | +-> iter_ ------------+
2594 // | | | | |
2595 // | | +-------------------+ |
2596 // | | | MergingIterator | <---+
2597 // | | + |
2598 // | | | +->child iter1 ------------+
2599 // | | | | | |
2600 // | | +->child iter2 ----------+ |
2601 // | | | | | | |
2602 // | | | +->child iter3 --------+ | |
2603 // | | | | | |
2604 // | | +-------------------+ | | |
2605 // | | | Iterator1 | <--------+
2606 // | | +-------------------+ | |
2607 // | | | Iterator2 | <------+
2608 // | | +-------------------+ |
2609 // | | | Iterator3 | <----+
2610 // | | +-------------------+
2611 // | | |
2612 // +-------+-----------------------+
2613 //
2614 // ArenaWrappedDBIter inlines an arena area where all the iterators in
2615 // the iterator tree are allocated in the order of being accessed when
2616 // querying.
2617 // Laying out the iterators in the order of being accessed makes it more
2618 // likely that any iterator pointer is close to the iterator it points to so
2619 // that they are likely to be in the same cache line and/or page.
2620 ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
2621 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
2622 sv->mutable_cf_options.max_sequential_skip_in_iterations,
2623 sv->version_number, read_callback, this, cfd, allow_blob,
2624 read_options.snapshot != nullptr ? false : allow_refresh);
2625
2626 InternalIterator* internal_iter =
2627 NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
2628 db_iter->GetRangeDelAggregator(), snapshot);
2629 db_iter->SetIterUnderDBIter(internal_iter);
2630
2631 return db_iter;
2632 }
2633
2634 Status DBImpl::NewIterators(
2635 const ReadOptions& read_options,
2636 const std::vector<ColumnFamilyHandle*>& column_families,
2637 std::vector<Iterator*>* iterators) {
2638 if (read_options.managed) {
2639 return Status::NotSupported("Managed iterator is not supported anymore.");
2640 }
2641 if (read_options.read_tier == kPersistedTier) {
2642 return Status::NotSupported(
2643 "ReadTier::kPersistedData is not yet supported in iterators.");
2644 }
2645 ReadCallback* read_callback = nullptr; // No read callback provided.
2646 iterators->clear();
2647 iterators->reserve(column_families.size());
2648 if (read_options.tailing) {
2649 #ifdef ROCKSDB_LITE
2650 return Status::InvalidArgument(
2651 "Tailing iterator not supported in RocksDB lite");
2652 #else
2653 for (auto cfh : column_families) {
2654 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
2655 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2656 auto iter = new ForwardIterator(this, read_options, cfd, sv);
2657 iterators->push_back(NewDBIterator(
2658 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2659 cfd->user_comparator(), iter, kMaxSequenceNumber,
2660 sv->mutable_cf_options.max_sequential_skip_in_iterations,
2661 read_callback, this, cfd));
2662 }
2663 #endif
2664 } else {
2665 // Note: no need to consider the special case of
2666 // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
2667 // WritePreparedTxnDB
2668 auto snapshot = read_options.snapshot != nullptr
2669 ? read_options.snapshot->GetSequenceNumber()
2670 : versions_->LastSequence();
2671 for (size_t i = 0; i < column_families.size(); ++i) {
2672 auto* cfd =
2673 reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i])->cfd();
2674 iterators->push_back(
2675 NewIteratorImpl(read_options, cfd, snapshot, read_callback));
2676 }
2677 }
2678
2679 return Status::OK();
2680 }
2681
2682 const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
2683
2684 #ifndef ROCKSDB_LITE
2685 const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
2686 return GetSnapshotImpl(true);
2687 }
2688 #endif // ROCKSDB_LITE
2689
2690 SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
2691 bool lock) {
2692 int64_t unix_time = 0;
2693 env_->GetCurrentTime(&unix_time); // Ignore error
2694 SnapshotImpl* s = new SnapshotImpl;
2695
2696 if (lock) {
2697 mutex_.Lock();
2698 }
2699 // Returns nullptr if the underlying memtable does not support snapshots.
2700 if (!is_snapshot_supported_) {
2701 if (lock) {
2702 mutex_.Unlock();
2703 }
2704 delete s;
2705 return nullptr;
2706 }
2707 auto snapshot_seq = last_seq_same_as_publish_seq_
2708 ? versions_->LastSequence()
2709 : versions_->LastPublishedSequence();
2710 SnapshotImpl* snapshot =
2711 snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
2712 if (lock) {
2713 mutex_.Unlock();
2714 }
2715 return snapshot;
2716 }
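//
// Usage sketch for snapshots (illustrative; assumes an open DB* `db`).
// GetSnapshot() may return nullptr when a memtable implementation does
// not support snapshots (see is_snapshot_supported_ above):
//
//   const rocksdb::Snapshot* snap = db->GetSnapshot();
//   if (snap != nullptr) {
//     rocksdb::ReadOptions ro;
//     ro.snapshot = snap;  // reads see the DB as of the snapshot
//     std::string value;
//     db->Get(ro, "key1", &value);
//     db->ReleaseSnapshot(snap);  // required so old data can be compacted
//   }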
2717
2718 namespace {
2719 typedef autovector<ColumnFamilyData*, 2> CfdList;
2720 bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
2721 for (const ColumnFamilyData* t : list) {
2722 if (t == cfd) {
2723 return true;
2724 }
2725 }
2726 return false;
2727 }
2728 } // namespace
2729
2730 void DBImpl::ReleaseSnapshot(const Snapshot* s) {
2731 const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
2732 {
2733 InstrumentedMutexLock l(&mutex_);
2734 snapshots_.Delete(casted_s);
2735 uint64_t oldest_snapshot;
2736 if (snapshots_.empty()) {
2737 oldest_snapshot = last_seq_same_as_publish_seq_
2738 ? versions_->LastSequence()
2739 : versions_->LastPublishedSequence();
2740 } else {
2741 oldest_snapshot = snapshots_.oldest()->number_;
2742 }
2743 // Avoid going through every column family by checking a global threshold
2744 // first.
2745 if (oldest_snapshot > bottommost_files_mark_threshold_) {
2746 CfdList cf_scheduled;
2747 for (auto* cfd : *versions_->GetColumnFamilySet()) {
2748 cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
2749 if (!cfd->current()
2750 ->storage_info()
2751 ->BottommostFilesMarkedForCompaction()
2752 .empty()) {
2753 SchedulePendingCompaction(cfd);
2754 MaybeScheduleFlushOrCompaction();
2755 cf_scheduled.push_back(cfd);
2756 }
2757 }
2758
2759 // Calculate a new threshold, skipping those CFs where compactions are
2760 // scheduled. We do not do the same pass as the previous loop because
2761 // mutex might be unlocked during the loop, making the result inaccurate.
2762 SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
2763 for (auto* cfd : *versions_->GetColumnFamilySet()) {
2764 if (CfdListContains(cf_scheduled, cfd)) {
2765 continue;
2766 }
2767 new_bottommost_files_mark_threshold = std::min(
2768 new_bottommost_files_mark_threshold,
2769 cfd->current()->storage_info()->bottommost_files_mark_threshold());
2770 }
2771 bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
2772 }
2773 }
2774 delete casted_s;
2775 }
2776
2777 #ifndef ROCKSDB_LITE
2778 Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
2779 TablePropertiesCollection* props) {
2780 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2781 auto cfd = cfh->cfd();
2782
2783 // Increment the ref count
2784 mutex_.Lock();
2785 auto version = cfd->current();
2786 version->Ref();
2787 mutex_.Unlock();
2788
2789 auto s = version->GetPropertiesOfAllTables(props);
2790
2791 // Decrement the ref count
2792 mutex_.Lock();
2793 version->Unref();
2794 mutex_.Unlock();
2795
2796 return s;
2797 }
2798
2799 Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
2800 const Range* range, std::size_t n,
2801 TablePropertiesCollection* props) {
2802 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2803 auto cfd = cfh->cfd();
2804
2805 // Increment the ref count
2806 mutex_.Lock();
2807 auto version = cfd->current();
2808 version->Ref();
2809 mutex_.Unlock();
2810
2811 auto s = version->GetPropertiesOfTablesInRange(range, n, props);
2812
2813 // Decrement the ref count
2814 mutex_.Lock();
2815 version->Unref();
2816 mutex_.Unlock();
2817
2818 return s;
2819 }
2820
2821 #endif // ROCKSDB_LITE
2822
2823 const std::string& DBImpl::GetName() const { return dbname_; }
2824
2825 Env* DBImpl::GetEnv() const { return env_; }
2826
2827 FileSystem* DB::GetFileSystem() const {
2828 static LegacyFileSystemWrapper fs_wrap(GetEnv());
2829 return &fs_wrap;
2830 }
2831
2832 FileSystem* DBImpl::GetFileSystem() const {
2833 return immutable_db_options_.fs.get();
2834 }
2835
2836 Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
2837 InstrumentedMutexLock l(&mutex_);
2838 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2839 return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
2840 cfh->cfd()->GetLatestCFOptions());
2841 }
2842
2843 DBOptions DBImpl::GetDBOptions() const {
2844 InstrumentedMutexLock l(&mutex_);
2845 return BuildDBOptions(immutable_db_options_, mutable_db_options_);
2846 }
2847
2848 bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
2849 const Slice& property, std::string* value) {
2850 const DBPropertyInfo* property_info = GetPropertyInfo(property);
2851 value->clear();
2852 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2853 if (property_info == nullptr) {
2854 return false;
2855 } else if (property_info->handle_int) {
2856 uint64_t int_value;
2857 bool ret_value =
2858 GetIntPropertyInternal(cfd, *property_info, false, &int_value);
2859 if (ret_value) {
2860 *value = ToString(int_value);
2861 }
2862 return ret_value;
2863 } else if (property_info->handle_string) {
2864 InstrumentedMutexLock l(&mutex_);
2865 return cfd->internal_stats()->GetStringProperty(*property_info, property,
2866 value);
2867 } else if (property_info->handle_string_dbimpl) {
2868 std::string tmp_value;
2869 bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
2870 if (ret_value) {
2871 *value = tmp_value;
2872 }
2873 return ret_value;
2874 }
2875 // Shouldn't reach here since exactly one of the handle_* members should
2876 // be non-nullptr.
2877 assert(false);
2878 return false;
2879 }
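//
// Usage sketch for the property getters (illustrative; assumes an open
// DB* `db`; property names are defined in include/rocksdb/db.h):
//
//   std::string stats;
//   if (db->GetProperty("rocksdb.stats", &stats)) {
//     // stats holds a human-readable statistics dump
//   }
//   uint64_t num_keys = 0;
//   if (db->GetIntProperty("rocksdb.estimate-num-keys", &num_keys)) {
//     // num_keys is an estimate, not an exact count
//   }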
2880
2881 bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
2882 const Slice& property,
2883 std::map<std::string, std::string>* value) {
2884 const DBPropertyInfo* property_info = GetPropertyInfo(property);
2885 value->clear();
2886 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2887 if (property_info == nullptr) {
2888 return false;
2889 } else if (property_info->handle_map) {
2890 InstrumentedMutexLock l(&mutex_);
2891 return cfd->internal_stats()->GetMapProperty(*property_info, property,
2892 value);
2893 }
2894 // If we reach this point it means that handle_map is not provided for the
2895 // requested property
2896 return false;
2897 }
2898
2899 bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
2900 const Slice& property, uint64_t* value) {
2901 const DBPropertyInfo* property_info = GetPropertyInfo(property);
2902 if (property_info == nullptr || property_info->handle_int == nullptr) {
2903 return false;
2904 }
2905 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2906 return GetIntPropertyInternal(cfd, *property_info, false, value);
2907 }
2908
2909 bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
2910 const DBPropertyInfo& property_info,
2911 bool is_locked, uint64_t* value) {
2912 assert(property_info.handle_int != nullptr);
2913 if (!property_info.need_out_of_mutex) {
2914 if (is_locked) {
2915 mutex_.AssertHeld();
2916 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
2917 } else {
2918 InstrumentedMutexLock l(&mutex_);
2919 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
2920 }
2921 } else {
2922 SuperVersion* sv = nullptr;
2923 if (!is_locked) {
2924 sv = GetAndRefSuperVersion(cfd);
2925 } else {
2926 sv = cfd->GetSuperVersion();
2927 }
2928
2929 bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
2930 property_info, sv->current, value);
2931
2932 if (!is_locked) {
2933 ReturnAndCleanupSuperVersion(cfd, sv);
2934 }
2935
2936 return ret;
2937 }
2938 }
2939
2940 bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
2941 assert(value != nullptr);
2942 Statistics* statistics = immutable_db_options_.statistics.get();
2943 if (!statistics) {
2944 return false;
2945 }
2946 *value = statistics->ToString();
2947 return true;
2948 }
2949
2950 #ifndef ROCKSDB_LITE
2951 Status DBImpl::ResetStats() {
2952 InstrumentedMutexLock l(&mutex_);
2953 for (auto* cfd : *versions_->GetColumnFamilySet()) {
2954 if (cfd->initialized()) {
2955 cfd->internal_stats()->Clear();
2956 }
2957 }
2958 return Status::OK();
2959 }
2960 #endif // ROCKSDB_LITE
2961
2962 bool DBImpl::GetAggregatedIntProperty(const Slice& property,
2963 uint64_t* aggregated_value) {
2964 const DBPropertyInfo* property_info = GetPropertyInfo(property);
2965 if (property_info == nullptr || property_info->handle_int == nullptr) {
2966 return false;
2967 }
2968
2969 uint64_t sum = 0;
2970 {
2971 // Needs mutex to protect the list of column families.
2972 InstrumentedMutexLock l(&mutex_);
2973 uint64_t value;
2974 for (auto* cfd : *versions_->GetColumnFamilySet()) {
2975 if (!cfd->initialized()) {
2976 continue;
2977 }
2978 if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
2979 sum += value;
2980 } else {
2981 return false;
2982 }
2983 }
2984 }
2985 *aggregated_value = sum;
2986 return true;
2987 }
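// Usage sketch: the aggregated variant sums a handle_int property over all
// column families, e.g. total memtable memory (assumes an open DB* `db`):
//
//   uint64_t all_mem = 0;
//   if (db->GetAggregatedIntProperty(DB::Properties::kSizeAllMemTables,
//                                    &all_mem)) {
//     // all_mem covers every column family's memtables
//   }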
2988
2989 SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
2990 // TODO(ljin): consider using GetReferencedSuperVersion() directly
2991 return cfd->GetThreadLocalSuperVersion(this);
2992 }
2993
2994 // REQUIRED: this function should only be called on the write thread or if the
2995 // mutex is held.
2996 SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
2997 auto column_family_set = versions_->GetColumnFamilySet();
2998 auto cfd = column_family_set->GetColumnFamily(column_family_id);
2999 if (!cfd) {
3000 return nullptr;
3001 }
3002
3003 return GetAndRefSuperVersion(cfd);
3004 }
3005
3006 void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
3007 // Release SuperVersion
3008 if (sv->Unref()) {
3009 bool defer_purge =
3010 immutable_db_options().avoid_unnecessary_blocking_io;
3011 {
3012 InstrumentedMutexLock l(&mutex_);
3013 sv->Cleanup();
3014 if (defer_purge) {
3015 AddSuperVersionsToFreeQueue(sv);
3016 SchedulePurge();
3017 }
3018 }
3019 if (!defer_purge) {
3020 delete sv;
3021 }
3022 RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
3023 }
3024 RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
3025 }
3026
3027 void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
3028 SuperVersion* sv) {
3029 if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
3030 CleanupSuperVersion(sv);
3031 }
3032 }
3033
3034 // REQUIRED: this function should only be called on the write thread.
3035 void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
3036 SuperVersion* sv) {
3037 auto column_family_set = versions_->GetColumnFamilySet();
3038 auto cfd = column_family_set->GetColumnFamily(column_family_id);
3039
3040 // If SuperVersion is held, and we successfully fetched a cfd using
3041 // GetAndRefSuperVersion(), it must still exist.
3042 assert(cfd != nullptr);
3043 ReturnAndCleanupSuperVersion(cfd, sv);
3044 }
3045
3046 // REQUIRED: this function should only be called on the write thread or if the
3047 // mutex is held.
3048 ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
3049 ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
3050
3051 if (!cf_memtables->Seek(column_family_id)) {
3052 return nullptr;
3053 }
3054
3055 return cf_memtables->GetColumnFamilyHandle();
3056 }
3057
3058 // REQUIRED: mutex is NOT held.
3059 std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
3060 uint32_t column_family_id) {
3061 InstrumentedMutexLock l(&mutex_);
3062
3063 auto* cfd =
3064 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
3065 if (cfd == nullptr) {
3066 return nullptr;
3067 }
3068
3069 return std::unique_ptr<ColumnFamilyHandleImpl>(
3070 new ColumnFamilyHandleImpl(cfd, this, &mutex_));
3071 }
3072
3073 void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
3074 const Range& range,
3075 uint64_t* const count,
3076 uint64_t* const size) {
3077 ColumnFamilyHandleImpl* cfh =
3078 reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3079 ColumnFamilyData* cfd = cfh->cfd();
3080 SuperVersion* sv = GetAndRefSuperVersion(cfd);
3081
3082 // Convert user_key into a corresponding internal key.
3083 InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
3084 InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
3085 MemTable::MemTableStats memStats =
3086 sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
3087 MemTable::MemTableStats immStats =
3088 sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
3089 *count = memStats.count + immStats.count;
3090 *size = memStats.size + immStats.size;
3091
3092 ReturnAndCleanupSuperVersion(cfd, sv);
3093 }
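// Usage sketch (assumes an open DB* `db`); the answer covers only the active
// and immutable memtables, not SST files:
//
//   Range r("key000", "key999");
//   uint64_t count = 0, size = 0;
//   db->GetApproximateMemTableStats(db->DefaultColumnFamily(), r,
//                                   &count, &size);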
3094
3095 Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
3096 ColumnFamilyHandle* column_family,
3097 const Range* range, int n, uint64_t* sizes) {
3098 if (!options.include_memtabtles && !options.include_files) {
3099 return Status::InvalidArgument("Invalid options");
3100 }
3101
3102 Version* v;
3103 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3104 auto cfd = cfh->cfd();
3105 SuperVersion* sv = GetAndRefSuperVersion(cfd);
3106 v = sv->current;
3107
3108 for (int i = 0; i < n; i++) {
3109 // Convert user_key into a corresponding internal key.
3110 InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
3111 InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
3112 sizes[i] = 0;
3113 if (options.include_files) {
3114 sizes[i] += versions_->ApproximateSize(
3115 options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
3116 /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
3117 }
3118 if (options.include_memtabtles) {
3119 sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
3120 sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
3121 }
3122 }
3123
3124 ReturnAndCleanupSuperVersion(cfd, sv);
3125 return Status::OK();
3126 }
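// Usage sketch (assumes an open DB* `db`). Note the option field is spelled
// include_memtabtles in this version's SizeApproximationOptions, exactly as
// used above:
//
//   SizeApproximationOptions sao;
//   sao.include_files = true;
//   sao.include_memtabtles = true;
//   Range r("a", "z");
//   uint64_t size = 0;
//   Status s = db->GetApproximateSizes(sao, db->DefaultColumnFamily(),
//                                      &r, 1, &size);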
3127
3128 std::list<uint64_t>::iterator
3129 DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
3130 // We need to remember the iterator of our insert, because after the
3131 // background job is done, we need to remove that element from
3132 // pending_outputs_.
3133 pending_outputs_.push_back(versions_->current_next_file_number());
3134 auto pending_outputs_inserted_elem = pending_outputs_.end();
3135 --pending_outputs_inserted_elem;
3136 return pending_outputs_inserted_elem;
3137 }
3138
3139 void DBImpl::ReleaseFileNumberFromPendingOutputs(
3140 std::unique_ptr<std::list<uint64_t>::iterator>& v) {
3141 if (v.get() != nullptr) {
3142 pending_outputs_.erase(*v.get());
3143 v.reset();
3144 }
3145 }
3146
3147 #ifndef ROCKSDB_LITE
3148 Status DBImpl::GetUpdatesSince(
3149 SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
3150 const TransactionLogIterator::ReadOptions& read_options) {
3151 RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
3152 if (seq > versions_->LastSequence()) {
3153 return Status::NotFound("Requested sequence not yet written in the db");
3154 }
3155 return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
3156 }
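// Usage sketch for tailing the WAL from a known sequence number (assumes an
// open DB* `db`, a SequenceNumber start_seq, and WAL retention configured):
//
//   std::unique_ptr<TransactionLogIterator> it;
//   Status s = db->GetUpdatesSince(start_seq, &it);
//   while (s.ok() && it->Valid()) {
//     BatchResult res = it->GetBatch();  // res.sequence, res.writeBatchPtr
//     it->Next();
//   }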
3157
3158 Status DBImpl::DeleteFile(std::string name) {
3159 uint64_t number;
3160 FileType type;
3161 WalFileType log_type;
3162 if (!ParseFileName(name, &number, &type, &log_type) ||
3163 (type != kTableFile && type != kLogFile)) {
3164 ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
3165 name.c_str());
3166 return Status::InvalidArgument("Invalid file name");
3167 }
3168
3169 Status status;
3170 if (type == kLogFile) {
3171 // Only allow deleting archived log files
3172 if (log_type != kArchivedLogFile) {
3173 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3174 "DeleteFile %s failed - not archived log.\n",
3175 name.c_str());
3176 return Status::NotSupported("Delete only supported for archived logs");
3177 }
3178 status = wal_manager_.DeleteFile(name, number);
3179 if (!status.ok()) {
3180 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3181 "DeleteFile %s failed -- %s.\n", name.c_str(),
3182 status.ToString().c_str());
3183 }
3184 return status;
3185 }
3186
3187 int level;
3188 FileMetaData* metadata;
3189 ColumnFamilyData* cfd;
3190 VersionEdit edit;
3191 JobContext job_context(next_job_id_.fetch_add(1), true);
3192 {
3193 InstrumentedMutexLock l(&mutex_);
3194 status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
3195 if (!status.ok()) {
3196 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3197 "DeleteFile %s failed. File not found\n", name.c_str());
3198 job_context.Clean();
3199 return Status::InvalidArgument("File not found");
3200 }
3201 assert(level < cfd->NumberLevels());
3202
3203     // If the file is being compacted, there is no need to delete it.
3204 if (metadata->being_compacted) {
3205 ROCKS_LOG_INFO(immutable_db_options_.info_log,
3206 "DeleteFile %s Skipped. File about to be compacted\n",
3207 name.c_str());
3208 job_context.Clean();
3209 return Status::OK();
3210 }
3211
3212 // Only the files in the last level can be deleted externally.
3213 // This is to make sure that any deletion tombstones are not
3214 // lost. Check that the level passed is the last level.
3215     auto* vstorage = cfd->current()->storage_info();
3216 for (int i = level + 1; i < cfd->NumberLevels(); i++) {
3217       if (vstorage->NumLevelFiles(i) != 0) {
3218 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3219 "DeleteFile %s FAILED. File not in last level\n",
3220 name.c_str());
3221 job_context.Clean();
3222 return Status::InvalidArgument("File not in last level");
3223 }
3224 }
3225 // if level == 0, it has to be the oldest file
3226 if (level == 0 &&
3227         vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
3228 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3229 "DeleteFile %s failed ---"
3230 " target file in level 0 must be the oldest.",
3231 name.c_str());
3232 job_context.Clean();
3233 return Status::InvalidArgument("File in level 0, but not oldest");
3234 }
3235 edit.SetColumnFamily(cfd->GetID());
3236 edit.DeleteFile(level, number);
3237 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3238 &edit, &mutex_, directories_.GetDbDir());
3239 if (status.ok()) {
3240 InstallSuperVersionAndScheduleWork(cfd,
3241 &job_context.superversion_contexts[0],
3242 *cfd->GetLatestMutableCFOptions());
3243 }
3244 FindObsoleteFiles(&job_context, false);
3245 } // lock released here
3246
3247 LogFlush(immutable_db_options_.info_log);
3248 // remove files outside the db-lock
3249 if (job_context.HaveSomethingToDelete()) {
3250 // Call PurgeObsoleteFiles() without holding mutex.
3251 PurgeObsoleteFiles(job_context);
3252 }
3253 job_context.Clean();
3254 return status;
3255 }
3256
3257 Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
3258 const RangePtr* ranges, size_t n,
3259 bool include_end) {
3260 Status status;
3261 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3262 ColumnFamilyData* cfd = cfh->cfd();
3263 VersionEdit edit;
3264 std::set<FileMetaData*> deleted_files;
3265 JobContext job_context(next_job_id_.fetch_add(1), true);
3266 {
3267 InstrumentedMutexLock l(&mutex_);
3268 Version* input_version = cfd->current();
3269
3270 auto* vstorage = input_version->storage_info();
3271 for (size_t r = 0; r < n; r++) {
3272 auto begin = ranges[r].start, end = ranges[r].limit;
3273 for (int i = 1; i < cfd->NumberLevels(); i++) {
3274 if (vstorage->LevelFiles(i).empty() ||
3275 !vstorage->OverlapInLevel(i, begin, end)) {
3276 continue;
3277 }
3278 std::vector<FileMetaData*> level_files;
3279 InternalKey begin_storage, end_storage, *begin_key, *end_key;
3280 if (begin == nullptr) {
3281 begin_key = nullptr;
3282 } else {
3283 begin_storage.SetMinPossibleForUserKey(*begin);
3284 begin_key = &begin_storage;
3285 }
3286 if (end == nullptr) {
3287 end_key = nullptr;
3288 } else {
3289 end_storage.SetMaxPossibleForUserKey(*end);
3290 end_key = &end_storage;
3291 }
3292
3293 vstorage->GetCleanInputsWithinInterval(
3294 i, begin_key, end_key, &level_files, -1 /* hint_index */,
3295 nullptr /* file_index */);
3296 FileMetaData* level_file;
3297 for (uint32_t j = 0; j < level_files.size(); j++) {
3298 level_file = level_files[j];
3299 if (level_file->being_compacted) {
3300 continue;
3301 }
3302 if (deleted_files.find(level_file) != deleted_files.end()) {
3303 continue;
3304 }
3305 if (!include_end && end != nullptr &&
3306 cfd->user_comparator()->Compare(level_file->largest.user_key(),
3307 *end) == 0) {
3308 continue;
3309 }
3310 edit.SetColumnFamily(cfd->GetID());
3311 edit.DeleteFile(i, level_file->fd.GetNumber());
3312 deleted_files.insert(level_file);
3313 level_file->being_compacted = true;
3314 }
3315 }
3316 }
3317 if (edit.GetDeletedFiles().empty()) {
3318 job_context.Clean();
3319 return Status::OK();
3320 }
3321 input_version->Ref();
3322 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3323 &edit, &mutex_, directories_.GetDbDir());
3324 if (status.ok()) {
3325 InstallSuperVersionAndScheduleWork(cfd,
3326 &job_context.superversion_contexts[0],
3327 *cfd->GetLatestMutableCFOptions());
3328 }
3329 for (auto* deleted_file : deleted_files) {
3330 deleted_file->being_compacted = false;
3331 }
3332 input_version->Unref();
3333 FindObsoleteFiles(&job_context, false);
3334 } // lock released here
3335
3336 LogFlush(immutable_db_options_.info_log);
3337 // remove files outside the db-lock
3338 if (job_context.HaveSomethingToDelete()) {
3339 // Call PurgeObsoleteFiles() without holding mutex.
3340 PurgeObsoleteFiles(job_context);
3341 }
3342 job_context.Clean();
3343 return status;
3344 }
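// Usage sketch via the free function declared in rocksdb/convenience.h, which
// forwards to this method (assumes an open DB* `db`; keys are placeholders):
//
//   Slice begin("user.000"), end("user.500");
//   RangePtr range(&begin, &end);
//   Status s = DeleteFilesInRanges(db, db->DefaultColumnFamily(),
//                                  &range, 1 /* n */, true /* include_end */);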
3345
3346 void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
3347 InstrumentedMutexLock l(&mutex_);
3348 versions_->GetLiveFilesMetaData(metadata);
3349 }
3350
3351 void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
3352 ColumnFamilyMetaData* cf_meta) {
3353 assert(column_family);
3354 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
3355 auto* sv = GetAndRefSuperVersion(cfd);
3356 {
3357     // Without the mutex, Version::GetColumnFamilyMetaData will have a data
3358     // race with Compaction::MarkFilesBeingCompacted. One solution is to use
3359     // the mutex, but this may cause a regression. An alternative is to make
3360     // FileMetaData::being_compacted atomic, but that would make FileMetaData
3361     // non-copyable. Another option is to separate these variables from the
3362     // original FileMetaData struct, but that requires re-organizing the data
3363     // structures. For now, we take the easy approach. If
3364     // DB::GetColumnFamilyMetaData is not called frequently, the regression
3365     // should not be big. We still need to keep an eye on it.
3366 InstrumentedMutexLock l(&mutex_);
3367 sv->current->GetColumnFamilyMetaData(cf_meta);
3368 }
3369 ReturnAndCleanupSuperVersion(cfd, sv);
3370 }
3371
3372 #endif // ROCKSDB_LITE
3373
3374 Status DBImpl::CheckConsistency() {
3375 mutex_.AssertHeld();
3376 std::vector<LiveFileMetaData> metadata;
3377 versions_->GetLiveFilesMetaData(&metadata);
3378 TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");
3379
3380 std::string corruption_messages;
3381
3382 if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
3383 // Instead of calling GetFileSize() for each expected file, call
3384 // GetChildren() for the DB directory and check that all expected files
3385 // are listed, without checking their sizes.
3386 // Since sst files might be in different directories, do it for each
3387 // directory separately.
3388 std::map<std::string, std::vector<std::string>> files_by_directory;
3389 for (const auto& md : metadata) {
3390 // md.name has a leading "/". Remove it.
3391 std::string fname = md.name;
3392 if (!fname.empty() && fname[0] == '/') {
3393 fname = fname.substr(1);
3394 }
3395 files_by_directory[md.db_path].push_back(fname);
3396 }
3397 for (const auto& dir_files : files_by_directory) {
3398 std::string directory = dir_files.first;
3399 std::vector<std::string> existing_files;
3400 Status s = env_->GetChildren(directory, &existing_files);
3401 if (!s.ok()) {
3402 corruption_messages +=
3403 "Can't list files in " + directory + ": " + s.ToString() + "\n";
3404 continue;
3405 }
3406 std::sort(existing_files.begin(), existing_files.end());
3407
3408 for (const std::string& fname : dir_files.second) {
3409 if (!std::binary_search(existing_files.begin(), existing_files.end(),
3410 fname) &&
3411 !std::binary_search(existing_files.begin(), existing_files.end(),
3412 Rocks2LevelTableFileName(fname))) {
3413 corruption_messages +=
3414 "Missing sst file " + fname + " in " + directory + "\n";
3415 }
3416 }
3417 }
3418 } else {
3419 for (const auto& md : metadata) {
3420 // md.name has a leading "/".
3421 std::string file_path = md.db_path + md.name;
3422
3423 uint64_t fsize = 0;
3424 TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
3425 Status s = env_->GetFileSize(file_path, &fsize);
3426 if (!s.ok() &&
3427 env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
3428 s = Status::OK();
3429 }
3430 if (!s.ok()) {
3431 corruption_messages +=
3432 "Can't access " + md.name + ": " + s.ToString() + "\n";
3433 } else if (fsize != md.size) {
3434 corruption_messages += "Sst file size mismatch: " + file_path +
3435 ". Size recorded in manifest " +
3436 ToString(md.size) + ", actual size " +
3437 ToString(fsize) + "\n";
3438 }
3439 }
3440 }
3441
3442 if (corruption_messages.size() == 0) {
3443 return Status::OK();
3444 } else {
3445 return Status::Corruption(corruption_messages);
3446 }
3447 }
3448
3449 Status DBImpl::GetDbIdentity(std::string& identity) const {
3450 identity.assign(db_id_);
3451 return Status::OK();
3452 }
3453
3454 Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
3455 std::string idfilename = IdentityFileName(dbname_);
3456 const FileOptions soptions;
3457
3458 Status s = ReadFileToString(fs_.get(), idfilename, identity);
3459 if (!s.ok()) {
3460 return s;
3461 }
3462
3463   // If the last character is '\n', remove it from identity
3464 if (identity->size() > 0 && identity->back() == '\n') {
3465 identity->pop_back();
3466 }
3467 return s;
3468 }
3469
3470 // Default implementation -- returns not supported status
3471 Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
3472 const std::string& /*column_family_name*/,
3473 ColumnFamilyHandle** /*handle*/) {
3474 return Status::NotSupported("");
3475 }
3476
3477 Status DB::CreateColumnFamilies(
3478 const ColumnFamilyOptions& /*cf_options*/,
3479 const std::vector<std::string>& /*column_family_names*/,
3480 std::vector<ColumnFamilyHandle*>* /*handles*/) {
3481 return Status::NotSupported("");
3482 }
3483
3484 Status DB::CreateColumnFamilies(
3485 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
3486 std::vector<ColumnFamilyHandle*>* /*handles*/) {
3487 return Status::NotSupported("");
3488 }
3489
3490 Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
3491 return Status::NotSupported("");
3492 }
3493
3494 Status DB::DropColumnFamilies(
3495 const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
3496 return Status::NotSupported("");
3497 }
3498
3499 Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
3500 delete column_family;
3501 return Status::OK();
3502 }
3503
3504 DB::~DB() {}
3505
3506 Status DBImpl::Close() {
3507 if (!closed_) {
3508 {
3509 InstrumentedMutexLock l(&mutex_);
3510 // If there is unreleased snapshot, fail the close call
3511 if (!snapshots_.empty()) {
3512 return Status::Aborted("Cannot close DB with unreleased snapshot.");
3513 }
3514 }
3515
3516 closed_ = true;
3517 return CloseImpl();
3518 }
3519 return Status::OK();
3520 }
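// Usage sketch: all snapshots must be released before Close(), or the call
// aborts (assumes an open DB* `db` holding one snapshot `snap`):
//
//   db->ReleaseSnapshot(snap);
//   Status s = db->Close();  // would be Status::Aborted() with a live snapshot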
3521
3522 Status DB::ListColumnFamilies(const DBOptions& db_options,
3523 const std::string& name,
3524 std::vector<std::string>* column_families) {
3525 const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
3526 return VersionSet::ListColumnFamilies(column_families, name, fs.get());
3527 }
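// Usage sketch: listing column families without opening the DB (the path is
// a placeholder):
//
//   std::vector<std::string> cf_names;
//   Status s = DB::ListColumnFamilies(DBOptions(), "/path/to/db", &cf_names);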
3528
3529 Snapshot::~Snapshot() {}
3530
3531 Status DestroyDB(const std::string& dbname, const Options& options,
3532 const std::vector<ColumnFamilyDescriptor>& column_families) {
3533 ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
3534 Env* env = soptions.env;
3535 std::vector<std::string> filenames;
3536 bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
3537
3538 // Reset the logger because it holds a handle to the
3539 // log file and prevents cleanup and directory removal
3540 soptions.info_log.reset();
3541 // Ignore error in case directory does not exist
3542 env->GetChildren(dbname, &filenames);
3543
3544 FileLock* lock;
3545 const std::string lockname = LockFileName(dbname);
3546 Status result = env->LockFile(lockname, &lock);
3547 if (result.ok()) {
3548 uint64_t number;
3549 FileType type;
3550 InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
3551 for (const auto& fname : filenames) {
3552 if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
3553 type != kDBLockFile) { // Lock file will be deleted at end
3554 Status del;
3555 std::string path_to_delete = dbname + "/" + fname;
3556 if (type == kMetaDatabase) {
3557 del = DestroyDB(path_to_delete, options);
3558 } else if (type == kTableFile || type == kLogFile) {
3559 del = DeleteDBFile(&soptions, path_to_delete, dbname,
3560 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3561 } else {
3562 del = env->DeleteFile(path_to_delete);
3563 }
3564 if (result.ok() && !del.ok()) {
3565 result = del;
3566 }
3567 }
3568 }
3569
3570 std::set<std::string> paths;
3571 for (const DbPath& db_path : options.db_paths) {
3572 paths.insert(db_path.path);
3573 }
3574 for (const ColumnFamilyDescriptor& cf : column_families) {
3575 for (const DbPath& cf_path : cf.options.cf_paths) {
3576 paths.insert(cf_path.path);
3577 }
3578 }
3579 for (const auto& path : paths) {
3580 if (env->GetChildren(path, &filenames).ok()) {
3581 for (const auto& fname : filenames) {
3582 if (ParseFileName(fname, &number, &type) &&
3583             type == kTableFile) {
3584 std::string table_path = path + "/" + fname;
3585 Status del = DeleteDBFile(&soptions, table_path, dbname,
3586 /*force_bg=*/false, /*force_fg=*/false);
3587 if (result.ok() && !del.ok()) {
3588 result = del;
3589 }
3590 }
3591 }
3592 env->DeleteDir(path);
3593 }
3594 }
3595
3596 std::vector<std::string> walDirFiles;
3597 std::string archivedir = ArchivalDirectory(dbname);
3598 bool wal_dir_exists = false;
3599 if (dbname != soptions.wal_dir) {
3600 wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
3601 archivedir = ArchivalDirectory(soptions.wal_dir);
3602 }
3603
3604     // The archive dir may be inside the wal dir or dbname and should be
3605     // processed and removed before those; otherwise we would have issues
3606     // removing them.
3607 std::vector<std::string> archiveFiles;
3608 if (env->GetChildren(archivedir, &archiveFiles).ok()) {
3609 // Delete archival files.
3610 for (const auto& file : archiveFiles) {
3611 if (ParseFileName(file, &number, &type) && type == kLogFile) {
3612 Status del =
3613 DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
3614 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3615 if (result.ok() && !del.ok()) {
3616 result = del;
3617 }
3618 }
3619 }
3620 env->DeleteDir(archivedir);
3621 }
3622
3623 // Delete log files in the WAL dir
3624 if (wal_dir_exists) {
3625 for (const auto& file : walDirFiles) {
3626 if (ParseFileName(file, &number, &type) && type == kLogFile) {
3627 Status del =
3628 DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
3629 soptions.wal_dir, /*force_bg=*/false,
3630 /*force_fg=*/!wal_in_db_path);
3631 if (result.ok() && !del.ok()) {
3632 result = del;
3633 }
3634 }
3635 }
3636 env->DeleteDir(soptions.wal_dir);
3637 }
3638
3639 env->UnlockFile(lock); // Ignore error since state is already gone
3640 env->DeleteFile(lockname);
3641
3642 // sst_file_manager holds a ref to the logger. Make sure the logger is
3643 // gone before trying to remove the directory.
3644 soptions.sst_file_manager.reset();
3645
3646 env->DeleteDir(dbname); // Ignore error in case dir contains other files
3647 }
3648 return result;
3649 }
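// Usage sketch: DestroyDB() permanently removes the DB's files, including any
// separate wal_dir and db_paths contents (the path is a placeholder):
//
//   Status s = DestroyDB("/path/to/db", Options());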
3650
3651 Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
3652 bool need_enter_write_thread) {
3653 #ifndef ROCKSDB_LITE
3654 WriteThread::Writer w;
3655 if (need_mutex_lock) {
3656 mutex_.Lock();
3657 } else {
3658 mutex_.AssertHeld();
3659 }
3660 if (need_enter_write_thread) {
3661 write_thread_.EnterUnbatched(&w, &mutex_);
3662 }
3663
3664 std::vector<std::string> cf_names;
3665 std::vector<ColumnFamilyOptions> cf_opts;
3666
3667 // This part requires mutex to protect the column family options
3668 for (auto cfd : *versions_->GetColumnFamilySet()) {
3669 if (cfd->IsDropped()) {
3670 continue;
3671 }
3672 cf_names.push_back(cfd->GetName());
3673 cf_opts.push_back(cfd->GetLatestCFOptions());
3674 }
3675
3676 // Unlock during expensive operations. New writes cannot get here
3677 // because the single write thread ensures all new writes get queued.
3678 DBOptions db_options =
3679 BuildDBOptions(immutable_db_options_, mutable_db_options_);
3680 mutex_.Unlock();
3681
3682 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
3683 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
3684
3685 std::string file_name =
3686 TempOptionsFileName(GetName(), versions_->NewFileNumber());
3687 Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
3688 GetFileSystem());
3689
3690 if (s.ok()) {
3691 s = RenameTempFileToOptionsFile(file_name);
3692 }
3693 // restore lock
3694 if (!need_mutex_lock) {
3695 mutex_.Lock();
3696 }
3697 if (need_enter_write_thread) {
3698 write_thread_.ExitUnbatched(&w);
3699 }
3700 if (!s.ok()) {
3701 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3702                    "Unable to persist options -- %s", s.ToString().c_str());
3703 if (immutable_db_options_.fail_if_options_file_error) {
3704 return Status::IOError("Unable to persist options.",
3705 s.ToString().c_str());
3706 }
3707 }
3708 #else
3709 (void)need_mutex_lock;
3710 (void)need_enter_write_thread;
3711 #endif // !ROCKSDB_LITE
3712 return Status::OK();
3713 }
3714
3715 #ifndef ROCKSDB_LITE
3716 namespace {
3717 void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
3718 const size_t num_files_to_keep,
3719 const std::shared_ptr<Logger>& info_log,
3720 Env* env) {
3721 if (filenames.size() <= num_files_to_keep) {
3722 return;
3723 }
3724 for (auto iter = std::next(filenames.begin(), num_files_to_keep);
3725 iter != filenames.end(); ++iter) {
3726 if (!env->DeleteFile(iter->second).ok()) {
3727 ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
3728 iter->second.c_str());
3729 }
3730 }
3731 }
3732 } // namespace
3733 #endif // !ROCKSDB_LITE
3734
3735 Status DBImpl::DeleteObsoleteOptionsFiles() {
3736 #ifndef ROCKSDB_LITE
3737 std::vector<std::string> filenames;
3738   // Use an ordered map to keep the filenames sorted from the newest
3739 // to the oldest.
3740 std::map<uint64_t, std::string> options_filenames;
3741 Status s;
3742 s = GetEnv()->GetChildren(GetName(), &filenames);
3743 if (!s.ok()) {
3744 return s;
3745 }
3746 for (auto& filename : filenames) {
3747 uint64_t file_number;
3748 FileType type;
3749 if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
3750 options_filenames.insert(
3751 {std::numeric_limits<uint64_t>::max() - file_number,
3752 GetName() + "/" + filename});
3753 }
3754 }
3755
3756   // Keep the latest 2 OPTIONS files.
3757 const size_t kNumOptionsFilesKept = 2;
3758 DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
3759 immutable_db_options_.info_log, GetEnv());
3760 return Status::OK();
3761 #else
3762 return Status::OK();
3763 #endif // !ROCKSDB_LITE
3764 }
3765
3766 Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
3767 #ifndef ROCKSDB_LITE
3768 Status s;
3769
3770 uint64_t options_file_number = versions_->NewFileNumber();
3771 std::string options_file_name =
3772 OptionsFileName(GetName(), options_file_number);
3773   // Retry if the file name happens to conflict with an existing one.
3774 s = GetEnv()->RenameFile(file_name, options_file_name);
3775 if (s.ok()) {
3776 InstrumentedMutexLock l(&mutex_);
3777 versions_->options_file_number_ = options_file_number;
3778 }
3779
3780 if (0 == disable_delete_obsolete_files_) {
3781 DeleteObsoleteOptionsFiles();
3782 }
3783 return s;
3784 #else
3785 (void)file_name;
3786 return Status::OK();
3787 #endif // !ROCKSDB_LITE
3788 }
3789
3790 #ifdef ROCKSDB_USING_THREAD_STATUS
3791
3792 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
3793 if (immutable_db_options_.enable_thread_tracking) {
3794 ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
3795 cfd->ioptions()->env);
3796 }
3797 }
3798
3799 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
3800 if (immutable_db_options_.enable_thread_tracking) {
3801 ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
3802 }
3803 }
3804
3805 void DBImpl::EraseThreadStatusDbInfo() const {
3806 if (immutable_db_options_.enable_thread_tracking) {
3807 ThreadStatusUtil::EraseDatabaseInfo(this);
3808 }
3809 }
3810
3811 #else
3812 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
3813
3814 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
3815
3816 void DBImpl::EraseThreadStatusDbInfo() const {}
3817 #endif // ROCKSDB_USING_THREAD_STATUS
3818
3819 //
3820 // A global method that can dump out the build version
3821 void DumpRocksDBBuildVersion(Logger* log) {
3822 #if !defined(IOS_CROSS_COMPILE)
3823 // if we compile with Xcode, we don't run build_detect_version, so we don't
3824 // generate util/build_version.cc
3825 ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR,
3826 ROCKSDB_MINOR, ROCKSDB_PATCH);
3827 ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
3828 ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
3829 #else
3830 (void)log; // ignore "-Wunused-parameter"
3831 #endif
3832 }
3833
3834 #ifndef ROCKSDB_LITE
3835 SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
3836 bool include_history) {
3837 // Find the earliest sequence number that we know we can rely on reading
3838 // from the memtable without needing to check sst files.
3839 SequenceNumber earliest_seq =
3840 sv->imm->GetEarliestSequenceNumber(include_history);
3841 if (earliest_seq == kMaxSequenceNumber) {
3842 earliest_seq = sv->mem->GetEarliestSequenceNumber();
3843 }
3844 assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
3845
3846 return earliest_seq;
3847 }
3848 #endif // ROCKSDB_LITE
3849
3850 #ifndef ROCKSDB_LITE
3851 Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
3852 bool cache_only,
3853 SequenceNumber lower_bound_seq,
3854 SequenceNumber* seq,
3855 bool* found_record_for_key,
3856 bool* is_blob_index) {
3857 Status s;
3858 MergeContext merge_context;
3859 SequenceNumber max_covering_tombstone_seq = 0;
3860
3861 ReadOptions read_options;
3862 SequenceNumber current_seq = versions_->LastSequence();
3863 LookupKey lkey(key, current_seq);
3864
3865 *seq = kMaxSequenceNumber;
3866 *found_record_for_key = false;
3867
3868 // Check if there is a record for this key in the latest memtable
3869 sv->mem->Get(lkey, nullptr, nullptr, &s, &merge_context,
3870 &max_covering_tombstone_seq, seq, read_options,
3871 nullptr /*read_callback*/, is_blob_index);
3872
3873 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3874 // unexpected error reading memtable.
3875 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3876 "Unexpected status returned from MemTable::Get: %s\n",
3877 s.ToString().c_str());
3878
3879 return s;
3880 }
3881
3882 if (*seq != kMaxSequenceNumber) {
3883 // Found a sequence number, no need to check immutable memtables
3884 *found_record_for_key = true;
3885 return Status::OK();
3886 }
3887
3888 SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
3889 if (lower_bound_in_mem != kMaxSequenceNumber &&
3890 lower_bound_in_mem < lower_bound_seq) {
3891 *found_record_for_key = false;
3892 return Status::OK();
3893 }
3894
3895 // Check if there is a record for this key in the immutable memtables
3896 sv->imm->Get(lkey, nullptr, nullptr, &s, &merge_context,
3897 &max_covering_tombstone_seq, seq, read_options,
3898 nullptr /*read_callback*/, is_blob_index);
3899
3900 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3901 // unexpected error reading memtable.
3902 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3903 "Unexpected status returned from MemTableList::Get: %s\n",
3904 s.ToString().c_str());
3905
3906 return s;
3907 }
3908
3909 if (*seq != kMaxSequenceNumber) {
3910 // Found a sequence number, no need to check memtable history
3911 *found_record_for_key = true;
3912 return Status::OK();
3913 }
3914
3915 SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
3916 if (lower_bound_in_imm != kMaxSequenceNumber &&
3917 lower_bound_in_imm < lower_bound_seq) {
3918 *found_record_for_key = false;
3919 return Status::OK();
3920 }
3921
3922   // Check if there is a record for this key in the memtable history
3923 sv->imm->GetFromHistory(lkey, nullptr, nullptr, &s, &merge_context,
3924 &max_covering_tombstone_seq, seq, read_options,
3925 is_blob_index);
3926
3927 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3928 // unexpected error reading memtable.
3929 ROCKS_LOG_ERROR(
3930 immutable_db_options_.info_log,
3931 "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
3932 s.ToString().c_str());
3933
3934 return s;
3935 }
3936
3937 if (*seq != kMaxSequenceNumber) {
3938 // Found a sequence number, no need to check SST files
3939 *found_record_for_key = true;
3940 return Status::OK();
3941 }
3942
3943 // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
3944 // check here to skip the history if possible. But currently the caller
3945 // already does that. Maybe we should move the logic here later.
3946
3947 // TODO(agiardullo): possible optimization: consider checking cached
3948 // SST files if cache_only=true?
3949 if (!cache_only) {
3950 // Check tables
3951 sv->current->Get(read_options, lkey, nullptr, nullptr, &s, &merge_context,
3952 &max_covering_tombstone_seq, nullptr /* value_found */,
3953 found_record_for_key, seq, nullptr /*read_callback*/,
3954 is_blob_index);
3955
3956 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3957 // unexpected error reading SST files
3958 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3959 "Unexpected status returned from Version::Get: %s\n",
3960 s.ToString().c_str());
3961 }
3962 }
3963
3964 return s;
3965 }
3966
3967 Status DBImpl::IngestExternalFile(
3968 ColumnFamilyHandle* column_family,
3969 const std::vector<std::string>& external_files,
3970 const IngestExternalFileOptions& ingestion_options) {
3971 IngestExternalFileArg arg;
3972 arg.column_family = column_family;
3973 arg.external_files = external_files;
3974 arg.options = ingestion_options;
3975 return IngestExternalFiles({arg});
3976 }
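// Usage sketch: ingesting one externally built SST file into the default
// column family (assumes an open DB* `db`; the file path is a placeholder):
//
//   IngestExternalFileOptions ifo;
//   ifo.move_files = true;  // hard-link instead of copy when possible
//   Status s = db->IngestExternalFile({"/tmp/batch1.sst"}, ifo);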
3977
3978 Status DBImpl::IngestExternalFiles(
3979 const std::vector<IngestExternalFileArg>& args) {
3980 if (args.empty()) {
3981 return Status::InvalidArgument("ingestion arg list is empty");
3982 }
3983 {
3984 std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
3985 for (const auto& arg : args) {
3986 if (arg.column_family == nullptr) {
3987 return Status::InvalidArgument("column family handle is null");
3988 } else if (unique_cfhs.count(arg.column_family) > 0) {
3989 return Status::InvalidArgument(
3990 "ingestion args have duplicate column families");
3991 }
3992 unique_cfhs.insert(arg.column_family);
3993 }
3994 }
3995 // Ingest multiple external SST files atomically.
3996 size_t num_cfs = args.size();
3997 for (size_t i = 0; i != num_cfs; ++i) {
3998 if (args[i].external_files.empty()) {
3999 char err_msg[128] = {0};
4000 snprintf(err_msg, 128, "external_files[%zu] is empty", i);
4001 return Status::InvalidArgument(err_msg);
4002 }
4003 }
4004 for (const auto& arg : args) {
4005 const IngestExternalFileOptions& ingest_opts = arg.options;
4006 if (ingest_opts.ingest_behind &&
4007 !immutable_db_options_.allow_ingest_behind) {
4008 return Status::InvalidArgument(
4009 "can't ingest_behind file in DB with allow_ingest_behind=false");
4010 }
4011 }
4012
4013 // TODO (yanqin) maybe handle the case in which column_families have
4014 // duplicates
4015 std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4016 size_t total = 0;
4017 for (const auto& arg : args) {
4018 total += arg.external_files.size();
4019 }
4020 uint64_t next_file_number = 0;
4021 Status status = ReserveFileNumbersBeforeIngestion(
4022 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
4023 pending_output_elem, &next_file_number);
4024 if (!status.ok()) {
4025 InstrumentedMutexLock l(&mutex_);
4026 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4027 return status;
4028 }
4029
4030 std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
4031 for (const auto& arg : args) {
4032 auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
4033 ingestion_jobs.emplace_back(
4034 env_, versions_.get(), cfd, immutable_db_options_, file_options_,
4035 &snapshots_, arg.options, &directories_, &event_logger_);
4036 }
4037 std::vector<std::pair<bool, Status>> exec_results;
4038 for (size_t i = 0; i != num_cfs; ++i) {
4039 exec_results.emplace_back(false, Status::OK());
4040 }
4041 // TODO(yanqin) maybe make jobs run in parallel
4042 uint64_t start_file_number = next_file_number;
4043 for (size_t i = 1; i != num_cfs; ++i) {
4044 start_file_number += args[i - 1].external_files.size();
4045 auto* cfd =
4046 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4047 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
4048 exec_results[i].second = ingestion_jobs[i].Prepare(
4049 args[i].external_files, start_file_number, super_version);
4050 exec_results[i].first = true;
4051 CleanupSuperVersion(super_version);
4052 }
4053 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
4054 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
4055 {
4056 auto* cfd =
4057 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
4058 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
4059 exec_results[0].second = ingestion_jobs[0].Prepare(
4060 args[0].external_files, next_file_number, super_version);
4061 exec_results[0].first = true;
4062 CleanupSuperVersion(super_version);
4063 }
4064 for (const auto& exec_result : exec_results) {
4065 if (!exec_result.second.ok()) {
4066 status = exec_result.second;
4067 break;
4068 }
4069 }
4070 if (!status.ok()) {
4071 for (size_t i = 0; i != num_cfs; ++i) {
4072 if (exec_results[i].first) {
4073 ingestion_jobs[i].Cleanup(status);
4074 }
4075 }
4076 InstrumentedMutexLock l(&mutex_);
4077 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4078 return status;
4079 }
4080
4081 std::vector<SuperVersionContext> sv_ctxs;
4082 for (size_t i = 0; i != num_cfs; ++i) {
4083 sv_ctxs.emplace_back(true /* create_superversion */);
4084 }
4085 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
4086 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
4087 TEST_SYNC_POINT("DBImpl::AddFile:Start");
4088 {
4089 InstrumentedMutexLock l(&mutex_);
4090 TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
4091
4092 // Stop writes to the DB by entering both write threads
4093 WriteThread::Writer w;
4094 write_thread_.EnterUnbatched(&w, &mutex_);
4095 WriteThread::Writer nonmem_w;
4096 if (two_write_queues_) {
4097 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4098 }
4099
4100     // When unordered_write is enabled, keys are written to the memtable in
4101     // an unordered way. If the ingestion job checks the memtable key range
4102     // before a key lands in the memtable, the ingestion job may skip the
4103     // necessary memtable flush.
4104     // So wait here to ensure there are no pending writes to the memtable.
4105 WaitForPendingWrites();
4106
4107 num_running_ingest_file_ += static_cast<int>(num_cfs);
4108 TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
4109
4110 bool at_least_one_cf_need_flush = false;
4111 std::vector<bool> need_flush(num_cfs, false);
4112 for (size_t i = 0; i != num_cfs; ++i) {
4113 auto* cfd =
4114 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4115 if (cfd->IsDropped()) {
4116 // TODO (yanqin) investigate whether we should abort ingestion or
4117 // proceed with other non-dropped column families.
4118 status = Status::InvalidArgument(
4119 "cannot ingest an external file into a dropped CF");
4120 break;
4121 }
4122 bool tmp = false;
4123 status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
4124 need_flush[i] = tmp;
4125 at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
4126 if (!status.ok()) {
4127 break;
4128 }
4129 }
4130 TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
4131 &at_least_one_cf_need_flush);
4132
4133 if (status.ok() && at_least_one_cf_need_flush) {
4134 FlushOptions flush_opts;
4135 flush_opts.allow_write_stall = true;
4136 if (immutable_db_options_.atomic_flush) {
4137 autovector<ColumnFamilyData*> cfds_to_flush;
4138 SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
4139 mutex_.Unlock();
4140 status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
4141 FlushReason::kExternalFileIngestion,
4142 true /* writes_stopped */);
4143 mutex_.Lock();
4144 } else {
4145 for (size_t i = 0; i != num_cfs; ++i) {
4146 if (need_flush[i]) {
4147 mutex_.Unlock();
4148 auto* cfd =
4149 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
4150 ->cfd();
4151 status = FlushMemTable(cfd, flush_opts,
4152 FlushReason::kExternalFileIngestion,
4153 true /* writes_stopped */);
4154 mutex_.Lock();
4155 if (!status.ok()) {
4156 break;
4157 }
4158 }
4159 }
4160 }
4161 }
4162 // Run ingestion jobs.
4163 if (status.ok()) {
4164 for (size_t i = 0; i != num_cfs; ++i) {
4165 status = ingestion_jobs[i].Run();
4166 if (!status.ok()) {
4167 break;
4168 }
4169 }
4170 }
4171 if (status.ok()) {
4172 int consumed_seqno_count =
4173 ingestion_jobs[0].ConsumedSequenceNumbersCount();
4174 #ifndef NDEBUG
4175 for (size_t i = 1; i != num_cfs; ++i) {
4176 assert(!!consumed_seqno_count ==
4177 !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
4178 consumed_seqno_count +=
4179 ingestion_jobs[i].ConsumedSequenceNumbersCount();
4180 }
4181 #endif
4182 if (consumed_seqno_count > 0) {
4183 const SequenceNumber last_seqno = versions_->LastSequence();
4184 versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
4185 versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
4186 versions_->SetLastSequence(last_seqno + consumed_seqno_count);
4187 }
4188 autovector<ColumnFamilyData*> cfds_to_commit;
4189 autovector<const MutableCFOptions*> mutable_cf_options_list;
4190 autovector<autovector<VersionEdit*>> edit_lists;
4191 uint32_t num_entries = 0;
4192 for (size_t i = 0; i != num_cfs; ++i) {
4193 auto* cfd =
4194 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4195 if (cfd->IsDropped()) {
4196 continue;
4197 }
4198 cfds_to_commit.push_back(cfd);
4199 mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
4200 autovector<VersionEdit*> edit_list;
4201 edit_list.push_back(ingestion_jobs[i].edit());
4202 edit_lists.push_back(edit_list);
4203 ++num_entries;
4204 }
4205 // Mark the version edits as an atomic group if the number of version
4206 // edits exceeds 1.
4207 if (cfds_to_commit.size() > 1) {
4208 for (auto& edits : edit_lists) {
4209 assert(edits.size() == 1);
4210 edits[0]->MarkAtomicGroup(--num_entries);
4211 }
4212 assert(0 == num_entries);
4213 }
4214 status =
4215 versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
4216 edit_lists, &mutex_, directories_.GetDbDir());
4217 }
4218
4219 if (status.ok()) {
4220 for (size_t i = 0; i != num_cfs; ++i) {
4221 auto* cfd =
4222 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4223 if (!cfd->IsDropped()) {
4224 InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
4225 *cfd->GetLatestMutableCFOptions());
4226 #ifndef NDEBUG
4227 if (0 == i && num_cfs > 1) {
4228 TEST_SYNC_POINT(
4229 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
4230 TEST_SYNC_POINT(
4231 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
4232 }
4233 #endif // !NDEBUG
4234 }
4235 }
4236 }
4237
4238 // Resume writes to the DB
4239 if (two_write_queues_) {
4240 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4241 }
4242 write_thread_.ExitUnbatched(&w);
4243
4244 if (status.ok()) {
4245 for (auto& job : ingestion_jobs) {
4246 job.UpdateStats();
4247 }
4248 }
4249 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4250 num_running_ingest_file_ -= static_cast<int>(num_cfs);
4251 if (0 == num_running_ingest_file_) {
4252 bg_cv_.SignalAll();
4253 }
4254 TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
4255 }
4256 // mutex_ is unlocked here
4257
4258 // Cleanup
4259 for (size_t i = 0; i != num_cfs; ++i) {
4260 sv_ctxs[i].Clean();
4261     // This may roll back jobs that have completed successfully. This is
4262     // intended for atomicity.
4263 ingestion_jobs[i].Cleanup(status);
4264 }
4265 if (status.ok()) {
4266 for (size_t i = 0; i != num_cfs; ++i) {
4267 auto* cfd =
4268 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4269 if (!cfd->IsDropped()) {
4270 NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
4271 }
4272 }
4273 }
4274 return status;
4275 }
4276
4277 Status DBImpl::CreateColumnFamilyWithImport(
4278 const ColumnFamilyOptions& options, const std::string& column_family_name,
4279 const ImportColumnFamilyOptions& import_options,
4280 const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
4281 assert(handle != nullptr);
4282 assert(*handle == nullptr);
4283 std::string cf_comparator_name = options.comparator->Name();
4284 if (cf_comparator_name != metadata.db_comparator_name) {
4285 return Status::InvalidArgument("Comparator name mismatch");
4286 }
4287
4288 // Create column family.
4289 auto status = CreateColumnFamily(options, column_family_name, handle);
4290 if (!status.ok()) {
4291 return status;
4292 }
4293
4294 // Import sst files from metadata.
4295 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
4296 auto cfd = cfh->cfd();
4297 ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
4298 immutable_db_options_, file_options_,
4299 import_options, metadata.files);
4300
4301 SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
4302 VersionEdit dummy_edit;
4303 uint64_t next_file_number = 0;
4304 std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4305 {
4306 // Lock db mutex
4307 InstrumentedMutexLock l(&mutex_);
4308 if (error_handler_.IsDBStopped()) {
4309 // Don't import files when there is a bg_error
4310 status = error_handler_.GetBGError();
4311 }
4312
4313     // Make sure that bg cleanup won't delete the files that we are importing
4314 pending_output_elem.reset(new std::list<uint64_t>::iterator(
4315 CaptureCurrentFileNumberInPendingOutputs()));
4316
4317 if (status.ok()) {
4318       // If a crash happens after a hard link is established, the Recover
4319       // function may reuse a file number that has already been assigned to
4320       // the internal file, and this would overwrite the external file. To
4321       // protect the external file, make sure the file number is never reused.
4322 next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
4323 auto cf_options = cfd->GetLatestMutableCFOptions();
4324 status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4325 directories_.GetDbDir());
4326 if (status.ok()) {
4327 InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4328 }
4329 }
4330 }
4331 dummy_sv_ctx.Clean();
4332
4333 if (status.ok()) {
4334 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
4335 status = import_job.Prepare(next_file_number, sv);
4336 CleanupSuperVersion(sv);
4337 }
4338
4339 if (status.ok()) {
4340 SuperVersionContext sv_context(true /*create_superversion*/);
4341 {
4342 // Lock db mutex
4343 InstrumentedMutexLock l(&mutex_);
4344
4345 // Stop writes to the DB by entering both write threads
4346 WriteThread::Writer w;
4347 write_thread_.EnterUnbatched(&w, &mutex_);
4348 WriteThread::Writer nonmem_w;
4349 if (two_write_queues_) {
4350 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4351 }
4352
4353 num_running_ingest_file_++;
4354 assert(!cfd->IsDropped());
4355 status = import_job.Run();
4356
4357 // Install job edit [Mutex will be unlocked here]
4358 if (status.ok()) {
4359 auto cf_options = cfd->GetLatestMutableCFOptions();
4360 status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
4361 &mutex_, directories_.GetDbDir());
4362 if (status.ok()) {
4363 InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
4364 }
4365 }
4366
4367 // Resume writes to the DB
4368 if (two_write_queues_) {
4369 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4370 }
4371 write_thread_.ExitUnbatched(&w);
4372
4373 num_running_ingest_file_--;
4374 if (num_running_ingest_file_ == 0) {
4375 bg_cv_.SignalAll();
4376 }
4377 }
4378 // mutex_ is unlocked here
4379
4380 sv_context.Clean();
4381 }
4382
4383 {
4384 InstrumentedMutexLock l(&mutex_);
4385 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4386 }
4387
4388 import_job.Cleanup(status);
4389 if (!status.ok()) {
4390 DropColumnFamily(*handle);
4391 DestroyColumnFamilyHandle(*handle);
4392 *handle = nullptr;
4393 }
4394 return status;
4395 }
4396
4397 Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
4398 Status s;
4399 std::vector<ColumnFamilyData*> cfd_list;
4400 {
4401 InstrumentedMutexLock l(&mutex_);
4402 for (auto cfd : *versions_->GetColumnFamilySet()) {
4403 if (!cfd->IsDropped() && cfd->initialized()) {
4404 cfd->Ref();
4405 cfd_list.push_back(cfd);
4406 }
4407 }
4408 }
4409 std::vector<SuperVersion*> sv_list;
4410 for (auto cfd : cfd_list) {
4411 sv_list.push_back(cfd->GetReferencedSuperVersion(this));
4412 }
4413 for (auto& sv : sv_list) {
4414 VersionStorageInfo* vstorage = sv->current->storage_info();
4415 ColumnFamilyData* cfd = sv->current->cfd();
4416 Options opts;
4417 {
4418 InstrumentedMutexLock l(&mutex_);
4419 opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
4420 cfd->GetLatestCFOptions());
4421 }
4422 for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
4423 for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
4424 j++) {
4425 const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd;
4426 std::string fname = TableFileName(cfd->ioptions()->cf_paths,
4427 fd.GetNumber(), fd.GetPathId());
4428 s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(opts, file_options_,
4429 read_options, fname);
4430 }
4431 }
4432 if (!s.ok()) {
4433 break;
4434 }
4435 }
4436 bool defer_purge =
4437 immutable_db_options().avoid_unnecessary_blocking_io;
4438 {
4439 InstrumentedMutexLock l(&mutex_);
4440 for (auto sv : sv_list) {
4441 if (sv && sv->Unref()) {
4442 sv->Cleanup();
4443 if (defer_purge) {
4444 AddSuperVersionsToFreeQueue(sv);
4445 } else {
4446 delete sv;
4447 }
4448 }
4449 }
4450 if (defer_purge) {
4451 SchedulePurge();
4452 }
4453 for (auto cfd : cfd_list) {
4454 cfd->UnrefAndTryDelete();
4455 }
4456 }
4457 return s;
4458 }
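// Usage sketch (assumes an open DB* `db`); this scans every live SST file, so
// it can be expensive on large databases:
//
//   Status s = db->VerifyChecksum(ReadOptions());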
4459
4460 void DBImpl::NotifyOnExternalFileIngested(
4461 ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
4462 if (immutable_db_options_.listeners.empty()) {
4463 return;
4464 }
4465
4466 for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
4467 ExternalFileIngestionInfo info;
4468 info.cf_name = cfd->GetName();
4469 info.external_file_path = f.external_file_path;
4470 info.internal_file_path = f.internal_file_path;
4471 info.global_seqno = f.assigned_seqno;
4472 info.table_properties = f.table_properties;
4473 for (auto listener : immutable_db_options_.listeners) {
4474 listener->OnExternalFileIngested(this, info);
4475 }
4476 }
4477 }
4478
4479 void DBImpl::WaitForIngestFile() {
4480 mutex_.AssertHeld();
4481 while (num_running_ingest_file_ > 0) {
4482 bg_cv_.Wait();
4483 }
4484 }
4485
4486 Status DBImpl::StartTrace(const TraceOptions& trace_options,
4487 std::unique_ptr<TraceWriter>&& trace_writer) {
4488 InstrumentedMutexLock lock(&trace_mutex_);
4489 tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer)));
4490 return Status::OK();
4491 }
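// Usage sketch: pairing StartTrace() with a file-backed TraceWriter from
// rocksdb/trace_reader_writer.h (assumes an open DB* `db`; the trace path is
// a placeholder):
//
//   std::unique_ptr<TraceWriter> writer;
//   Status s = NewFileTraceWriter(db->GetEnv(), EnvOptions(),
//                                 "/tmp/db.trace", &writer);
//   if (s.ok()) s = db->StartTrace(TraceOptions(), std::move(writer));
//   // ... run the workload, then:
//   if (s.ok()) s = db->EndTrace();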
4492
4493 Status DBImpl::EndTrace() {
4494 InstrumentedMutexLock lock(&trace_mutex_);
4495 Status s;
4496 if (tracer_ != nullptr) {
4497 s = tracer_->Close();
4498 tracer_.reset();
4499 } else {
4500 return Status::IOError("No trace file to close");
4501 }
4502 return s;
4503 }
4504
4505 Status DBImpl::StartBlockCacheTrace(
4506 const TraceOptions& trace_options,
4507 std::unique_ptr<TraceWriter>&& trace_writer) {
4508 return block_cache_tracer_.StartTrace(env_, trace_options,
4509 std::move(trace_writer));
4510 }
4511
4512 Status DBImpl::EndBlockCacheTrace() {
4513 block_cache_tracer_.EndTrace();
4514 return Status::OK();
4515 }
4516
4517 Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
4518 Status s;
4519 if (tracer_) {
4520 InstrumentedMutexLock lock(&trace_mutex_);
4521 if (tracer_) {
4522 s = tracer_->IteratorSeek(cf_id, key);
4523 }
4524 }
4525 return s;
4526 }
4527
4528 Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
4529 const Slice& key) {
4530 Status s;
4531 if (tracer_) {
4532 InstrumentedMutexLock lock(&trace_mutex_);
4533 if (tracer_) {
4534 s = tracer_->IteratorSeekForPrev(cf_id, key);
4535 }
4536 }
4537 return s;
4538 }
4539
4540 Status DBImpl::ReserveFileNumbersBeforeIngestion(
4541 ColumnFamilyData* cfd, uint64_t num,
4542 std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
4543 uint64_t* next_file_number) {
4544 Status s;
4545 SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
4546 assert(nullptr != next_file_number);
4547 InstrumentedMutexLock l(&mutex_);
4548 if (error_handler_.IsDBStopped()) {
4549 // Do not ingest files when there is a bg_error
4550 return error_handler_.GetBGError();
4551 }
4552 pending_output_elem.reset(new std::list<uint64_t>::iterator(
4553 CaptureCurrentFileNumberInPendingOutputs()));
4554 *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
4555 auto cf_options = cfd->GetLatestMutableCFOptions();
4556 VersionEdit dummy_edit;
4557   // If a crash happens after a hard link is established, the Recover
4558   // function may reuse a file number that has already been assigned to the
4559   // internal file, and this would overwrite the external file. To protect
4560   // the external file, make sure the file number is never reused.
4561 s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4562 directories_.GetDbDir());
4563 if (s.ok()) {
4564 InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4565 }
4566 dummy_sv_ctx.Clean();
4567 return s;
4568 }
4569
4570 Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
4571 if (mutable_db_options_.max_open_files == -1) {
4572 uint64_t oldest_time = port::kMaxUint64;
4573 for (auto cfd : *versions_->GetColumnFamilySet()) {
4574 if (!cfd->IsDropped()) {
4575 uint64_t ctime;
4576 {
4577 SuperVersion* sv = GetAndRefSuperVersion(cfd);
4578 Version* version = sv->current;
4579 version->GetCreationTimeOfOldestFile(&ctime);
4580 ReturnAndCleanupSuperVersion(cfd, sv);
4581 }
4582
4583 if (ctime < oldest_time) {
4584 oldest_time = ctime;
4585 }
4586 if (oldest_time == 0) {
4587 break;
4588 }
4589 }
4590 }
4591 *creation_time = oldest_time;
4592 return Status::OK();
4593 } else {
4594 return Status::NotSupported("This API only works if max_open_files = -1");
4595 }
4596 }
4597 #endif // ROCKSDB_LITE
4598
4599 } // namespace ROCKSDB_NAMESPACE
4600