1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10 
11 #include <stdint.h>
12 #ifdef OS_SOLARIS
13 #include <alloca.h>
14 #endif
15 
16 #include <algorithm>
17 #include <cinttypes>
18 #include <cstdio>
19 #include <map>
20 #include <set>
21 #include <stdexcept>
22 #include <string>
23 #include <unordered_map>
24 #include <utility>
25 #include <vector>
26 
27 #include "db/arena_wrapped_db_iter.h"
28 #include "db/builder.h"
29 #include "db/compaction/compaction_job.h"
30 #include "db/db_info_dumper.h"
31 #include "db/db_iter.h"
32 #include "db/dbformat.h"
33 #include "db/error_handler.h"
34 #include "db/event_helpers.h"
35 #include "db/external_sst_file_ingestion_job.h"
36 #include "db/flush_job.h"
37 #include "db/forward_iterator.h"
38 #include "db/import_column_family_job.h"
39 #include "db/job_context.h"
40 #include "db/log_reader.h"
41 #include "db/log_writer.h"
42 #include "db/malloc_stats.h"
43 #include "db/memtable.h"
44 #include "db/memtable_list.h"
45 #include "db/merge_context.h"
46 #include "db/merge_helper.h"
47 #include "db/range_tombstone_fragmenter.h"
48 #include "db/table_cache.h"
49 #include "db/table_properties_collector.h"
50 #include "db/transaction_log_impl.h"
51 #include "db/version_set.h"
52 #include "db/write_batch_internal.h"
53 #include "db/write_callback.h"
54 #include "env/composite_env_wrapper.h"
55 #include "file/file_util.h"
56 #include "file/filename.h"
57 #include "file/random_access_file_reader.h"
58 #include "file/sst_file_manager_impl.h"
59 #include "logging/auto_roll_logger.h"
60 #include "logging/log_buffer.h"
61 #include "logging/logging.h"
62 #include "memtable/hash_linklist_rep.h"
63 #include "memtable/hash_skiplist_rep.h"
64 #include "monitoring/in_memory_stats_history.h"
65 #include "monitoring/iostats_context_imp.h"
66 #include "monitoring/perf_context_imp.h"
67 #include "monitoring/persistent_stats_history.h"
68 #include "monitoring/thread_status_updater.h"
69 #include "monitoring/thread_status_util.h"
70 #include "options/cf_options.h"
71 #include "options/options_helper.h"
72 #include "options/options_parser.h"
73 #include "port/port.h"
74 #include "rocksdb/cache.h"
75 #include "rocksdb/compaction_filter.h"
76 #include "rocksdb/convenience.h"
77 #include "rocksdb/db.h"
78 #include "rocksdb/env.h"
79 #include "rocksdb/merge_operator.h"
80 #include "rocksdb/statistics.h"
81 #include "rocksdb/stats_history.h"
82 #include "rocksdb/status.h"
83 #include "rocksdb/table.h"
84 #include "rocksdb/write_buffer_manager.h"
85 #include "table/block_based/block.h"
86 #include "table/block_based/block_based_table_factory.h"
87 #include "table/get_context.h"
88 #include "table/merging_iterator.h"
89 #include "table/multiget_context.h"
90 #include "table/table_builder.h"
91 #include "table/two_level_iterator.h"
92 #include "test_util/sync_point.h"
93 #include "tools/sst_dump_tool_imp.h"
94 #include "util/autovector.h"
95 #include "util/build_version.h"
96 #include "util/cast_util.h"
97 #include "util/coding.h"
98 #include "util/compression.h"
99 #include "util/crc32c.h"
100 #include "util/mutexlock.h"
101 #include "util/stop_watch.h"
102 #include "util/string_util.h"
103 
104 namespace ROCKSDB_NAMESPACE {
105 
106 const std::string kDefaultColumnFamilyName("default");
107 const std::string kPersistentStatsColumnFamilyName(
108     "___rocksdb_stats_history___");
109 void DumpRocksDBBuildVersion(Logger* log);
110 
GetCompressionFlush(const ImmutableCFOptions & ioptions,const MutableCFOptions & mutable_cf_options)111 CompressionType GetCompressionFlush(
112     const ImmutableCFOptions& ioptions,
113     const MutableCFOptions& mutable_cf_options) {
114   // Compressing memtable flushes might not help unless the sequential load
115   // optimization is used for leveled compaction. Otherwise the CPU and
116   // latency overhead is not offset by saving much space.
117   if (ioptions.compaction_style == kCompactionStyleUniversal) {
118     if (mutable_cf_options.compaction_options_universal
119             .compression_size_percent < 0) {
120       return mutable_cf_options.compression;
121     } else {
122       return kNoCompression;
123     }
124   } else if (!ioptions.compression_per_level.empty()) {
125     // For leveled compress when min_level_to_compress != 0.
126     return ioptions.compression_per_level[0];
127   } else {
128     return mutable_cf_options.compression;
129   }
130 }
131 
132 namespace {
DumpSupportInfo(Logger * logger)133 void DumpSupportInfo(Logger* logger) {
134   ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
135   for (auto& compression : OptionsHelper::compression_type_string_map) {
136     if (compression.second != kNoCompression &&
137         compression.second != kDisableCompressionOption) {
138       ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
139                        CompressionTypeSupported(compression.second));
140     }
141   }
142   ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
143                    crc32c::IsFastCrc32Supported().c_str());
144 }
145 }  // namespace
146 
DBImpl(const DBOptions & options,const std::string & dbname,const bool seq_per_batch,const bool batch_per_txn)147 DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
148                const bool seq_per_batch, const bool batch_per_txn)
149     : dbname_(dbname),
150       own_info_log_(options.info_log == nullptr),
151       initial_db_options_(SanitizeOptions(dbname, options)),
152       env_(initial_db_options_.env),
153       fs_(initial_db_options_.env->GetFileSystem()),
154       immutable_db_options_(initial_db_options_),
155       mutable_db_options_(initial_db_options_),
156       stats_(immutable_db_options_.statistics.get()),
157       mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
158              immutable_db_options_.use_adaptive_mutex),
159       default_cf_handle_(nullptr),
160       max_total_in_memory_state_(0),
161       file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
162       file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
163           file_options_, immutable_db_options_)),
164       seq_per_batch_(seq_per_batch),
165       batch_per_txn_(batch_per_txn),
166       db_lock_(nullptr),
167       shutting_down_(false),
168       manual_compaction_paused_(false),
169       bg_cv_(&mutex_),
170       logfile_number_(0),
171       log_dir_synced_(false),
172       log_empty_(true),
173       persist_stats_cf_handle_(nullptr),
174       log_sync_cv_(&mutex_),
175       total_log_size_(0),
176       is_snapshot_supported_(true),
177       write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
178       write_thread_(immutable_db_options_),
179       nonmem_write_thread_(immutable_db_options_),
180       write_controller_(mutable_db_options_.delayed_write_rate),
181       last_batch_group_size_(0),
182       unscheduled_flushes_(0),
183       unscheduled_compactions_(0),
184       bg_bottom_compaction_scheduled_(0),
185       bg_compaction_scheduled_(0),
186       num_running_compactions_(0),
187       bg_flush_scheduled_(0),
188       num_running_flushes_(0),
189       bg_purge_scheduled_(0),
190       disable_delete_obsolete_files_(0),
191       pending_purge_obsolete_files_(0),
192       delete_obsolete_files_last_run_(env_->NowMicros()),
193       last_stats_dump_time_microsec_(0),
194       next_job_id_(1),
195       has_unpersisted_data_(false),
196       unable_to_release_oldest_log_(false),
197       num_running_ingest_file_(0),
198 #ifndef ROCKSDB_LITE
199       wal_manager_(immutable_db_options_, file_options_, seq_per_batch),
200 #endif  // ROCKSDB_LITE
201       event_logger_(immutable_db_options_.info_log.get()),
202       bg_work_paused_(0),
203       bg_compaction_paused_(0),
204       refitting_level_(false),
205       opened_successfully_(false),
206       two_write_queues_(options.two_write_queues),
207       manual_wal_flush_(options.manual_wal_flush),
208       // last_sequencee_ is always maintained by the main queue that also writes
209       // to the memtable. When two_write_queues_ is disabled last seq in
210       // memtable is the same as last seq published to the readers. When it is
211       // enabled but seq_per_batch_ is disabled, last seq in memtable still
212       // indicates last published seq since wal-only writes that go to the 2nd
213       // queue do not consume a sequence number. Otherwise writes performed by
214       // the 2nd queue could change what is visible to the readers. In this
215       // cases, last_seq_same_as_publish_seq_==false, the 2nd queue maintains a
216       // separate variable to indicate the last published sequence.
217       last_seq_same_as_publish_seq_(
218           !(seq_per_batch && options.two_write_queues)),
219       // Since seq_per_batch_ is currently set only by WritePreparedTxn which
220       // requires a custom gc for compaction, we use that to set use_custom_gc_
221       // as well.
222       use_custom_gc_(seq_per_batch),
223       shutdown_initiated_(false),
224       own_sfm_(options.sst_file_manager == nullptr),
225       preserve_deletes_(options.preserve_deletes),
226       closed_(false),
227       error_handler_(this, immutable_db_options_, &mutex_),
228       atomic_flush_install_cv_(&mutex_) {
229   // !batch_per_trx_ implies seq_per_batch_ because it is only unset for
230   // WriteUnprepared, which should use seq_per_batch_.
231   assert(batch_per_txn_ || seq_per_batch_);
232   env_->GetAbsolutePath(dbname, &db_absolute_path_);
233 
234   // Reserve ten files or so for other uses and give the rest to TableCache.
235   // Give a large number for setting of "infinite" open files.
236   const int table_cache_size = (mutable_db_options_.max_open_files == -1)
237                                    ? TableCache::kInfiniteCapacity
238                                    : mutable_db_options_.max_open_files - 10;
239   LRUCacheOptions co;
240   co.capacity = table_cache_size;
241   co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
242   co.metadata_charge_policy = kDontChargeCacheMetadata;
243   table_cache_ = NewLRUCache(co);
244 
245   versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
246                                  table_cache_.get(), write_buffer_manager_,
247                                  &write_controller_, &block_cache_tracer_));
248   column_family_memtables_.reset(
249       new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
250 
251   DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
252   DumpDBFileSummary(immutable_db_options_, dbname_);
253   immutable_db_options_.Dump(immutable_db_options_.info_log.get());
254   mutable_db_options_.Dump(immutable_db_options_.info_log.get());
255   DumpSupportInfo(immutable_db_options_.info_log.get());
256 
257   // always open the DB with 0 here, which means if preserve_deletes_==true
258   // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
259   // is called by client and this seqnum is advanced.
260   preserve_deletes_seqnum_.store(0);
261 }
262 
Resume()263 Status DBImpl::Resume() {
264   ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");
265 
266   InstrumentedMutexLock db_mutex(&mutex_);
267 
268   if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
269     // Nothing to do
270     return Status::OK();
271   }
272 
273   if (error_handler_.IsRecoveryInProgress()) {
274     // Don't allow a mix of manual and automatic recovery
275     return Status::Busy();
276   }
277 
278   mutex_.Unlock();
279   Status s = error_handler_.RecoverFromBGError(true);
280   mutex_.Lock();
281   return s;
282 }
283 
284 // This function implements the guts of recovery from a background error. It
285 // is eventually called for both manual as well as automatic recovery. It does
286 // the following -
287 // 1. Wait for currently scheduled background flush/compaction to exit, in
288 //    order to inadvertently causing an error and thinking recovery failed
289 // 2. Flush memtables if there's any data for all the CFs. This may result
290 //    another error, which will be saved by error_handler_ and reported later
291 //    as the recovery status
292 // 3. Find and delete any obsolete files
293 // 4. Schedule compactions if needed for all the CFs. This is needed as the
294 //    flush in the prior step might have been a no-op for some CFs, which
295 //    means a new super version wouldn't have been installed
ResumeImpl()296 Status DBImpl::ResumeImpl() {
297   mutex_.AssertHeld();
298   WaitForBackgroundWork();
299 
300   Status bg_error = error_handler_.GetBGError();
301   Status s;
302   if (shutdown_initiated_) {
303     // Returning shutdown status to SFM during auto recovery will cause it
304     // to abort the recovery and allow the shutdown to progress
305     s = Status::ShutdownInProgress();
306   }
307   if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
308     ROCKS_LOG_INFO(
309         immutable_db_options_.info_log,
310         "DB resume requested but failed due to Fatal/Unrecoverable error");
311     s = bg_error;
312   }
313 
314   // Make sure the IO Status stored in version set is set to OK.
315   if (s.ok()) {
316     versions_->SetIOStatusOK();
317   }
318 
319   // We cannot guarantee consistency of the WAL. So force flush Memtables of
320   // all the column families
321   if (s.ok()) {
322     FlushOptions flush_opts;
323     // We allow flush to stall write since we are trying to resume from error.
324     flush_opts.allow_write_stall = true;
325     if (immutable_db_options_.atomic_flush) {
326       autovector<ColumnFamilyData*> cfds;
327       SelectColumnFamiliesForAtomicFlush(&cfds);
328       mutex_.Unlock();
329       s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
330       mutex_.Lock();
331     } else {
332       for (auto cfd : *versions_->GetColumnFamilySet()) {
333         if (cfd->IsDropped()) {
334           continue;
335         }
336         cfd->Ref();
337         mutex_.Unlock();
338         s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
339         mutex_.Lock();
340         cfd->UnrefAndTryDelete();
341         if (!s.ok()) {
342           break;
343         }
344       }
345     }
346     if (!s.ok()) {
347       ROCKS_LOG_INFO(immutable_db_options_.info_log,
348                      "DB resume requested but failed due to Flush failure [%s]",
349                      s.ToString().c_str());
350     }
351   }
352 
353   JobContext job_context(0);
354   FindObsoleteFiles(&job_context, true);
355   if (s.ok()) {
356     s = error_handler_.ClearBGError();
357   }
358   mutex_.Unlock();
359 
360   job_context.manifest_file_number = 1;
361   if (job_context.HaveSomethingToDelete()) {
362     PurgeObsoleteFiles(job_context);
363   }
364   job_context.Clean();
365 
366   if (s.ok()) {
367     ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
368   }
369   mutex_.Lock();
370   // Check for shutdown again before scheduling further compactions,
371   // since we released and re-acquired the lock above
372   if (shutdown_initiated_) {
373     s = Status::ShutdownInProgress();
374   }
375   if (s.ok()) {
376     for (auto cfd : *versions_->GetColumnFamilySet()) {
377       SchedulePendingCompaction(cfd);
378     }
379     MaybeScheduleFlushOrCompaction();
380   }
381 
382   // Wake up any waiters - in this case, it could be the shutdown thread
383   bg_cv_.SignalAll();
384 
385   // No need to check BGError again. If something happened, event listener would
386   // be notified and the operation causing it would have failed
387   return s;
388 }
389 
WaitForBackgroundWork()390 void DBImpl::WaitForBackgroundWork() {
391   // Wait for background work to finish
392   while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
393          bg_flush_scheduled_) {
394     bg_cv_.Wait();
395   }
396 }
397 
398 // Will lock the mutex_,  will wait for completion if wait is true
CancelAllBackgroundWork(bool wait)399 void DBImpl::CancelAllBackgroundWork(bool wait) {
400   ROCKS_LOG_INFO(immutable_db_options_.info_log,
401                  "Shutdown: canceling all background work");
402 
403   if (thread_dump_stats_ != nullptr) {
404     thread_dump_stats_->cancel();
405     thread_dump_stats_.reset();
406   }
407   if (thread_persist_stats_ != nullptr) {
408     thread_persist_stats_->cancel();
409     thread_persist_stats_.reset();
410   }
411   InstrumentedMutexLock l(&mutex_);
412   if (!shutting_down_.load(std::memory_order_acquire) &&
413       has_unpersisted_data_.load(std::memory_order_relaxed) &&
414       !mutable_db_options_.avoid_flush_during_shutdown) {
415     if (immutable_db_options_.atomic_flush) {
416       autovector<ColumnFamilyData*> cfds;
417       SelectColumnFamiliesForAtomicFlush(&cfds);
418       mutex_.Unlock();
419       AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
420       mutex_.Lock();
421     } else {
422       for (auto cfd : *versions_->GetColumnFamilySet()) {
423         if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
424           cfd->Ref();
425           mutex_.Unlock();
426           FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
427           mutex_.Lock();
428           cfd->UnrefAndTryDelete();
429         }
430       }
431     }
432     versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
433   }
434 
435   shutting_down_.store(true, std::memory_order_release);
436   bg_cv_.SignalAll();
437   if (!wait) {
438     return;
439   }
440   WaitForBackgroundWork();
441 }
442 
CloseHelper()443 Status DBImpl::CloseHelper() {
444   // Guarantee that there is no background error recovery in progress before
445   // continuing with the shutdown
446   mutex_.Lock();
447   shutdown_initiated_ = true;
448   error_handler_.CancelErrorRecovery();
449   while (error_handler_.IsRecoveryInProgress()) {
450     bg_cv_.Wait();
451   }
452   mutex_.Unlock();
453 
454   // CancelAllBackgroundWork called with false means we just set the shutdown
455   // marker. After this we do a variant of the waiting and unschedule work
456   // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
457   CancelAllBackgroundWork(false);
458   int bottom_compactions_unscheduled =
459       env_->UnSchedule(this, Env::Priority::BOTTOM);
460   int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
461   int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
462   Status ret;
463   mutex_.Lock();
464   bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
465   bg_compaction_scheduled_ -= compactions_unscheduled;
466   bg_flush_scheduled_ -= flushes_unscheduled;
467 
468   // Wait for background work to finish
469   while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
470          bg_flush_scheduled_ || bg_purge_scheduled_ ||
471          pending_purge_obsolete_files_ ||
472          error_handler_.IsRecoveryInProgress()) {
473     TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
474     bg_cv_.Wait();
475   }
476   TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
477                            &files_grabbed_for_purge_);
478   EraseThreadStatusDbInfo();
479   flush_scheduler_.Clear();
480   trim_history_scheduler_.Clear();
481 
482   while (!flush_queue_.empty()) {
483     const FlushRequest& flush_req = PopFirstFromFlushQueue();
484     for (const auto& iter : flush_req) {
485       iter.first->UnrefAndTryDelete();
486     }
487   }
488   while (!compaction_queue_.empty()) {
489     auto cfd = PopFirstFromCompactionQueue();
490     cfd->UnrefAndTryDelete();
491   }
492 
493   if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
494     // we need to delete handle outside of lock because it does its own locking
495     mutex_.Unlock();
496     if (default_cf_handle_) {
497       delete default_cf_handle_;
498       default_cf_handle_ = nullptr;
499     }
500     if (persist_stats_cf_handle_) {
501       delete persist_stats_cf_handle_;
502       persist_stats_cf_handle_ = nullptr;
503     }
504     mutex_.Lock();
505   }
506 
507   // Clean up obsolete files due to SuperVersion release.
508   // (1) Need to delete to obsolete files before closing because RepairDB()
509   // scans all existing files in the file system and builds manifest file.
510   // Keeping obsolete files confuses the repair process.
511   // (2) Need to check if we Open()/Recover() the DB successfully before
512   // deleting because if VersionSet recover fails (may be due to corrupted
513   // manifest file), it is not able to identify live files correctly. As a
514   // result, all "live" files can get deleted by accident. However, corrupted
515   // manifest is recoverable by RepairDB().
516   if (opened_successfully_) {
517     JobContext job_context(next_job_id_.fetch_add(1));
518     FindObsoleteFiles(&job_context, true);
519 
520     mutex_.Unlock();
521     // manifest number starting from 2
522     job_context.manifest_file_number = 1;
523     if (job_context.HaveSomethingToDelete()) {
524       PurgeObsoleteFiles(job_context);
525     }
526     job_context.Clean();
527     mutex_.Lock();
528   }
529 
530   for (auto l : logs_to_free_) {
531     delete l;
532   }
533   for (auto& log : logs_) {
534     uint64_t log_number = log.writer->get_log_number();
535     Status s = log.ClearWriter();
536     if (!s.ok()) {
537       ROCKS_LOG_WARN(
538           immutable_db_options_.info_log,
539           "Unable to Sync WAL file %s with error -- %s",
540           LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
541           s.ToString().c_str());
542       // Retain the first error
543       if (ret.ok()) {
544         ret = s;
545       }
546     }
547   }
548   logs_.clear();
549 
550   // Table cache may have table handles holding blocks from the block cache.
551   // We need to release them before the block cache is destroyed. The block
552   // cache may be destroyed inside versions_.reset(), when column family data
553   // list is destroyed, so leaving handles in table cache after
554   // versions_.reset() may cause issues.
555   // Here we clean all unreferenced handles in table cache.
556   // Now we assume all user queries have finished, so only version set itself
557   // can possibly hold the blocks from block cache. After releasing unreferenced
558   // handles here, only handles held by version set left and inside
559   // versions_.reset(), we will release them. There, we need to make sure every
560   // time a handle is released, we erase it from the cache too. By doing that,
561   // we can guarantee that after versions_.reset(), table cache is empty
562   // so the cache can be safely destroyed.
563   table_cache_->EraseUnRefEntries();
564 
565   for (auto& txn_entry : recovered_transactions_) {
566     delete txn_entry.second;
567   }
568 
569   // versions need to be destroyed before table_cache since it can hold
570   // references to table_cache.
571   versions_.reset();
572   mutex_.Unlock();
573   if (db_lock_ != nullptr) {
574     env_->UnlockFile(db_lock_);
575   }
576 
577   ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
578   LogFlush(immutable_db_options_.info_log);
579 
580 #ifndef ROCKSDB_LITE
581   // If the sst_file_manager was allocated by us during DB::Open(), ccall
582   // Close() on it before closing the info_log. Otherwise, background thread
583   // in SstFileManagerImpl might try to log something
584   if (immutable_db_options_.sst_file_manager && own_sfm_) {
585     auto sfm = static_cast<SstFileManagerImpl*>(
586         immutable_db_options_.sst_file_manager.get());
587     sfm->Close();
588   }
589 #endif  // ROCKSDB_LITE
590 
591   if (immutable_db_options_.info_log && own_info_log_) {
592     Status s = immutable_db_options_.info_log->Close();
593     if (ret.ok()) {
594       ret = s;
595     }
596   }
597 
598   if (ret.IsAborted()) {
599     // Reserve IsAborted() error for those where users didn't release
600     // certain resource and they can release them and come back and
601     // retry. In this case, we wrap this exception to something else.
602     return Status::Incomplete(ret.ToString());
603   }
604   return ret;
605 }
606 
CloseImpl()607 Status DBImpl::CloseImpl() { return CloseHelper(); }
608 
~DBImpl()609 DBImpl::~DBImpl() {
610   if (!closed_) {
611     closed_ = true;
612     CloseHelper();
613   }
614 }
615 
MaybeIgnoreError(Status * s) const616 void DBImpl::MaybeIgnoreError(Status* s) const {
617   if (s->ok() || immutable_db_options_.paranoid_checks) {
618     // No change needed
619   } else {
620     ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
621                    s->ToString().c_str());
622     *s = Status::OK();
623   }
624 }
625 
CreateArchivalDirectory()626 const Status DBImpl::CreateArchivalDirectory() {
627   if (immutable_db_options_.wal_ttl_seconds > 0 ||
628       immutable_db_options_.wal_size_limit_mb > 0) {
629     std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
630     return env_->CreateDirIfMissing(archivalPath);
631   }
632   return Status::OK();
633 }
634 
PrintStatistics()635 void DBImpl::PrintStatistics() {
636   auto dbstats = immutable_db_options_.statistics.get();
637   if (dbstats) {
638     ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
639                    dbstats->ToString().c_str());
640   }
641 }
642 
StartTimedTasks()643 void DBImpl::StartTimedTasks() {
644   unsigned int stats_dump_period_sec = 0;
645   unsigned int stats_persist_period_sec = 0;
646   {
647     InstrumentedMutexLock l(&mutex_);
648     stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
649     if (stats_dump_period_sec > 0) {
650       if (!thread_dump_stats_) {
651         thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
652             [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
653             static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond));
654       }
655     }
656     stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
657     if (stats_persist_period_sec > 0) {
658       if (!thread_persist_stats_) {
659         thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
660             [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
661             static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond));
662       }
663     }
664   }
665 }
666 
667 // esitmate the total size of stats_history_
EstimateInMemoryStatsHistorySize() const668 size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
669   size_t size_total =
670       sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
671   if (stats_history_.size() == 0) return size_total;
672   size_t size_per_slice =
673       sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
674   // non-empty map, stats_history_.begin() guaranteed to exist
675   std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
676   for (const auto& pairs : sample_slice) {
677     size_per_slice +=
678         pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
679   }
680   size_total = size_per_slice * stats_history_.size();
681   return size_total;
682 }
683 
PersistStats()684 void DBImpl::PersistStats() {
685   TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
686 #ifndef ROCKSDB_LITE
687   if (shutdown_initiated_) {
688     return;
689   }
690   uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond;
691   Statistics* statistics = immutable_db_options_.statistics.get();
692   if (!statistics) {
693     return;
694   }
695   size_t stats_history_size_limit = 0;
696   {
697     InstrumentedMutexLock l(&mutex_);
698     stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
699   }
700 
701   std::map<std::string, uint64_t> stats_map;
702   if (!statistics->getTickerMap(&stats_map)) {
703     return;
704   }
705   ROCKS_LOG_INFO(immutable_db_options_.info_log,
706                  "------- PERSISTING STATS -------");
707 
708   if (immutable_db_options_.persist_stats_to_disk) {
709     WriteBatch batch;
710     if (stats_slice_initialized_) {
711       ROCKS_LOG_INFO(immutable_db_options_.info_log,
712                      "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
713                      stats_slice_.size());
714       for (const auto& stat : stats_map) {
715         char key[100];
716         int length =
717             EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
718         // calculate the delta from last time
719         if (stats_slice_.find(stat.first) != stats_slice_.end()) {
720           uint64_t delta = stat.second - stats_slice_[stat.first];
721           batch.Put(persist_stats_cf_handle_, Slice(key, std::min(100, length)),
722                     ToString(delta));
723         }
724       }
725     }
726     stats_slice_initialized_ = true;
727     std::swap(stats_slice_, stats_map);
728     WriteOptions wo;
729     wo.low_pri = true;
730     wo.no_slowdown = true;
731     wo.sync = false;
732     Status s = Write(wo, &batch);
733     if (!s.ok()) {
734       ROCKS_LOG_INFO(immutable_db_options_.info_log,
735                      "Writing to persistent stats CF failed -- %s",
736                      s.ToString().c_str());
737     } else {
738       ROCKS_LOG_INFO(immutable_db_options_.info_log,
739                      "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
740                      " to persistent stats CF succeeded",
741                      stats_slice_.size(), now_seconds);
742     }
743     // TODO(Zhongyi): add purging for persisted data
744   } else {
745     InstrumentedMutexLock l(&stats_history_mutex_);
746     // calculate the delta from last time
747     if (stats_slice_initialized_) {
748       std::map<std::string, uint64_t> stats_delta;
749       for (const auto& stat : stats_map) {
750         if (stats_slice_.find(stat.first) != stats_slice_.end()) {
751           stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
752         }
753       }
754       ROCKS_LOG_INFO(immutable_db_options_.info_log,
755                      "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
756                      " to in-memory stats history",
757                      stats_slice_.size(), now_seconds);
758       stats_history_[now_seconds] = stats_delta;
759     }
760     stats_slice_initialized_ = true;
761     std::swap(stats_slice_, stats_map);
762     TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");
763 
764     // delete older stats snapshots to control memory consumption
765     size_t stats_history_size = EstimateInMemoryStatsHistorySize();
766     bool purge_needed = stats_history_size > stats_history_size_limit;
767     ROCKS_LOG_INFO(immutable_db_options_.info_log,
768                    "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
769                    " bytes, slice count: %" ROCKSDB_PRIszt,
770                    stats_history_size, stats_history_.size());
771     while (purge_needed && !stats_history_.empty()) {
772       stats_history_.erase(stats_history_.begin());
773       purge_needed =
774           EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
775     }
776     ROCKS_LOG_INFO(immutable_db_options_.info_log,
777                    "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
778                    " bytes, slice count: %" ROCKSDB_PRIszt,
779                    stats_history_size, stats_history_.size());
780   }
781 #endif  // !ROCKSDB_LITE
782 }
783 
FindStatsByTime(uint64_t start_time,uint64_t end_time,uint64_t * new_time,std::map<std::string,uint64_t> * stats_map)784 bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
785                              uint64_t* new_time,
786                              std::map<std::string, uint64_t>* stats_map) {
787   assert(new_time);
788   assert(stats_map);
789   if (!new_time || !stats_map) return false;
790   // lock when search for start_time
791   {
792     InstrumentedMutexLock l(&stats_history_mutex_);
793     auto it = stats_history_.lower_bound(start_time);
794     if (it != stats_history_.end() && it->first < end_time) {
795       // make a copy for timestamp and stats_map
796       *new_time = it->first;
797       *stats_map = it->second;
798       return true;
799     } else {
800       return false;
801     }
802   }
803 }
804 
GetStatsHistory(uint64_t start_time,uint64_t end_time,std::unique_ptr<StatsHistoryIterator> * stats_iterator)805 Status DBImpl::GetStatsHistory(
806     uint64_t start_time, uint64_t end_time,
807     std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
808   if (!stats_iterator) {
809     return Status::InvalidArgument("stats_iterator not preallocated.");
810   }
811   if (immutable_db_options_.persist_stats_to_disk) {
812     stats_iterator->reset(
813         new PersistentStatsHistoryIterator(start_time, end_time, this));
814   } else {
815     stats_iterator->reset(
816         new InMemoryStatsHistoryIterator(start_time, end_time, this));
817   }
818   return (*stats_iterator)->status();
819 }
820 
DumpStats()821 void DBImpl::DumpStats() {
822   TEST_SYNC_POINT("DBImpl::DumpStats:1");
823 #ifndef ROCKSDB_LITE
824   const DBPropertyInfo* cf_property_info =
825       GetPropertyInfo(DB::Properties::kCFStats);
826   assert(cf_property_info != nullptr);
827   const DBPropertyInfo* db_property_info =
828       GetPropertyInfo(DB::Properties::kDBStats);
829   assert(db_property_info != nullptr);
830 
831   std::string stats;
832   if (shutdown_initiated_) {
833     return;
834   }
835   {
836     InstrumentedMutexLock l(&mutex_);
837     default_cf_internal_stats_->GetStringProperty(
838         *db_property_info, DB::Properties::kDBStats, &stats);
839     for (auto cfd : *versions_->GetColumnFamilySet()) {
840       if (cfd->initialized()) {
841         cfd->internal_stats()->GetStringProperty(
842             *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
843       }
844     }
845     for (auto cfd : *versions_->GetColumnFamilySet()) {
846       if (cfd->initialized()) {
847         cfd->internal_stats()->GetStringProperty(
848             *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
849       }
850     }
851   }
852   TEST_SYNC_POINT("DBImpl::DumpStats:2");
853   ROCKS_LOG_INFO(immutable_db_options_.info_log,
854                  "------- DUMPING STATS -------");
855   ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
856   if (immutable_db_options_.dump_malloc_stats) {
857     stats.clear();
858     DumpMallocStats(&stats);
859     if (!stats.empty()) {
860       ROCKS_LOG_INFO(immutable_db_options_.info_log,
861                      "------- Malloc STATS -------");
862       ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
863     }
864   }
865 #endif  // !ROCKSDB_LITE
866 
867   PrintStatistics();
868 }
869 
TablesRangeTombstoneSummary(ColumnFamilyHandle * column_family,int max_entries_to_print,std::string * out_str)870 Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
871                                            int max_entries_to_print,
872                                            std::string* out_str) {
873   auto* cfh =
874       static_cast_with_check<ColumnFamilyHandleImpl, ColumnFamilyHandle>(
875           column_family);
876   ColumnFamilyData* cfd = cfh->cfd();
877 
878   SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
879   Version* version = super_version->current;
880 
881   Status s =
882       version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);
883 
884   CleanupSuperVersion(super_version);
885   return s;
886 }
887 
ScheduleBgLogWriterClose(JobContext * job_context)888 void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
889   if (!job_context->logs_to_free.empty()) {
890     for (auto l : job_context->logs_to_free) {
891       AddToLogsToFreeQueue(l);
892     }
893     job_context->logs_to_free.clear();
894   }
895 }
896 
GetDataDir(ColumnFamilyData * cfd,size_t path_id) const897 FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
898   assert(cfd);
899   FSDirectory* ret_dir = cfd->GetDataDir(path_id);
900   if (ret_dir == nullptr) {
901     return directories_.GetDataDir(path_id);
902   }
903   return ret_dir;
904 }
905 
SetOptions(ColumnFamilyHandle * column_family,const std::unordered_map<std::string,std::string> & options_map)906 Status DBImpl::SetOptions(
907     ColumnFamilyHandle* column_family,
908     const std::unordered_map<std::string, std::string>& options_map) {
909 #ifdef ROCKSDB_LITE
910   (void)column_family;
911   (void)options_map;
912   return Status::NotSupported("Not supported in ROCKSDB LITE");
913 #else
914   auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
915   if (options_map.empty()) {
916     ROCKS_LOG_WARN(immutable_db_options_.info_log,
917                    "SetOptions() on column family [%s], empty input",
918                    cfd->GetName().c_str());
919     return Status::InvalidArgument("empty input");
920   }
921 
922   MutableCFOptions new_options;
923   Status s;
924   Status persist_options_status;
925   SuperVersionContext sv_context(/* create_superversion */ true);
926   {
927     auto db_options = GetDBOptions();
928     InstrumentedMutexLock l(&mutex_);
929     s = cfd->SetOptions(db_options, options_map);
930     if (s.ok()) {
931       new_options = *cfd->GetLatestMutableCFOptions();
932       // Append new version to recompute compaction score.
933       VersionEdit dummy_edit;
934       versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
935                              directories_.GetDbDir());
936       // Trigger possible flush/compactions. This has to be before we persist
937       // options to file, otherwise there will be a deadlock with writer
938       // thread.
939       InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);
940 
941       persist_options_status = WriteOptionsFile(
942           false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
943       bg_cv_.SignalAll();
944     }
945   }
946   sv_context.Clean();
947 
948   ROCKS_LOG_INFO(
949       immutable_db_options_.info_log,
950       "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
951   for (const auto& o : options_map) {
952     ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
953                    o.second.c_str());
954   }
955   if (s.ok()) {
956     ROCKS_LOG_INFO(immutable_db_options_.info_log,
957                    "[%s] SetOptions() succeeded", cfd->GetName().c_str());
958     new_options.Dump(immutable_db_options_.info_log.get());
959     if (!persist_options_status.ok()) {
960       s = persist_options_status;
961     }
962   } else {
963     ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
964                    cfd->GetName().c_str());
965   }
966   LogFlush(immutable_db_options_.info_log);
967   return s;
968 #endif  // ROCKSDB_LITE
969 }
970 
SetDBOptions(const std::unordered_map<std::string,std::string> & options_map)971 Status DBImpl::SetDBOptions(
972     const std::unordered_map<std::string, std::string>& options_map) {
973 #ifdef ROCKSDB_LITE
974   (void)options_map;
975   return Status::NotSupported("Not supported in ROCKSDB LITE");
976 #else
977   if (options_map.empty()) {
978     ROCKS_LOG_WARN(immutable_db_options_.info_log,
979                    "SetDBOptions(), empty input.");
980     return Status::InvalidArgument("empty input");
981   }
982 
983   MutableDBOptions new_options;
984   Status s;
985   Status persist_options_status;
986   bool wal_changed = false;
987   WriteContext write_context;
988   {
989     InstrumentedMutexLock l(&mutex_);
990     s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
991                                        &new_options);
992     if (new_options.bytes_per_sync == 0) {
993       new_options.bytes_per_sync = 1024 * 1024;
994     }
995     DBOptions new_db_options =
996         BuildDBOptions(immutable_db_options_, new_options);
997     if (s.ok()) {
998       s = ValidateOptions(new_db_options);
999     }
1000     if (s.ok()) {
1001       for (auto c : *versions_->GetColumnFamilySet()) {
1002         if (!c->IsDropped()) {
1003           auto cf_options = c->GetLatestCFOptions();
1004           s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
1005           if (!s.ok()) {
1006             break;
1007           }
1008         }
1009       }
1010     }
1011     if (s.ok()) {
1012       const BGJobLimits current_bg_job_limits =
1013           GetBGJobLimits(immutable_db_options_.max_background_flushes,
1014                          mutable_db_options_.max_background_compactions,
1015                          mutable_db_options_.max_background_jobs,
1016                          /* parallelize_compactions */ true);
1017       const BGJobLimits new_bg_job_limits = GetBGJobLimits(
1018           immutable_db_options_.max_background_flushes,
1019           new_options.max_background_compactions,
1020           new_options.max_background_jobs, /* parallelize_compactions */ true);
1021 
1022       const bool max_flushes_increased =
1023           new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
1024       const bool max_compactions_increased =
1025           new_bg_job_limits.max_compactions >
1026           current_bg_job_limits.max_compactions;
1027 
1028       if (max_flushes_increased || max_compactions_increased) {
1029         if (max_flushes_increased) {
1030           env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
1031                                              Env::Priority::HIGH);
1032         }
1033 
1034         if (max_compactions_increased) {
1035           env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
1036                                              Env::Priority::LOW);
1037         }
1038 
1039         MaybeScheduleFlushOrCompaction();
1040       }
1041 
1042       if (new_options.stats_dump_period_sec !=
1043           mutable_db_options_.stats_dump_period_sec) {
1044         if (thread_dump_stats_) {
1045           mutex_.Unlock();
1046           thread_dump_stats_->cancel();
1047           mutex_.Lock();
1048         }
1049         if (new_options.stats_dump_period_sec > 0) {
1050           thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
1051               [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
1052               static_cast<uint64_t>(new_options.stats_dump_period_sec) *
1053                   kMicrosInSecond));
1054         } else {
1055           thread_dump_stats_.reset();
1056         }
1057       }
1058       if (new_options.stats_persist_period_sec !=
1059           mutable_db_options_.stats_persist_period_sec) {
1060         if (thread_persist_stats_) {
1061           mutex_.Unlock();
1062           thread_persist_stats_->cancel();
1063           mutex_.Lock();
1064         }
1065         if (new_options.stats_persist_period_sec > 0) {
1066           thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
1067               [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
1068               static_cast<uint64_t>(new_options.stats_persist_period_sec) *
1069                   kMicrosInSecond));
1070         } else {
1071           thread_persist_stats_.reset();
1072         }
1073       }
1074       write_controller_.set_max_delayed_write_rate(
1075           new_options.delayed_write_rate);
1076       table_cache_.get()->SetCapacity(new_options.max_open_files == -1
1077                                           ? TableCache::kInfiniteCapacity
1078                                           : new_options.max_open_files - 10);
1079       wal_changed = mutable_db_options_.wal_bytes_per_sync !=
1080                     new_options.wal_bytes_per_sync;
1081       mutable_db_options_ = new_options;
1082       file_options_for_compaction_ = FileOptions(new_db_options);
1083       file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
1084           file_options_for_compaction_, immutable_db_options_);
1085       versions_->ChangeFileOptions(mutable_db_options_);
1086       //TODO(xiez): clarify why apply optimize for read to write options
1087       file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
1088           file_options_for_compaction_, immutable_db_options_);
1089       file_options_for_compaction_.compaction_readahead_size =
1090           mutable_db_options_.compaction_readahead_size;
1091       WriteThread::Writer w;
1092       write_thread_.EnterUnbatched(&w, &mutex_);
1093       if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
1094         Status purge_wal_status = SwitchWAL(&write_context);
1095         if (!purge_wal_status.ok()) {
1096           ROCKS_LOG_WARN(immutable_db_options_.info_log,
1097                          "Unable to purge WAL files in SetDBOptions() -- %s",
1098                          purge_wal_status.ToString().c_str());
1099         }
1100       }
1101       persist_options_status = WriteOptionsFile(
1102           false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
1103       write_thread_.ExitUnbatched(&w);
1104     }
1105   }
1106   ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
1107   for (const auto& o : options_map) {
1108     ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
1109                    o.second.c_str());
1110   }
1111   if (s.ok()) {
1112     ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
1113     new_options.Dump(immutable_db_options_.info_log.get());
1114     if (!persist_options_status.ok()) {
1115       if (immutable_db_options_.fail_if_options_file_error) {
1116         s = Status::IOError(
1117             "SetDBOptions() succeeded, but unable to persist options",
1118             persist_options_status.ToString());
1119       }
1120       ROCKS_LOG_WARN(immutable_db_options_.info_log,
1121                      "Unable to persist options in SetDBOptions() -- %s",
1122                      persist_options_status.ToString().c_str());
1123     }
1124   } else {
1125     ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
1126   }
1127   LogFlush(immutable_db_options_.info_log);
1128   return s;
1129 #endif  // ROCKSDB_LITE
1130 }
1131 
1132 // return the same level if it cannot be moved
FindMinimumEmptyLevelFitting(ColumnFamilyData * cfd,const MutableCFOptions &,int level)1133 int DBImpl::FindMinimumEmptyLevelFitting(
1134     ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
1135     int level) {
1136   mutex_.AssertHeld();
1137   const auto* vstorage = cfd->current()->storage_info();
1138   int minimum_level = level;
1139   for (int i = level - 1; i > 0; --i) {
1140     // stop if level i is not empty
1141     if (vstorage->NumLevelFiles(i) > 0) break;
1142     // stop if level i is too small (cannot fit the level files)
1143     if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
1144       break;
1145     }
1146 
1147     minimum_level = i;
1148   }
1149   return minimum_level;
1150 }
1151 
FlushWAL(bool sync)1152 Status DBImpl::FlushWAL(bool sync) {
1153   if (manual_wal_flush_) {
1154     IOStatus io_s;
1155     {
1156       // We need to lock log_write_mutex_ since logs_ might change concurrently
1157       InstrumentedMutexLock wl(&log_write_mutex_);
1158       log::Writer* cur_log_writer = logs_.back().writer;
1159       io_s = cur_log_writer->WriteBuffer();
1160     }
1161     if (!io_s.ok()) {
1162       ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
1163                       io_s.ToString().c_str());
1164       // In case there is a fs error we should set it globally to prevent the
1165       // future writes
1166       IOStatusCheck(io_s);
1167       // whether sync or not, we should abort the rest of function upon error
1168       return std::move(io_s);
1169     }
1170     if (!sync) {
1171       ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
1172       return std::move(io_s);
1173     }
1174   }
1175   if (!sync) {
1176     return Status::OK();
1177   }
1178   // sync = true
1179   ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
1180   return SyncWAL();
1181 }
1182 
SyncWAL()1183 Status DBImpl::SyncWAL() {
1184   autovector<log::Writer*, 1> logs_to_sync;
1185   bool need_log_dir_sync;
1186   uint64_t current_log_number;
1187 
1188   {
1189     InstrumentedMutexLock l(&mutex_);
1190     assert(!logs_.empty());
1191 
1192     // This SyncWAL() call only cares about logs up to this number.
1193     current_log_number = logfile_number_;
1194 
1195     while (logs_.front().number <= current_log_number &&
1196            logs_.front().getting_synced) {
1197       log_sync_cv_.Wait();
1198     }
1199     // First check that logs are safe to sync in background.
1200     for (auto it = logs_.begin();
1201          it != logs_.end() && it->number <= current_log_number; ++it) {
1202       if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
1203         return Status::NotSupported(
1204             "SyncWAL() is not supported for this implementation of WAL file",
1205             immutable_db_options_.allow_mmap_writes
1206                 ? "try setting Options::allow_mmap_writes to false"
1207                 : Slice());
1208       }
1209     }
1210     for (auto it = logs_.begin();
1211          it != logs_.end() && it->number <= current_log_number; ++it) {
1212       auto& log = *it;
1213       assert(!log.getting_synced);
1214       log.getting_synced = true;
1215       logs_to_sync.push_back(log.writer);
1216     }
1217 
1218     need_log_dir_sync = !log_dir_synced_;
1219   }
1220 
1221   TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
1222   RecordTick(stats_, WAL_FILE_SYNCED);
1223   Status status;
1224   IOStatus io_s;
1225   for (log::Writer* log : logs_to_sync) {
1226     io_s = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
1227     if (!io_s.ok()) {
1228       status = io_s;
1229       break;
1230     }
1231   }
1232   if (!io_s.ok()) {
1233     ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s",
1234                     io_s.ToString().c_str());
1235     // In case there is a fs error we should set it globally to prevent the
1236     // future writes
1237     IOStatusCheck(io_s);
1238   }
1239   if (status.ok() && need_log_dir_sync) {
1240     status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
1241   }
1242   TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
1243 
1244   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
1245   {
1246     InstrumentedMutexLock l(&mutex_);
1247     MarkLogsSynced(current_log_number, need_log_dir_sync, status);
1248   }
1249   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
1250 
1251   return status;
1252 }
1253 
LockWAL()1254 Status DBImpl::LockWAL() {
1255   log_write_mutex_.Lock();
1256   auto cur_log_writer = logs_.back().writer;
1257   auto status = cur_log_writer->WriteBuffer();
1258   if (!status.ok()) {
1259     ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
1260                     status.ToString().c_str());
1261     // In case there is a fs error we should set it globally to prevent the
1262     // future writes
1263     WriteStatusCheck(status);
1264   }
1265   return std::move(status);
1266 }
1267 
UnlockWAL()1268 Status DBImpl::UnlockWAL() {
1269   log_write_mutex_.Unlock();
1270   return Status::OK();
1271 }
1272 
MarkLogsSynced(uint64_t up_to,bool synced_dir,const Status & status)1273 void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
1274                             const Status& status) {
1275   mutex_.AssertHeld();
1276   if (synced_dir && logfile_number_ == up_to && status.ok()) {
1277     log_dir_synced_ = true;
1278   }
1279   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
1280     auto& log = *it;
1281     assert(log.getting_synced);
1282     if (status.ok() && logs_.size() > 1) {
1283       logs_to_free_.push_back(log.ReleaseWriter());
1284       // To modify logs_ both mutex_ and log_write_mutex_ must be held
1285       InstrumentedMutexLock l(&log_write_mutex_);
1286       it = logs_.erase(it);
1287     } else {
1288       log.getting_synced = false;
1289       ++it;
1290     }
1291   }
1292   assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
1293          (logs_.size() == 1 && !logs_[0].getting_synced));
1294   log_sync_cv_.SignalAll();
1295 }
1296 
GetLatestSequenceNumber() const1297 SequenceNumber DBImpl::GetLatestSequenceNumber() const {
1298   return versions_->LastSequence();
1299 }
1300 
SetLastPublishedSequence(SequenceNumber seq)1301 void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
1302   versions_->SetLastPublishedSequence(seq);
1303 }
1304 
SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)1305 bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
1306   if (seqnum > preserve_deletes_seqnum_.load()) {
1307     preserve_deletes_seqnum_.store(seqnum);
1308     return true;
1309   } else {
1310     return false;
1311   }
1312 }
1313 
NewInternalIterator(Arena * arena,RangeDelAggregator * range_del_agg,SequenceNumber sequence,ColumnFamilyHandle * column_family)1314 InternalIterator* DBImpl::NewInternalIterator(
1315     Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
1316     ColumnFamilyHandle* column_family) {
1317   ColumnFamilyData* cfd;
1318   if (column_family == nullptr) {
1319     cfd = default_cf_handle_->cfd();
1320   } else {
1321     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1322     cfd = cfh->cfd();
1323   }
1324 
1325   mutex_.Lock();
1326   SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
1327   mutex_.Unlock();
1328   ReadOptions roptions;
1329   return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
1330                              sequence);
1331 }
1332 
SchedulePurge()1333 void DBImpl::SchedulePurge() {
1334   mutex_.AssertHeld();
1335   assert(opened_successfully_);
1336 
1337   // Purge operations are put into High priority queue
1338   bg_purge_scheduled_++;
1339   env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
1340 }
1341 
BackgroundCallPurge()1342 void DBImpl::BackgroundCallPurge() {
1343   mutex_.Lock();
1344 
1345   while (!logs_to_free_queue_.empty()) {
1346     assert(!logs_to_free_queue_.empty());
1347     log::Writer* log_writer = *(logs_to_free_queue_.begin());
1348     logs_to_free_queue_.pop_front();
1349     mutex_.Unlock();
1350     delete log_writer;
1351     mutex_.Lock();
1352   }
1353   while (!superversions_to_free_queue_.empty()) {
1354     assert(!superversions_to_free_queue_.empty());
1355     SuperVersion* sv = superversions_to_free_queue_.front();
1356     superversions_to_free_queue_.pop_front();
1357     mutex_.Unlock();
1358     delete sv;
1359     mutex_.Lock();
1360   }
1361 
1362   // Can't use iterator to go over purge_files_ because inside the loop we're
1363   // unlocking the mutex that protects purge_files_.
1364   while (!purge_files_.empty()) {
1365     auto it = purge_files_.begin();
1366     // Need to make a copy of the PurgeFilesInfo before unlocking the mutex.
1367     PurgeFileInfo purge_file = it->second;
1368 
1369     const std::string& fname = purge_file.fname;
1370     const std::string& dir_to_sync = purge_file.dir_to_sync;
1371     FileType type = purge_file.type;
1372     uint64_t number = purge_file.number;
1373     int job_id = purge_file.job_id;
1374 
1375     purge_files_.erase(it);
1376 
1377     mutex_.Unlock();
1378     DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
1379     mutex_.Lock();
1380   }
1381 
1382   bg_purge_scheduled_--;
1383 
1384   bg_cv_.SignalAll();
1385   // IMPORTANT:there should be no code after calling SignalAll. This call may
1386   // signal the DB destructor that it's OK to proceed with destruction. In
1387   // that case, all DB variables will be dealloacated and referencing them
1388   // will cause trouble.
1389   mutex_.Unlock();
1390 }
1391 
1392 namespace {
1393 struct IterState {
IterStateROCKSDB_NAMESPACE::__anond311f2020411::IterState1394   IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
1395             bool _background_purge)
1396       : db(_db),
1397         mu(_mu),
1398         super_version(_super_version),
1399         background_purge(_background_purge) {}
1400 
1401   DBImpl* db;
1402   InstrumentedMutex* mu;
1403   SuperVersion* super_version;
1404   bool background_purge;
1405 };
1406 
CleanupIteratorState(void * arg1,void *)1407 static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
1408   IterState* state = reinterpret_cast<IterState*>(arg1);
1409 
1410   if (state->super_version->Unref()) {
1411     // Job id == 0 means that this is not our background process, but rather
1412     // user thread
1413     JobContext job_context(0);
1414 
1415     state->mu->Lock();
1416     state->super_version->Cleanup();
1417     state->db->FindObsoleteFiles(&job_context, false, true);
1418     if (state->background_purge) {
1419       state->db->ScheduleBgLogWriterClose(&job_context);
1420       state->db->AddSuperVersionsToFreeQueue(state->super_version);
1421       state->db->SchedulePurge();
1422     }
1423     state->mu->Unlock();
1424 
1425     if (!state->background_purge) {
1426       delete state->super_version;
1427     }
1428     if (job_context.HaveSomethingToDelete()) {
1429       if (state->background_purge) {
1430         // PurgeObsoleteFiles here does not delete files. Instead, it adds the
1431         // files to be deleted to a job queue, and deletes it in a separate
1432         // background thread.
1433         state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
1434         state->mu->Lock();
1435         state->db->SchedulePurge();
1436         state->mu->Unlock();
1437       } else {
1438         state->db->PurgeObsoleteFiles(job_context);
1439       }
1440     }
1441     job_context.Clean();
1442   }
1443 
1444   delete state;
1445 }
1446 }  // namespace
1447 
NewInternalIterator(const ReadOptions & read_options,ColumnFamilyData * cfd,SuperVersion * super_version,Arena * arena,RangeDelAggregator * range_del_agg,SequenceNumber sequence)1448 InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
1449                                               ColumnFamilyData* cfd,
1450                                               SuperVersion* super_version,
1451                                               Arena* arena,
1452                                               RangeDelAggregator* range_del_agg,
1453                                               SequenceNumber sequence) {
1454   InternalIterator* internal_iter;
1455   assert(arena != nullptr);
1456   assert(range_del_agg != nullptr);
1457   // Need to create internal iterator from the arena.
1458   MergeIteratorBuilder merge_iter_builder(
1459       &cfd->internal_comparator(), arena,
1460       !read_options.total_order_seek &&
1461           super_version->mutable_cf_options.prefix_extractor != nullptr);
1462   // Collect iterator for mutable mem
1463   merge_iter_builder.AddIterator(
1464       super_version->mem->NewIterator(read_options, arena));
1465   std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
1466   Status s;
1467   if (!read_options.ignore_range_deletions) {
1468     range_del_iter.reset(
1469         super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
1470     range_del_agg->AddTombstones(std::move(range_del_iter));
1471   }
1472   // Collect all needed child iterators for immutable memtables
1473   if (s.ok()) {
1474     super_version->imm->AddIterators(read_options, &merge_iter_builder);
1475     if (!read_options.ignore_range_deletions) {
1476       s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
1477                                                          range_del_agg);
1478     }
1479   }
1480   TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
1481   if (s.ok()) {
1482     // Collect iterators for files in L0 - Ln
1483     if (read_options.read_tier != kMemtableTier) {
1484       super_version->current->AddIterators(read_options, file_options_,
1485                                            &merge_iter_builder, range_del_agg);
1486     }
1487     internal_iter = merge_iter_builder.Finish();
1488     IterState* cleanup =
1489         new IterState(this, &mutex_, super_version,
1490                       read_options.background_purge_on_iterator_cleanup ||
1491                       immutable_db_options_.avoid_unnecessary_blocking_io);
1492     internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
1493 
1494     return internal_iter;
1495   } else {
1496     CleanupSuperVersion(super_version);
1497   }
1498   return NewErrorInternalIterator<Slice>(s, arena);
1499 }
1500 
DefaultColumnFamily() const1501 ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
1502   return default_cf_handle_;
1503 }
1504 
PersistentStatsColumnFamily() const1505 ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
1506   return persist_stats_cf_handle_;
1507 }
1508 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)1509 Status DBImpl::Get(const ReadOptions& read_options,
1510                    ColumnFamilyHandle* column_family, const Slice& key,
1511                    PinnableSlice* value) {
1512   return Get(read_options, column_family, key, value, /*timestamp=*/nullptr);
1513 }
1514 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,std::string * timestamp)1515 Status DBImpl::Get(const ReadOptions& read_options,
1516                    ColumnFamilyHandle* column_family, const Slice& key,
1517                    PinnableSlice* value, std::string* timestamp) {
1518   GetImplOptions get_impl_options;
1519   get_impl_options.column_family = column_family;
1520   get_impl_options.value = value;
1521   get_impl_options.timestamp = timestamp;
1522   Status s = GetImpl(read_options, key, get_impl_options);
1523   return s;
1524 }
1525 
GetImpl(const ReadOptions & read_options,const Slice & key,GetImplOptions & get_impl_options)1526 Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
1527                        GetImplOptions& get_impl_options) {
1528   assert(get_impl_options.value != nullptr ||
1529          get_impl_options.merge_operands != nullptr);
1530   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
1531   StopWatch sw(env_, stats_, DB_GET);
1532   PERF_TIMER_GUARD(get_snapshot_time);
1533 
1534   auto cfh =
1535       reinterpret_cast<ColumnFamilyHandleImpl*>(get_impl_options.column_family);
1536   auto cfd = cfh->cfd();
1537 
1538   if (tracer_) {
1539     // TODO: This mutex should be removed later, to improve performance when
1540     // tracing is enabled.
1541     InstrumentedMutexLock lock(&trace_mutex_);
1542     if (tracer_) {
1543       tracer_->Get(get_impl_options.column_family, key);
1544     }
1545   }
1546 
1547   // Acquire SuperVersion
1548   SuperVersion* sv = GetAndRefSuperVersion(cfd);
1549 
1550   TEST_SYNC_POINT("DBImpl::GetImpl:1");
1551   TEST_SYNC_POINT("DBImpl::GetImpl:2");
1552 
1553   SequenceNumber snapshot;
1554   if (read_options.snapshot != nullptr) {
1555     if (get_impl_options.callback) {
1556       // Already calculated based on read_options.snapshot
1557       snapshot = get_impl_options.callback->max_visible_seq();
1558     } else {
1559       snapshot =
1560           reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1561     }
1562   } else {
1563     // Note that the snapshot is assigned AFTER referencing the super
1564     // version because otherwise a flush happening in between may compact away
1565     // data for the snapshot, so the reader would see neither data that was be
1566     // visible to the snapshot before compaction nor the newer data inserted
1567     // afterwards.
1568     snapshot = last_seq_same_as_publish_seq_
1569                    ? versions_->LastSequence()
1570                    : versions_->LastPublishedSequence();
1571     if (get_impl_options.callback) {
1572       // The unprep_seqs are not published for write unprepared, so it could be
1573       // that max_visible_seq is larger. Seek to the std::max of the two.
1574       // However, we still want our callback to contain the actual snapshot so
1575       // that it can do the correct visibility filtering.
1576       get_impl_options.callback->Refresh(snapshot);
1577 
1578       // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
1579       // max_visible_seq = max(max_visible_seq, snapshot)
1580       //
1581       // Currently, the commented out assert is broken by
1582       // InvalidSnapshotReadCallback, but if write unprepared recovery followed
1583       // the regular transaction flow, then this special read callback would not
1584       // be needed.
1585       //
1586       // assert(callback->max_visible_seq() >= snapshot);
1587       snapshot = get_impl_options.callback->max_visible_seq();
1588     }
1589   }
1590   TEST_SYNC_POINT("DBImpl::GetImpl:3");
1591   TEST_SYNC_POINT("DBImpl::GetImpl:4");
1592 
1593   // Prepare to store a list of merge operations if merge occurs.
1594   MergeContext merge_context;
1595   SequenceNumber max_covering_tombstone_seq = 0;
1596 
1597   Status s;
1598   // First look in the memtable, then in the immutable memtable (if any).
1599   // s is both in/out. When in, s could either be OK or MergeInProgress.
1600   // merge_operands will contain the sequence of merges in the latter case.
1601   LookupKey lkey(key, snapshot, read_options.timestamp);
1602   PERF_TIMER_STOP(get_snapshot_time);
1603 
1604   bool skip_memtable = (read_options.read_tier == kPersistedTier &&
1605                         has_unpersisted_data_.load(std::memory_order_relaxed));
1606   bool done = false;
1607   const Comparator* comparator =
1608       get_impl_options.column_family->GetComparator();
1609   size_t ts_sz = comparator->timestamp_size();
1610   std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr;
1611   if (!skip_memtable) {
1612     // Get value associated with key
1613     if (get_impl_options.get_value) {
1614       if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
1615                        &merge_context, &max_covering_tombstone_seq,
1616                        read_options, get_impl_options.callback,
1617                        get_impl_options.is_blob_index)) {
1618         done = true;
1619         get_impl_options.value->PinSelf();
1620         RecordTick(stats_, MEMTABLE_HIT);
1621       } else if ((s.ok() || s.IsMergeInProgress()) &&
1622                  sv->imm->Get(lkey, get_impl_options.value->GetSelf(),
1623                               timestamp, &s, &merge_context,
1624                               &max_covering_tombstone_seq, read_options,
1625                               get_impl_options.callback,
1626                               get_impl_options.is_blob_index)) {
1627         done = true;
1628         get_impl_options.value->PinSelf();
1629         RecordTick(stats_, MEMTABLE_HIT);
1630       }
1631     } else {
1632       // Get Merge Operands associated with key, Merge Operands should not be
1633       // merged and raw values should be returned to the user.
1634       if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
1635                        &merge_context, &max_covering_tombstone_seq,
1636                        read_options, nullptr, nullptr, false)) {
1637         done = true;
1638         RecordTick(stats_, MEMTABLE_HIT);
1639       } else if ((s.ok() || s.IsMergeInProgress()) &&
1640                  sv->imm->GetMergeOperands(lkey, &s, &merge_context,
1641                                            &max_covering_tombstone_seq,
1642                                            read_options)) {
1643         done = true;
1644         RecordTick(stats_, MEMTABLE_HIT);
1645       }
1646     }
1647     if (!done && !s.ok() && !s.IsMergeInProgress()) {
1648       ReturnAndCleanupSuperVersion(cfd, sv);
1649       return s;
1650     }
1651   }
1652   if (!done) {
1653     PERF_TIMER_GUARD(get_from_output_files_time);
1654     sv->current->Get(
1655         read_options, lkey, get_impl_options.value, timestamp, &s,
1656         &merge_context, &max_covering_tombstone_seq,
1657         get_impl_options.get_value ? get_impl_options.value_found : nullptr,
1658         nullptr, nullptr,
1659         get_impl_options.get_value ? get_impl_options.callback : nullptr,
1660         get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
1661         get_impl_options.get_value);
1662     RecordTick(stats_, MEMTABLE_MISS);
1663   }
1664 
1665   {
1666     PERF_TIMER_GUARD(get_post_process_time);
1667 
1668     ReturnAndCleanupSuperVersion(cfd, sv);
1669 
1670     RecordTick(stats_, NUMBER_KEYS_READ);
1671     size_t size = 0;
1672     if (s.ok()) {
1673       if (get_impl_options.get_value) {
1674         size = get_impl_options.value->size();
1675       } else {
1676         // Return all merge operands for get_impl_options.key
1677         *get_impl_options.number_of_operands =
1678             static_cast<int>(merge_context.GetNumOperands());
1679         if (*get_impl_options.number_of_operands >
1680             get_impl_options.get_merge_operands_options
1681                 ->expected_max_number_of_operands) {
1682           s = Status::Incomplete(
1683               Status::SubCode::KMergeOperandsInsufficientCapacity);
1684         } else {
1685           for (const Slice& sl : merge_context.GetOperands()) {
1686             size += sl.size();
1687             get_impl_options.merge_operands->PinSelf(sl);
1688             get_impl_options.merge_operands++;
1689           }
1690         }
1691       }
1692       RecordTick(stats_, BYTES_READ, size);
1693       PERF_COUNTER_ADD(get_read_bytes, size);
1694     }
1695     RecordInHistogram(stats_, BYTES_PER_READ, size);
1696   }
1697   return s;
1698 }
1699 
MultiGet(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)1700 std::vector<Status> DBImpl::MultiGet(
1701     const ReadOptions& read_options,
1702     const std::vector<ColumnFamilyHandle*>& column_family,
1703     const std::vector<Slice>& keys, std::vector<std::string>* values) {
1704   return MultiGet(read_options, column_family, keys, values,
1705                   /*timestamps*/ nullptr);
1706 }
1707 
MultiGet(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values,std::vector<std::string> * timestamps)1708 std::vector<Status> DBImpl::MultiGet(
1709     const ReadOptions& read_options,
1710     const std::vector<ColumnFamilyHandle*>& column_family,
1711     const std::vector<Slice>& keys, std::vector<std::string>* values,
1712     std::vector<std::string>* timestamps) {
1713   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
1714   StopWatch sw(env_, stats_, DB_MULTIGET);
1715   PERF_TIMER_GUARD(get_snapshot_time);
1716 
1717   SequenceNumber consistent_seqnum;
1718 
1719   std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
1720       column_family.size());
1721   for (auto cf : column_family) {
1722     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
1723     auto cfd = cfh->cfd();
1724     if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
1725       multiget_cf_data.emplace(cfd->GetID(),
1726                                MultiGetColumnFamilyData(cfh, nullptr));
1727     }
1728   }
1729 
1730   std::function<MultiGetColumnFamilyData*(
1731       std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
1732       iter_deref_lambda =
1733           [](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
1734                  cf_iter) { return &cf_iter->second; };
1735 
1736   bool unref_only =
1737       MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
1738           read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
1739           &consistent_seqnum);
1740 
1741   // Contain a list of merge operations if merge occurs.
1742   MergeContext merge_context;
1743 
1744   // Note: this always resizes the values array
1745   size_t num_keys = keys.size();
1746   std::vector<Status> stat_list(num_keys);
1747   values->resize(num_keys);
1748   if (timestamps) {
1749     timestamps->resize(num_keys);
1750   }
1751 
1752   // Keep track of bytes that we read for statistics-recording later
1753   uint64_t bytes_read = 0;
1754   PERF_TIMER_STOP(get_snapshot_time);
1755 
1756   // For each of the given keys, apply the entire "get" process as follows:
1757   // First look in the memtable, then in the immutable memtable (if any).
1758   // s is both in/out. When in, s could either be OK or MergeInProgress.
1759   // merge_operands will contain the sequence of merges in the latter case.
1760   size_t num_found = 0;
1761   for (size_t i = 0; i < num_keys; ++i) {
1762     merge_context.Clear();
1763     Status& s = stat_list[i];
1764     std::string* value = &(*values)[i];
1765     std::string* timestamp = timestamps ? &(*timestamps)[i] : nullptr;
1766 
1767     LookupKey lkey(keys[i], consistent_seqnum, read_options.timestamp);
1768     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
1769     SequenceNumber max_covering_tombstone_seq = 0;
1770     auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
1771     assert(mgd_iter != multiget_cf_data.end());
1772     auto mgd = mgd_iter->second;
1773     auto super_version = mgd.super_version;
1774     bool skip_memtable =
1775         (read_options.read_tier == kPersistedTier &&
1776          has_unpersisted_data_.load(std::memory_order_relaxed));
1777     bool done = false;
1778     if (!skip_memtable) {
1779       if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
1780                                   &max_covering_tombstone_seq, read_options)) {
1781         done = true;
1782         RecordTick(stats_, MEMTABLE_HIT);
1783       } else if (super_version->imm->Get(
1784                      lkey, value, timestamp, &s, &merge_context,
1785                      &max_covering_tombstone_seq, read_options)) {
1786         done = true;
1787         RecordTick(stats_, MEMTABLE_HIT);
1788       }
1789     }
1790     if (!done) {
1791       PinnableSlice pinnable_val;
1792       PERF_TIMER_GUARD(get_from_output_files_time);
1793       super_version->current->Get(read_options, lkey, &pinnable_val, timestamp,
1794                                   &s, &merge_context,
1795                                   &max_covering_tombstone_seq);
1796       value->assign(pinnable_val.data(), pinnable_val.size());
1797       RecordTick(stats_, MEMTABLE_MISS);
1798     }
1799 
1800     if (s.ok()) {
1801       bytes_read += value->size();
1802       num_found++;
1803     }
1804   }
1805 
1806   // Post processing (decrement reference counts and record statistics)
1807   PERF_TIMER_GUARD(get_post_process_time);
1808   autovector<SuperVersion*> superversions_to_delete;
1809 
1810   for (auto mgd_iter : multiget_cf_data) {
1811     auto mgd = mgd_iter.second;
1812     if (!unref_only) {
1813       ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
1814     } else {
1815       mgd.cfd->GetSuperVersion()->Unref();
1816     }
1817   }
1818   RecordTick(stats_, NUMBER_MULTIGET_CALLS);
1819   RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
1820   RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
1821   RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
1822   RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
1823   PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
1824   PERF_TIMER_STOP(get_post_process_time);
1825 
1826   return stat_list;
1827 }
1828 
1829 template <class T>
MultiCFSnapshot(const ReadOptions & read_options,ReadCallback * callback,std::function<MultiGetColumnFamilyData * (typename T::iterator &)> & iter_deref_func,T * cf_list,SequenceNumber * snapshot)1830 bool DBImpl::MultiCFSnapshot(
1831     const ReadOptions& read_options, ReadCallback* callback,
1832     std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
1833         iter_deref_func,
1834     T* cf_list, SequenceNumber* snapshot) {
1835   PERF_TIMER_GUARD(get_snapshot_time);
1836 
1837   bool last_try = false;
1838   if (cf_list->size() == 1) {
1839     // Fast path for a single column family. We can simply get the thread loca
1840     // super version
1841     auto cf_iter = cf_list->begin();
1842     auto node = iter_deref_func(cf_iter);
1843     node->super_version = GetAndRefSuperVersion(node->cfd);
1844     if (read_options.snapshot != nullptr) {
1845       // Note: In WritePrepared txns this is not necessary but not harmful
1846       // either.  Because prep_seq > snapshot => commit_seq > snapshot so if
1847       // a snapshot is specified we should be fine with skipping seq numbers
1848       // that are greater than that.
1849       //
1850       // In WriteUnprepared, we cannot set snapshot in the lookup key because we
1851       // may skip uncommitted data that should be visible to the transaction for
1852       // reading own writes.
1853       *snapshot =
1854           static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1855       if (callback) {
1856         *snapshot = std::max(*snapshot, callback->max_visible_seq());
1857       }
1858     } else {
1859       // Since we get and reference the super version before getting
1860       // the snapshot number, without a mutex protection, it is possible
1861       // that a memtable switch happened in the middle and not all the
1862       // data for this snapshot is available. But it will contain all
1863       // the data available in the super version we have, which is also
1864       // a valid snapshot to read from.
1865       // We shouldn't get snapshot before finding and referencing the super
1866       // version because a flush happening in between may compact away data for
1867       // the snapshot, but the snapshot is earlier than the data overwriting it,
1868       // so users may see wrong results.
1869       *snapshot = last_seq_same_as_publish_seq_
1870                       ? versions_->LastSequence()
1871                       : versions_->LastPublishedSequence();
1872     }
1873   } else {
1874     // If we end up with the same issue of memtable geting sealed during 2
1875     // consecutive retries, it means the write rate is very high. In that case
1876     // its probably ok to take the mutex on the 3rd try so we can succeed for
1877     // sure
1878     static const int num_retries = 3;
1879     for (int i = 0; i < num_retries; ++i) {
1880       last_try = (i == num_retries - 1);
1881       bool retry = false;
1882 
1883       if (i > 0) {
1884         for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
1885              ++cf_iter) {
1886           auto node = iter_deref_func(cf_iter);
1887           SuperVersion* super_version = node->super_version;
1888           ColumnFamilyData* cfd = node->cfd;
1889           if (super_version != nullptr) {
1890             ReturnAndCleanupSuperVersion(cfd, super_version);
1891           }
1892           node->super_version = nullptr;
1893         }
1894       }
1895       if (read_options.snapshot == nullptr) {
1896         if (last_try) {
1897           TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
1898           // We're close to max number of retries. For the last retry,
1899           // acquire the lock so we're sure to succeed
1900           mutex_.Lock();
1901         }
1902         *snapshot = last_seq_same_as_publish_seq_
1903                         ? versions_->LastSequence()
1904                         : versions_->LastPublishedSequence();
1905       } else {
1906         *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
1907                         ->number_;
1908       }
1909       for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
1910            ++cf_iter) {
1911         auto node = iter_deref_func(cf_iter);
1912         if (!last_try) {
1913           node->super_version = GetAndRefSuperVersion(node->cfd);
1914         } else {
1915           node->super_version = node->cfd->GetSuperVersion()->Ref();
1916         }
1917         TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
1918         if (read_options.snapshot != nullptr || last_try) {
1919           // If user passed a snapshot, then we don't care if a memtable is
1920           // sealed or compaction happens because the snapshot would ensure
1921           // that older key versions are kept around. If this is the last
1922           // retry, then we have the lock so nothing bad can happen
1923           continue;
1924         }
1925         // We could get the earliest sequence number for the whole list of
1926         // memtables, which will include immutable memtables as well, but that
1927         // might be tricky to maintain in case we decide, in future, to do
1928         // memtable compaction.
1929         if (!last_try) {
1930           SequenceNumber seq =
1931               node->super_version->mem->GetEarliestSequenceNumber();
1932           if (seq > *snapshot) {
1933             retry = true;
1934             break;
1935           }
1936         }
1937       }
1938       if (!retry) {
1939         if (last_try) {
1940           mutex_.Unlock();
1941         }
1942         break;
1943       }
1944     }
1945   }
1946 
1947   // Keep track of bytes that we read for statistics-recording later
1948   PERF_TIMER_STOP(get_snapshot_time);
1949 
1950   return last_try;
1951 }
1952 
MultiGet(const ReadOptions & read_options,const size_t num_keys,ColumnFamilyHandle ** column_families,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)1953 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
1954                       ColumnFamilyHandle** column_families, const Slice* keys,
1955                       PinnableSlice* values, Status* statuses,
1956                       const bool sorted_input) {
1957   return MultiGet(read_options, num_keys, column_families, keys, values,
1958                   /*timestamps*/ nullptr, statuses, sorted_input);
1959 }
1960 
MultiGet(const ReadOptions & read_options,const size_t num_keys,ColumnFamilyHandle ** column_families,const Slice * keys,PinnableSlice * values,std::string * timestamps,Status * statuses,const bool sorted_input)1961 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
1962                       ColumnFamilyHandle** column_families, const Slice* keys,
1963                       PinnableSlice* values, std::string* timestamps,
1964                       Status* statuses, const bool sorted_input) {
1965   if (num_keys == 0) {
1966     return;
1967   }
1968   autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
1969   autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
1970   sorted_keys.resize(num_keys);
1971   for (size_t i = 0; i < num_keys; ++i) {
1972     key_context.emplace_back(column_families[i], keys[i], &values[i],
1973                              &timestamps[i], &statuses[i]);
1974   }
1975   for (size_t i = 0; i < num_keys; ++i) {
1976     sorted_keys[i] = &key_context[i];
1977   }
1978   PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
1979 
1980   autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
1981       multiget_cf_data;
1982   size_t cf_start = 0;
1983   ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
1984   for (size_t i = 0; i < num_keys; ++i) {
1985     KeyContext* key_ctx = sorted_keys[i];
1986     if (key_ctx->column_family != cf) {
1987       multiget_cf_data.emplace_back(
1988           MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
1989       cf_start = i;
1990       cf = key_ctx->column_family;
1991     }
1992   }
1993   {
1994     // multiget_cf_data.emplace_back(
1995     // MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start, nullptr));
1996     multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
1997   }
1998   std::function<MultiGetColumnFamilyData*(
1999       autovector<MultiGetColumnFamilyData,
2000                  MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
2001       iter_deref_lambda =
2002           [](autovector<MultiGetColumnFamilyData,
2003                         MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
2004             return &(*cf_iter);
2005           };
2006 
2007   SequenceNumber consistent_seqnum;
2008   bool unref_only = MultiCFSnapshot<
2009       autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
2010       read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
2011       &consistent_seqnum);
2012 
2013   for (auto cf_iter = multiget_cf_data.begin();
2014        cf_iter != multiget_cf_data.end(); ++cf_iter) {
2015     MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys,
2016                  cf_iter->super_version, consistent_seqnum, nullptr, nullptr);
2017     if (!unref_only) {
2018       ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version);
2019     } else {
2020       cf_iter->cfd->GetSuperVersion()->Unref();
2021     }
2022   }
2023 }
2024 
2025 namespace {
2026 // Order keys by CF ID, followed by key contents
2027 struct CompareKeyContext {
operator ()ROCKSDB_NAMESPACE::__anond311f2020711::CompareKeyContext2028   inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
2029     ColumnFamilyHandleImpl* cfh =
2030         static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2031     uint32_t cfd_id1 = cfh->cfd()->GetID();
2032     const Comparator* comparator = cfh->cfd()->user_comparator();
2033     cfh = static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2034     uint32_t cfd_id2 = cfh->cfd()->GetID();
2035 
2036     if (cfd_id1 < cfd_id2) {
2037       return true;
2038     } else if (cfd_id1 > cfd_id2) {
2039       return false;
2040     }
2041 
2042     // Both keys are from the same column family
2043     int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
2044     if (cmp < 0) {
2045       return true;
2046     }
2047     return false;
2048   }
2049 };
2050 
2051 }  // anonymous namespace
2052 
PrepareMultiGetKeys(size_t num_keys,bool sorted_input,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys)2053 void DBImpl::PrepareMultiGetKeys(
2054     size_t num_keys, bool sorted_input,
2055     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2056 #ifndef NDEBUG
2057   if (sorted_input) {
2058     for (size_t index = 0; index < sorted_keys->size(); ++index) {
2059       if (index > 0) {
2060         KeyContext* lhs = (*sorted_keys)[index - 1];
2061         KeyContext* rhs = (*sorted_keys)[index];
2062         ColumnFamilyHandleImpl* cfh =
2063             reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2064         uint32_t cfd_id1 = cfh->cfd()->GetID();
2065         const Comparator* comparator = cfh->cfd()->user_comparator();
2066         cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2067         uint32_t cfd_id2 = cfh->cfd()->GetID();
2068 
2069         assert(cfd_id1 <= cfd_id2);
2070         if (cfd_id1 < cfd_id2) {
2071           continue;
2072         }
2073 
2074         // Both keys are from the same column family
2075         int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
2076         assert(cmp <= 0);
2077       }
2078       index++;
2079     }
2080   }
2081 #endif
2082   if (!sorted_input) {
2083     CompareKeyContext sort_comparator;
2084     std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
2085               sort_comparator);
2086   }
2087 }
2088 
MultiGet(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const size_t num_keys,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)2089 void DBImpl::MultiGet(const ReadOptions& read_options,
2090                       ColumnFamilyHandle* column_family, const size_t num_keys,
2091                       const Slice* keys, PinnableSlice* values,
2092                       Status* statuses, const bool sorted_input) {
2093   return MultiGet(read_options, column_family, num_keys, keys, values,
2094                   /*timestamp=*/nullptr, statuses, sorted_input);
2095 }
2096 
MultiGet(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const size_t num_keys,const Slice * keys,PinnableSlice * values,std::string * timestamps,Status * statuses,const bool sorted_input)2097 void DBImpl::MultiGet(const ReadOptions& read_options,
2098                       ColumnFamilyHandle* column_family, const size_t num_keys,
2099                       const Slice* keys, PinnableSlice* values,
2100                       std::string* timestamps, Status* statuses,
2101                       const bool sorted_input) {
2102   autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
2103   autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
2104   sorted_keys.resize(num_keys);
2105   for (size_t i = 0; i < num_keys; ++i) {
2106     key_context.emplace_back(column_family, keys[i], &values[i],
2107                              timestamps ? &timestamps[i] : nullptr,
2108                              &statuses[i]);
2109   }
2110   for (size_t i = 0; i < num_keys; ++i) {
2111     sorted_keys[i] = &key_context[i];
2112   }
2113   PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
2114   MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
2115 }
2116 
MultiGetWithCallback(const ReadOptions & read_options,ColumnFamilyHandle * column_family,ReadCallback * callback,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys)2117 void DBImpl::MultiGetWithCallback(
2118     const ReadOptions& read_options, ColumnFamilyHandle* column_family,
2119     ReadCallback* callback,
2120     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2121   std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
2122   multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
2123   std::function<MultiGetColumnFamilyData*(
2124       std::array<MultiGetColumnFamilyData, 1>::iterator&)>
2125       iter_deref_lambda =
2126           [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
2127             return &(*cf_iter);
2128           };
2129 
2130   size_t num_keys = sorted_keys->size();
2131   SequenceNumber consistent_seqnum;
2132   bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
2133       read_options, callback, iter_deref_lambda, &multiget_cf_data,
2134       &consistent_seqnum);
2135 #ifndef NDEBUG
2136   assert(!unref_only);
2137 #else
2138   // Silence unused variable warning
2139   (void)unref_only;
2140 #endif  // NDEBUG
2141 
2142   if (callback && read_options.snapshot == nullptr) {
2143     // The unprep_seqs are not published for write unprepared, so it could be
2144     // that max_visible_seq is larger. Seek to the std::max of the two.
2145     // However, we still want our callback to contain the actual snapshot so
2146     // that it can do the correct visibility filtering.
2147     callback->Refresh(consistent_seqnum);
2148 
2149     // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
2150     // max_visible_seq = max(max_visible_seq, snapshot)
2151     //
2152     // Currently, the commented out assert is broken by
2153     // InvalidSnapshotReadCallback, but if write unprepared recovery followed
2154     // the regular transaction flow, then this special read callback would not
2155     // be needed.
2156     //
2157     // assert(callback->max_visible_seq() >= snapshot);
2158     consistent_seqnum = callback->max_visible_seq();
2159   }
2160 
2161   MultiGetImpl(read_options, 0, num_keys, sorted_keys,
2162                multiget_cf_data[0].super_version, consistent_seqnum, nullptr,
2163                nullptr);
2164   ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
2165                                multiget_cf_data[0].super_version);
2166 }
2167 
MultiGetImpl(const ReadOptions & read_options,size_t start_key,size_t num_keys,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys,SuperVersion * super_version,SequenceNumber snapshot,ReadCallback * callback,bool * is_blob_index)2168 void DBImpl::MultiGetImpl(
2169     const ReadOptions& read_options, size_t start_key, size_t num_keys,
2170     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2171     SuperVersion* super_version, SequenceNumber snapshot,
2172     ReadCallback* callback, bool* is_blob_index) {
2173   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
2174   StopWatch sw(env_, stats_, DB_MULTIGET);
2175 
2176   // For each of the given keys, apply the entire "get" process as follows:
2177   // First look in the memtable, then in the immutable memtable (if any).
2178   // s is both in/out. When in, s could either be OK or MergeInProgress.
2179   // merge_operands will contain the sequence of merges in the latter case.
2180   size_t keys_left = num_keys;
2181   while (keys_left) {
2182     size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
2183                             ? MultiGetContext::MAX_BATCH_SIZE
2184                             : keys_left;
2185     MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
2186                         batch_size, snapshot);
2187     MultiGetRange range = ctx.GetMultiGetRange();
2188     bool lookup_current = false;
2189 
2190     keys_left -= batch_size;
2191     for (auto mget_iter = range.begin(); mget_iter != range.end();
2192          ++mget_iter) {
2193       mget_iter->merge_context.Clear();
2194       *mget_iter->s = Status::OK();
2195     }
2196 
2197     bool skip_memtable =
2198         (read_options.read_tier == kPersistedTier &&
2199          has_unpersisted_data_.load(std::memory_order_relaxed));
2200     if (!skip_memtable) {
2201       super_version->mem->MultiGet(read_options, &range, callback,
2202                                    is_blob_index);
2203       if (!range.empty()) {
2204         super_version->imm->MultiGet(read_options, &range, callback,
2205                                      is_blob_index);
2206       }
2207       if (!range.empty()) {
2208         lookup_current = true;
2209         uint64_t left = range.KeysLeft();
2210         RecordTick(stats_, MEMTABLE_MISS, left);
2211       }
2212     }
2213     if (lookup_current) {
2214       PERF_TIMER_GUARD(get_from_output_files_time);
2215       super_version->current->MultiGet(read_options, &range, callback,
2216                                        is_blob_index);
2217     }
2218   }
2219 
2220   // Post processing (decrement reference counts and record statistics)
2221   PERF_TIMER_GUARD(get_post_process_time);
2222   size_t num_found = 0;
2223   uint64_t bytes_read = 0;
2224   for (size_t i = start_key; i < start_key + num_keys; ++i) {
2225     KeyContext* key = (*sorted_keys)[i];
2226     if (key->s->ok()) {
2227       bytes_read += key->value->size();
2228       num_found++;
2229     }
2230   }
2231 
2232   RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2233   RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
2234   RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
2235   RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
2236   RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
2237   PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
2238   PERF_TIMER_STOP(get_post_process_time);
2239 }
2240 
CreateColumnFamily(const ColumnFamilyOptions & cf_options,const std::string & column_family,ColumnFamilyHandle ** handle)2241 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
2242                                   const std::string& column_family,
2243                                   ColumnFamilyHandle** handle) {
2244   assert(handle != nullptr);
2245   Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
2246   if (s.ok()) {
2247     s = WriteOptionsFile(true /*need_mutex_lock*/,
2248                          true /*need_enter_write_thread*/);
2249   }
2250   return s;
2251 }
2252 
CreateColumnFamilies(const ColumnFamilyOptions & cf_options,const std::vector<std::string> & column_family_names,std::vector<ColumnFamilyHandle * > * handles)2253 Status DBImpl::CreateColumnFamilies(
2254     const ColumnFamilyOptions& cf_options,
2255     const std::vector<std::string>& column_family_names,
2256     std::vector<ColumnFamilyHandle*>* handles) {
2257   assert(handles != nullptr);
2258   handles->clear();
2259   size_t num_cf = column_family_names.size();
2260   Status s;
2261   bool success_once = false;
2262   for (size_t i = 0; i < num_cf; i++) {
2263     ColumnFamilyHandle* handle;
2264     s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
2265     if (!s.ok()) {
2266       break;
2267     }
2268     handles->push_back(handle);
2269     success_once = true;
2270   }
2271   if (success_once) {
2272     Status persist_options_status = WriteOptionsFile(
2273         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2274     if (s.ok() && !persist_options_status.ok()) {
2275       s = persist_options_status;
2276     }
2277   }
2278   return s;
2279 }
2280 
CreateColumnFamilies(const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles)2281 Status DBImpl::CreateColumnFamilies(
2282     const std::vector<ColumnFamilyDescriptor>& column_families,
2283     std::vector<ColumnFamilyHandle*>* handles) {
2284   assert(handles != nullptr);
2285   handles->clear();
2286   size_t num_cf = column_families.size();
2287   Status s;
2288   bool success_once = false;
2289   for (size_t i = 0; i < num_cf; i++) {
2290     ColumnFamilyHandle* handle;
2291     s = CreateColumnFamilyImpl(column_families[i].options,
2292                                column_families[i].name, &handle);
2293     if (!s.ok()) {
2294       break;
2295     }
2296     handles->push_back(handle);
2297     success_once = true;
2298   }
2299   if (success_once) {
2300     Status persist_options_status = WriteOptionsFile(
2301         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2302     if (s.ok() && !persist_options_status.ok()) {
2303       s = persist_options_status;
2304     }
2305   }
2306   return s;
2307 }
2308 
CreateColumnFamilyImpl(const ColumnFamilyOptions & cf_options,const std::string & column_family_name,ColumnFamilyHandle ** handle)2309 Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
2310                                       const std::string& column_family_name,
2311                                       ColumnFamilyHandle** handle) {
2312   Status s;
2313   Status persist_options_status;
2314   *handle = nullptr;
2315 
2316   DBOptions db_options =
2317       BuildDBOptions(immutable_db_options_, mutable_db_options_);
2318   s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
2319   if (s.ok()) {
2320     for (auto& cf_path : cf_options.cf_paths) {
2321       s = env_->CreateDirIfMissing(cf_path.path);
2322       if (!s.ok()) {
2323         break;
2324       }
2325     }
2326   }
2327   if (!s.ok()) {
2328     return s;
2329   }
2330 
2331   SuperVersionContext sv_context(/* create_superversion */ true);
2332   {
2333     InstrumentedMutexLock l(&mutex_);
2334 
2335     if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
2336         nullptr) {
2337       return Status::InvalidArgument("Column family already exists");
2338     }
2339     VersionEdit edit;
2340     edit.AddColumnFamily(column_family_name);
2341     uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
2342     edit.SetColumnFamily(new_id);
2343     edit.SetLogNumber(logfile_number_);
2344     edit.SetComparatorName(cf_options.comparator->Name());
2345 
2346     // LogAndApply will both write the creation in MANIFEST and create
2347     // ColumnFamilyData object
2348     {  // write thread
2349       WriteThread::Writer w;
2350       write_thread_.EnterUnbatched(&w, &mutex_);
2351       // LogAndApply will both write the creation in MANIFEST and create
2352       // ColumnFamilyData object
2353       s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
2354                                  &mutex_, directories_.GetDbDir(), false,
2355                                  &cf_options);
2356       write_thread_.ExitUnbatched(&w);
2357     }
2358     if (s.ok()) {
2359       auto* cfd =
2360           versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2361       assert(cfd != nullptr);
2362       std::map<std::string, std::shared_ptr<FSDirectory>> dummy_created_dirs;
2363       s = cfd->AddDirectories(&dummy_created_dirs);
2364     }
2365     if (s.ok()) {
2366       single_column_family_mode_ = false;
2367       auto* cfd =
2368           versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2369       assert(cfd != nullptr);
2370       InstallSuperVersionAndScheduleWork(cfd, &sv_context,
2371                                          *cfd->GetLatestMutableCFOptions());
2372 
2373       if (!cfd->mem()->IsSnapshotSupported()) {
2374         is_snapshot_supported_ = false;
2375       }
2376 
2377       cfd->set_initialized();
2378 
2379       *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
2380       ROCKS_LOG_INFO(immutable_db_options_.info_log,
2381                      "Created column family [%s] (ID %u)",
2382                      column_family_name.c_str(), (unsigned)cfd->GetID());
2383     } else {
2384       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2385                       "Creating column family [%s] FAILED -- %s",
2386                       column_family_name.c_str(), s.ToString().c_str());
2387     }
2388   }  // InstrumentedMutexLock l(&mutex_)
2389 
2390   sv_context.Clean();
2391   // this is outside the mutex
2392   if (s.ok()) {
2393     NewThreadStatusCfInfo(
2394         reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
2395   }
2396   return s;
2397 }
2398 
DropColumnFamily(ColumnFamilyHandle * column_family)2399 Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
2400   assert(column_family != nullptr);
2401   Status s = DropColumnFamilyImpl(column_family);
2402   if (s.ok()) {
2403     s = WriteOptionsFile(true /*need_mutex_lock*/,
2404                          true /*need_enter_write_thread*/);
2405   }
2406   return s;
2407 }
2408 
DropColumnFamilies(const std::vector<ColumnFamilyHandle * > & column_families)2409 Status DBImpl::DropColumnFamilies(
2410     const std::vector<ColumnFamilyHandle*>& column_families) {
2411   Status s;
2412   bool success_once = false;
2413   for (auto* handle : column_families) {
2414     s = DropColumnFamilyImpl(handle);
2415     if (!s.ok()) {
2416       break;
2417     }
2418     success_once = true;
2419   }
2420   if (success_once) {
2421     Status persist_options_status = WriteOptionsFile(
2422         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2423     if (s.ok() && !persist_options_status.ok()) {
2424       s = persist_options_status;
2425     }
2426   }
2427   return s;
2428 }
2429 
DropColumnFamilyImpl(ColumnFamilyHandle * column_family)2430 Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
2431   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2432   auto cfd = cfh->cfd();
2433   if (cfd->GetID() == 0) {
2434     return Status::InvalidArgument("Can't drop default column family");
2435   }
2436 
2437   bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();
2438 
2439   VersionEdit edit;
2440   edit.DropColumnFamily();
2441   edit.SetColumnFamily(cfd->GetID());
2442 
2443   Status s;
2444   {
2445     InstrumentedMutexLock l(&mutex_);
2446     if (cfd->IsDropped()) {
2447       s = Status::InvalidArgument("Column family already dropped!\n");
2448     }
2449     if (s.ok()) {
2450       // we drop column family from a single write thread
2451       WriteThread::Writer w;
2452       write_thread_.EnterUnbatched(&w, &mutex_);
2453       s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
2454                                  &mutex_);
2455       write_thread_.ExitUnbatched(&w);
2456     }
2457     if (s.ok()) {
2458       auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2459       max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
2460                                     mutable_cf_options->max_write_buffer_number;
2461     }
2462 
2463     if (!cf_support_snapshot) {
2464       // Dropped Column Family doesn't support snapshot. Need to recalculate
2465       // is_snapshot_supported_.
2466       bool new_is_snapshot_supported = true;
2467       for (auto c : *versions_->GetColumnFamilySet()) {
2468         if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
2469           new_is_snapshot_supported = false;
2470           break;
2471         }
2472       }
2473       is_snapshot_supported_ = new_is_snapshot_supported;
2474     }
2475     bg_cv_.SignalAll();
2476   }
2477 
2478   if (s.ok()) {
2479     // Note that here we erase the associated cf_info of the to-be-dropped
2480     // cfd before its ref-count goes to zero to avoid having to erase cf_info
2481     // later inside db_mutex.
2482     EraseThreadStatusCfInfo(cfd);
2483     assert(cfd->IsDropped());
2484     ROCKS_LOG_INFO(immutable_db_options_.info_log,
2485                    "Dropped column family with id %u\n", cfd->GetID());
2486   } else {
2487     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2488                     "Dropping column family with id %u FAILED -- %s\n",
2489                     cfd->GetID(), s.ToString().c_str());
2490   }
2491 
2492   return s;
2493 }
2494 
KeyMayExist(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value,std::string * timestamp,bool * value_found)2495 bool DBImpl::KeyMayExist(const ReadOptions& read_options,
2496                          ColumnFamilyHandle* column_family, const Slice& key,
2497                          std::string* value, std::string* timestamp,
2498                          bool* value_found) {
2499   assert(value != nullptr);
2500   if (value_found != nullptr) {
2501     // falsify later if key-may-exist but can't fetch value
2502     *value_found = true;
2503   }
2504   ReadOptions roptions = read_options;
2505   roptions.read_tier = kBlockCacheTier;  // read from block cache only
2506   PinnableSlice pinnable_val;
2507   GetImplOptions get_impl_options;
2508   get_impl_options.column_family = column_family;
2509   get_impl_options.value = &pinnable_val;
2510   get_impl_options.value_found = value_found;
2511   get_impl_options.timestamp = timestamp;
2512   auto s = GetImpl(roptions, key, get_impl_options);
2513   value->assign(pinnable_val.data(), pinnable_val.size());
2514 
2515   // If block_cache is enabled and the index block of the table didn't
2516   // not present in block_cache, the return value will be Status::Incomplete.
2517   // In this case, key may still exist in the table.
2518   return s.ok() || s.IsIncomplete();
2519 }
2520 
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)2521 Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
2522                               ColumnFamilyHandle* column_family) {
2523   if (read_options.managed) {
2524     return NewErrorIterator(
2525         Status::NotSupported("Managed iterator is not supported anymore."));
2526   }
2527   Iterator* result = nullptr;
2528   if (read_options.read_tier == kPersistedTier) {
2529     return NewErrorIterator(Status::NotSupported(
2530         "ReadTier::kPersistedData is not yet supported in iterators."));
2531   }
2532   // if iterator wants internal keys, we can only proceed if
2533   // we can guarantee the deletes haven't been processed yet
2534   if (immutable_db_options_.preserve_deletes &&
2535       read_options.iter_start_seqnum > 0 &&
2536       read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
2537     return NewErrorIterator(Status::InvalidArgument(
2538         "Iterator requested internal keys which are too old and are not"
2539         " guaranteed to be preserved, try larger iter_start_seqnum opt."));
2540   }
2541   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2542   auto cfd = cfh->cfd();
2543   ReadCallback* read_callback = nullptr;  // No read callback provided.
2544   if (read_options.tailing) {
2545 #ifdef ROCKSDB_LITE
2546     // not supported in lite version
2547     result = nullptr;
2548 
2549 #else
2550     SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2551     auto iter = new ForwardIterator(this, read_options, cfd, sv);
2552     result = NewDBIterator(
2553         env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2554         cfd->user_comparator(), iter, kMaxSequenceNumber,
2555         sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
2556         this, cfd);
2557 #endif
2558   } else {
2559     // Note: no need to consider the special case of
2560     // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
2561     // WritePreparedTxnDB
2562     auto snapshot = read_options.snapshot != nullptr
2563                         ? read_options.snapshot->GetSequenceNumber()
2564                         : versions_->LastSequence();
2565     result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
2566   }
2567   return result;
2568 }
2569 
NewIteratorImpl(const ReadOptions & read_options,ColumnFamilyData * cfd,SequenceNumber snapshot,ReadCallback * read_callback,bool allow_blob,bool allow_refresh)2570 ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
2571                                             ColumnFamilyData* cfd,
2572                                             SequenceNumber snapshot,
2573                                             ReadCallback* read_callback,
2574                                             bool allow_blob,
2575                                             bool allow_refresh) {
2576   SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2577 
2578   // Try to generate a DB iterator tree in continuous memory area to be
2579   // cache friendly. Here is an example of result:
2580   // +-------------------------------+
2581   // |                               |
2582   // | ArenaWrappedDBIter            |
2583   // |  +                            |
2584   // |  +---> Inner Iterator   ------------+
2585   // |  |                            |     |
2586   // |  |    +-- -- -- -- -- -- -- --+     |
2587   // |  +--- | Arena                 |     |
2588   // |       |                       |     |
2589   // |          Allocated Memory:    |     |
2590   // |       |   +-------------------+     |
2591   // |       |   | DBIter            | <---+
2592   // |           |  +                |
2593   // |       |   |  +-> iter_  ------------+
2594   // |       |   |                   |     |
2595   // |       |   +-------------------+     |
2596   // |       |   | MergingIterator   | <---+
2597   // |           |  +                |
2598   // |       |   |  +->child iter1  ------------+
2599   // |       |   |  |                |          |
2600   // |           |  +->child iter2  ----------+ |
2601   // |       |   |  |                |        | |
2602   // |       |   |  +->child iter3  --------+ | |
2603   // |           |                   |      | | |
2604   // |       |   +-------------------+      | | |
2605   // |       |   | Iterator1         | <--------+
2606   // |       |   +-------------------+      | |
2607   // |       |   | Iterator2         | <------+
2608   // |       |   +-------------------+      |
2609   // |       |   | Iterator3         | <----+
2610   // |       |   +-------------------+
2611   // |       |                       |
2612   // +-------+-----------------------+
2613   //
2614   // ArenaWrappedDBIter inlines an arena area where all the iterators in
2615   // the iterator tree are allocated in the order of being accessed when
2616   // querying.
2617   // Laying out the iterators in the order of being accessed makes it more
2618   // likely that any iterator pointer is close to the iterator it points to so
2619   // that they are likely to be in the same cache line and/or page.
2620   ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
2621       env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
2622       sv->mutable_cf_options.max_sequential_skip_in_iterations,
2623       sv->version_number, read_callback, this, cfd, allow_blob,
2624       read_options.snapshot != nullptr ? false : allow_refresh);
2625 
2626   InternalIterator* internal_iter =
2627       NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
2628                           db_iter->GetRangeDelAggregator(), snapshot);
2629   db_iter->SetIterUnderDBIter(internal_iter);
2630 
2631   return db_iter;
2632 }
2633 
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)2634 Status DBImpl::NewIterators(
2635     const ReadOptions& read_options,
2636     const std::vector<ColumnFamilyHandle*>& column_families,
2637     std::vector<Iterator*>* iterators) {
2638   if (read_options.managed) {
2639     return Status::NotSupported("Managed iterator is not supported anymore.");
2640   }
2641   if (read_options.read_tier == kPersistedTier) {
2642     return Status::NotSupported(
2643         "ReadTier::kPersistedData is not yet supported in iterators.");
2644   }
2645   ReadCallback* read_callback = nullptr;  // No read callback provided.
2646   iterators->clear();
2647   iterators->reserve(column_families.size());
2648   if (read_options.tailing) {
2649 #ifdef ROCKSDB_LITE
2650     return Status::InvalidArgument(
2651         "Tailing iterator not supported in RocksDB lite");
2652 #else
2653     for (auto cfh : column_families) {
2654       auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
2655       SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2656       auto iter = new ForwardIterator(this, read_options, cfd, sv);
2657       iterators->push_back(NewDBIterator(
2658           env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2659           cfd->user_comparator(), iter, kMaxSequenceNumber,
2660           sv->mutable_cf_options.max_sequential_skip_in_iterations,
2661           read_callback, this, cfd));
2662     }
2663 #endif
2664   } else {
2665     // Note: no need to consider the special case of
2666     // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
2667     // WritePreparedTxnDB
2668     auto snapshot = read_options.snapshot != nullptr
2669                         ? read_options.snapshot->GetSequenceNumber()
2670                         : versions_->LastSequence();
2671     for (size_t i = 0; i < column_families.size(); ++i) {
2672       auto* cfd =
2673           reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i])->cfd();
2674       iterators->push_back(
2675           NewIteratorImpl(read_options, cfd, snapshot, read_callback));
2676     }
2677   }
2678 
2679   return Status::OK();
2680 }
2681 
GetSnapshot()2682 const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
2683 
2684 #ifndef ROCKSDB_LITE
GetSnapshotForWriteConflictBoundary()2685 const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
2686   return GetSnapshotImpl(true);
2687 }
2688 #endif  // ROCKSDB_LITE
2689 
GetSnapshotImpl(bool is_write_conflict_boundary,bool lock)2690 SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
2691                                       bool lock) {
2692   int64_t unix_time = 0;
2693   env_->GetCurrentTime(&unix_time);  // Ignore error
2694   SnapshotImpl* s = new SnapshotImpl;
2695 
2696   if (lock) {
2697     mutex_.Lock();
2698   }
2699   // returns null if the underlying memtable does not support snapshot.
2700   if (!is_snapshot_supported_) {
2701     if (lock) {
2702       mutex_.Unlock();
2703     }
2704     delete s;
2705     return nullptr;
2706   }
2707   auto snapshot_seq = last_seq_same_as_publish_seq_
2708                           ? versions_->LastSequence()
2709                           : versions_->LastPublishedSequence();
2710   SnapshotImpl* snapshot =
2711       snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
2712   if (lock) {
2713     mutex_.Unlock();
2714   }
2715   return snapshot;
2716 }
2717 
2718 namespace {
2719 typedef autovector<ColumnFamilyData*, 2> CfdList;
CfdListContains(const CfdList & list,ColumnFamilyData * cfd)2720 bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
2721   for (const ColumnFamilyData* t : list) {
2722     if (t == cfd) {
2723       return true;
2724     }
2725   }
2726   return false;
2727 }
2728 }  //  namespace
2729 
ReleaseSnapshot(const Snapshot * s)2730 void DBImpl::ReleaseSnapshot(const Snapshot* s) {
2731   const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
2732   {
2733     InstrumentedMutexLock l(&mutex_);
2734     snapshots_.Delete(casted_s);
2735     uint64_t oldest_snapshot;
2736     if (snapshots_.empty()) {
2737       oldest_snapshot = last_seq_same_as_publish_seq_
2738                             ? versions_->LastSequence()
2739                             : versions_->LastPublishedSequence();
2740     } else {
2741       oldest_snapshot = snapshots_.oldest()->number_;
2742     }
2743     // Avoid to go through every column family by checking a global threshold
2744     // first.
2745     if (oldest_snapshot > bottommost_files_mark_threshold_) {
2746       CfdList cf_scheduled;
2747       for (auto* cfd : *versions_->GetColumnFamilySet()) {
2748         cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
2749         if (!cfd->current()
2750                  ->storage_info()
2751                  ->BottommostFilesMarkedForCompaction()
2752                  .empty()) {
2753           SchedulePendingCompaction(cfd);
2754           MaybeScheduleFlushOrCompaction();
2755           cf_scheduled.push_back(cfd);
2756         }
2757       }
2758 
2759       // Calculate a new threshold, skipping those CFs where compactions are
2760       // scheduled. We do not do the same pass as the previous loop because
2761       // mutex might be unlocked during the loop, making the result inaccurate.
2762       SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
2763       for (auto* cfd : *versions_->GetColumnFamilySet()) {
2764         if (CfdListContains(cf_scheduled, cfd)) {
2765           continue;
2766         }
2767         new_bottommost_files_mark_threshold = std::min(
2768             new_bottommost_files_mark_threshold,
2769             cfd->current()->storage_info()->bottommost_files_mark_threshold());
2770       }
2771       bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
2772     }
2773   }
2774   delete casted_s;
2775 }
2776 
2777 #ifndef ROCKSDB_LITE
GetPropertiesOfAllTables(ColumnFamilyHandle * column_family,TablePropertiesCollection * props)2778 Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
2779                                         TablePropertiesCollection* props) {
2780   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2781   auto cfd = cfh->cfd();
2782 
2783   // Increment the ref count
2784   mutex_.Lock();
2785   auto version = cfd->current();
2786   version->Ref();
2787   mutex_.Unlock();
2788 
2789   auto s = version->GetPropertiesOfAllTables(props);
2790 
2791   // Decrement the ref count
2792   mutex_.Lock();
2793   version->Unref();
2794   mutex_.Unlock();
2795 
2796   return s;
2797 }
2798 
GetPropertiesOfTablesInRange(ColumnFamilyHandle * column_family,const Range * range,std::size_t n,TablePropertiesCollection * props)2799 Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
2800                                             const Range* range, std::size_t n,
2801                                             TablePropertiesCollection* props) {
2802   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2803   auto cfd = cfh->cfd();
2804 
2805   // Increment the ref count
2806   mutex_.Lock();
2807   auto version = cfd->current();
2808   version->Ref();
2809   mutex_.Unlock();
2810 
2811   auto s = version->GetPropertiesOfTablesInRange(range, n, props);
2812 
2813   // Decrement the ref count
2814   mutex_.Lock();
2815   version->Unref();
2816   mutex_.Unlock();
2817 
2818   return s;
2819 }
2820 
2821 #endif  // ROCKSDB_LITE
2822 
GetName() const2823 const std::string& DBImpl::GetName() const { return dbname_; }
2824 
GetEnv() const2825 Env* DBImpl::GetEnv() const { return env_; }
2826 
GetFileSystem() const2827 FileSystem* DB::GetFileSystem() const {
2828   static LegacyFileSystemWrapper fs_wrap(GetEnv());
2829   return &fs_wrap;
2830 }
2831 
GetFileSystem() const2832 FileSystem* DBImpl::GetFileSystem() const {
2833   return immutable_db_options_.fs.get();
2834 }
2835 
GetOptions(ColumnFamilyHandle * column_family) const2836 Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
2837   InstrumentedMutexLock l(&mutex_);
2838   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2839   return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
2840                  cfh->cfd()->GetLatestCFOptions());
2841 }
2842 
GetDBOptions() const2843 DBOptions DBImpl::GetDBOptions() const {
2844   InstrumentedMutexLock l(&mutex_);
2845   return BuildDBOptions(immutable_db_options_, mutable_db_options_);
2846 }
2847 
GetProperty(ColumnFamilyHandle * column_family,const Slice & property,std::string * value)2848 bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
2849                          const Slice& property, std::string* value) {
2850   const DBPropertyInfo* property_info = GetPropertyInfo(property);
2851   value->clear();
2852   auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2853   if (property_info == nullptr) {
2854     return false;
2855   } else if (property_info->handle_int) {
2856     uint64_t int_value;
2857     bool ret_value =
2858         GetIntPropertyInternal(cfd, *property_info, false, &int_value);
2859     if (ret_value) {
2860       *value = ToString(int_value);
2861     }
2862     return ret_value;
2863   } else if (property_info->handle_string) {
2864     InstrumentedMutexLock l(&mutex_);
2865     return cfd->internal_stats()->GetStringProperty(*property_info, property,
2866                                                     value);
2867   } else if (property_info->handle_string_dbimpl) {
2868     std::string tmp_value;
2869     bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
2870     if (ret_value) {
2871       *value = tmp_value;
2872     }
2873     return ret_value;
2874   }
2875   // Shouldn't reach here since exactly one of handle_string and handle_int
2876   // should be non-nullptr.
2877   assert(false);
2878   return false;
2879 }
2880 
GetMapProperty(ColumnFamilyHandle * column_family,const Slice & property,std::map<std::string,std::string> * value)2881 bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
2882                             const Slice& property,
2883                             std::map<std::string, std::string>* value) {
2884   const DBPropertyInfo* property_info = GetPropertyInfo(property);
2885   value->clear();
2886   auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2887   if (property_info == nullptr) {
2888     return false;
2889   } else if (property_info->handle_map) {
2890     InstrumentedMutexLock l(&mutex_);
2891     return cfd->internal_stats()->GetMapProperty(*property_info, property,
2892                                                  value);
2893   }
2894   // If we reach this point it means that handle_map is not provided for the
2895   // requested property
2896   return false;
2897 }
2898 
GetIntProperty(ColumnFamilyHandle * column_family,const Slice & property,uint64_t * value)2899 bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
2900                             const Slice& property, uint64_t* value) {
2901   const DBPropertyInfo* property_info = GetPropertyInfo(property);
2902   if (property_info == nullptr || property_info->handle_int == nullptr) {
2903     return false;
2904   }
2905   auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2906   return GetIntPropertyInternal(cfd, *property_info, false, value);
2907 }
2908 
GetIntPropertyInternal(ColumnFamilyData * cfd,const DBPropertyInfo & property_info,bool is_locked,uint64_t * value)2909 bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
2910                                     const DBPropertyInfo& property_info,
2911                                     bool is_locked, uint64_t* value) {
2912   assert(property_info.handle_int != nullptr);
2913   if (!property_info.need_out_of_mutex) {
2914     if (is_locked) {
2915       mutex_.AssertHeld();
2916       return cfd->internal_stats()->GetIntProperty(property_info, value, this);
2917     } else {
2918       InstrumentedMutexLock l(&mutex_);
2919       return cfd->internal_stats()->GetIntProperty(property_info, value, this);
2920     }
2921   } else {
2922     SuperVersion* sv = nullptr;
2923     if (!is_locked) {
2924       sv = GetAndRefSuperVersion(cfd);
2925     } else {
2926       sv = cfd->GetSuperVersion();
2927     }
2928 
2929     bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
2930         property_info, sv->current, value);
2931 
2932     if (!is_locked) {
2933       ReturnAndCleanupSuperVersion(cfd, sv);
2934     }
2935 
2936     return ret;
2937   }
2938 }
2939 
GetPropertyHandleOptionsStatistics(std::string * value)2940 bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
2941   assert(value != nullptr);
2942   Statistics* statistics = immutable_db_options_.statistics.get();
2943   if (!statistics) {
2944     return false;
2945   }
2946   *value = statistics->ToString();
2947   return true;
2948 }
2949 
2950 #ifndef ROCKSDB_LITE
ResetStats()2951 Status DBImpl::ResetStats() {
2952   InstrumentedMutexLock l(&mutex_);
2953   for (auto* cfd : *versions_->GetColumnFamilySet()) {
2954     if (cfd->initialized()) {
2955       cfd->internal_stats()->Clear();
2956     }
2957   }
2958   return Status::OK();
2959 }
2960 #endif  // ROCKSDB_LITE
2961 
GetAggregatedIntProperty(const Slice & property,uint64_t * aggregated_value)2962 bool DBImpl::GetAggregatedIntProperty(const Slice& property,
2963                                       uint64_t* aggregated_value) {
2964   const DBPropertyInfo* property_info = GetPropertyInfo(property);
2965   if (property_info == nullptr || property_info->handle_int == nullptr) {
2966     return false;
2967   }
2968 
2969   uint64_t sum = 0;
2970   {
2971     // Needs mutex to protect the list of column families.
2972     InstrumentedMutexLock l(&mutex_);
2973     uint64_t value;
2974     for (auto* cfd : *versions_->GetColumnFamilySet()) {
2975       if (!cfd->initialized()) {
2976         continue;
2977       }
2978       if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
2979         sum += value;
2980       } else {
2981         return false;
2982       }
2983     }
2984   }
2985   *aggregated_value = sum;
2986   return true;
2987 }
2988 
GetAndRefSuperVersion(ColumnFamilyData * cfd)2989 SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
2990   // TODO(ljin): consider using GetReferencedSuperVersion() directly
2991   return cfd->GetThreadLocalSuperVersion(this);
2992 }
2993 
2994 // REQUIRED: this function should only be called on the write thread or if the
2995 // mutex is held.
GetAndRefSuperVersion(uint32_t column_family_id)2996 SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
2997   auto column_family_set = versions_->GetColumnFamilySet();
2998   auto cfd = column_family_set->GetColumnFamily(column_family_id);
2999   if (!cfd) {
3000     return nullptr;
3001   }
3002 
3003   return GetAndRefSuperVersion(cfd);
3004 }
3005 
CleanupSuperVersion(SuperVersion * sv)3006 void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
3007   // Release SuperVersion
3008   if (sv->Unref()) {
3009     bool defer_purge =
3010             immutable_db_options().avoid_unnecessary_blocking_io;
3011     {
3012       InstrumentedMutexLock l(&mutex_);
3013       sv->Cleanup();
3014       if (defer_purge) {
3015         AddSuperVersionsToFreeQueue(sv);
3016         SchedulePurge();
3017       }
3018     }
3019     if (!defer_purge) {
3020       delete sv;
3021     }
3022     RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
3023   }
3024   RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
3025 }
3026 
ReturnAndCleanupSuperVersion(ColumnFamilyData * cfd,SuperVersion * sv)3027 void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
3028                                           SuperVersion* sv) {
3029   if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
3030     CleanupSuperVersion(sv);
3031   }
3032 }
3033 
3034 // REQUIRED: this function should only be called on the write thread.
ReturnAndCleanupSuperVersion(uint32_t column_family_id,SuperVersion * sv)3035 void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
3036                                           SuperVersion* sv) {
3037   auto column_family_set = versions_->GetColumnFamilySet();
3038   auto cfd = column_family_set->GetColumnFamily(column_family_id);
3039 
3040   // If SuperVersion is held, and we successfully fetched a cfd using
3041   // GetAndRefSuperVersion(), it must still exist.
3042   assert(cfd != nullptr);
3043   ReturnAndCleanupSuperVersion(cfd, sv);
3044 }
3045 
3046 // REQUIRED: this function should only be called on the write thread or if the
3047 // mutex is held.
GetColumnFamilyHandle(uint32_t column_family_id)3048 ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
3049   ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
3050 
3051   if (!cf_memtables->Seek(column_family_id)) {
3052     return nullptr;
3053   }
3054 
3055   return cf_memtables->GetColumnFamilyHandle();
3056 }
3057 
3058 // REQUIRED: mutex is NOT held.
GetColumnFamilyHandleUnlocked(uint32_t column_family_id)3059 std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
3060     uint32_t column_family_id) {
3061   InstrumentedMutexLock l(&mutex_);
3062 
3063   auto* cfd =
3064       versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
3065   if (cfd == nullptr) {
3066     return nullptr;
3067   }
3068 
3069   return std::unique_ptr<ColumnFamilyHandleImpl>(
3070       new ColumnFamilyHandleImpl(cfd, this, &mutex_));
3071 }
3072 
GetApproximateMemTableStats(ColumnFamilyHandle * column_family,const Range & range,uint64_t * const count,uint64_t * const size)3073 void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
3074                                          const Range& range,
3075                                          uint64_t* const count,
3076                                          uint64_t* const size) {
3077   ColumnFamilyHandleImpl* cfh =
3078       reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3079   ColumnFamilyData* cfd = cfh->cfd();
3080   SuperVersion* sv = GetAndRefSuperVersion(cfd);
3081 
3082   // Convert user_key into a corresponding internal key.
3083   InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
3084   InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
3085   MemTable::MemTableStats memStats =
3086       sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
3087   MemTable::MemTableStats immStats =
3088       sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
3089   *count = memStats.count + immStats.count;
3090   *size = memStats.size + immStats.size;
3091 
3092   ReturnAndCleanupSuperVersion(cfd, sv);
3093 }
3094 
GetApproximateSizes(const SizeApproximationOptions & options,ColumnFamilyHandle * column_family,const Range * range,int n,uint64_t * sizes)3095 Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
3096                                    ColumnFamilyHandle* column_family,
3097                                    const Range* range, int n, uint64_t* sizes) {
3098   if (!options.include_memtabtles && !options.include_files) {
3099     return Status::InvalidArgument("Invalid options");
3100   }
3101 
3102   Version* v;
3103   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3104   auto cfd = cfh->cfd();
3105   SuperVersion* sv = GetAndRefSuperVersion(cfd);
3106   v = sv->current;
3107 
3108   for (int i = 0; i < n; i++) {
3109     // Convert user_key into a corresponding internal key.
3110     InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
3111     InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
3112     sizes[i] = 0;
3113     if (options.include_files) {
3114       sizes[i] += versions_->ApproximateSize(
3115           options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
3116           /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
3117     }
3118     if (options.include_memtabtles) {
3119       sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
3120       sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
3121     }
3122   }
3123 
3124   ReturnAndCleanupSuperVersion(cfd, sv);
3125   return Status::OK();
3126 }
3127 
3128 std::list<uint64_t>::iterator
CaptureCurrentFileNumberInPendingOutputs()3129 DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
3130   // We need to remember the iterator of our insert, because after the
3131   // background job is done, we need to remove that element from
3132   // pending_outputs_.
3133   pending_outputs_.push_back(versions_->current_next_file_number());
3134   auto pending_outputs_inserted_elem = pending_outputs_.end();
3135   --pending_outputs_inserted_elem;
3136   return pending_outputs_inserted_elem;
3137 }
3138 
ReleaseFileNumberFromPendingOutputs(std::unique_ptr<std::list<uint64_t>::iterator> & v)3139 void DBImpl::ReleaseFileNumberFromPendingOutputs(
3140     std::unique_ptr<std::list<uint64_t>::iterator>& v) {
3141   if (v.get() != nullptr) {
3142     pending_outputs_.erase(*v.get());
3143     v.reset();
3144   }
3145 }
3146 
3147 #ifndef ROCKSDB_LITE
GetUpdatesSince(SequenceNumber seq,std::unique_ptr<TransactionLogIterator> * iter,const TransactionLogIterator::ReadOptions & read_options)3148 Status DBImpl::GetUpdatesSince(
3149     SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
3150     const TransactionLogIterator::ReadOptions& read_options) {
3151   RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
3152   if (seq > versions_->LastSequence()) {
3153     return Status::NotFound("Requested sequence not yet written in the db");
3154   }
3155   return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
3156 }
3157 
DeleteFile(std::string name)3158 Status DBImpl::DeleteFile(std::string name) {
3159   uint64_t number;
3160   FileType type;
3161   WalFileType log_type;
3162   if (!ParseFileName(name, &number, &type, &log_type) ||
3163       (type != kTableFile && type != kLogFile)) {
3164     ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
3165                     name.c_str());
3166     return Status::InvalidArgument("Invalid file name");
3167   }
3168 
3169   Status status;
3170   if (type == kLogFile) {
3171     // Only allow deleting archived log files
3172     if (log_type != kArchivedLogFile) {
3173       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3174                       "DeleteFile %s failed - not archived log.\n",
3175                       name.c_str());
3176       return Status::NotSupported("Delete only supported for archived logs");
3177     }
3178     status = wal_manager_.DeleteFile(name, number);
3179     if (!status.ok()) {
3180       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3181                       "DeleteFile %s failed -- %s.\n", name.c_str(),
3182                       status.ToString().c_str());
3183     }
3184     return status;
3185   }
3186 
3187   int level;
3188   FileMetaData* metadata;
3189   ColumnFamilyData* cfd;
3190   VersionEdit edit;
3191   JobContext job_context(next_job_id_.fetch_add(1), true);
3192   {
3193     InstrumentedMutexLock l(&mutex_);
3194     status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
3195     if (!status.ok()) {
3196       ROCKS_LOG_WARN(immutable_db_options_.info_log,
3197                      "DeleteFile %s failed. File not found\n", name.c_str());
3198       job_context.Clean();
3199       return Status::InvalidArgument("File not found");
3200     }
3201     assert(level < cfd->NumberLevels());
3202 
3203     // If the file is being compacted no need to delete.
3204     if (metadata->being_compacted) {
3205       ROCKS_LOG_INFO(immutable_db_options_.info_log,
3206                      "DeleteFile %s Skipped. File about to be compacted\n",
3207                      name.c_str());
3208       job_context.Clean();
3209       return Status::OK();
3210     }
3211 
3212     // Only the files in the last level can be deleted externally.
3213     // This is to make sure that any deletion tombstones are not
3214     // lost. Check that the level passed is the last level.
3215     auto* vstoreage = cfd->current()->storage_info();
3216     for (int i = level + 1; i < cfd->NumberLevels(); i++) {
3217       if (vstoreage->NumLevelFiles(i) != 0) {
3218         ROCKS_LOG_WARN(immutable_db_options_.info_log,
3219                        "DeleteFile %s FAILED. File not in last level\n",
3220                        name.c_str());
3221         job_context.Clean();
3222         return Status::InvalidArgument("File not in last level");
3223       }
3224     }
3225     // if level == 0, it has to be the oldest file
3226     if (level == 0 &&
3227         vstoreage->LevelFiles(0).back()->fd.GetNumber() != number) {
3228       ROCKS_LOG_WARN(immutable_db_options_.info_log,
3229                      "DeleteFile %s failed ---"
3230                      " target file in level 0 must be the oldest.",
3231                      name.c_str());
3232       job_context.Clean();
3233       return Status::InvalidArgument("File in level 0, but not oldest");
3234     }
3235     edit.SetColumnFamily(cfd->GetID());
3236     edit.DeleteFile(level, number);
3237     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3238                                     &edit, &mutex_, directories_.GetDbDir());
3239     if (status.ok()) {
3240       InstallSuperVersionAndScheduleWork(cfd,
3241                                          &job_context.superversion_contexts[0],
3242                                          *cfd->GetLatestMutableCFOptions());
3243     }
3244     FindObsoleteFiles(&job_context, false);
3245   }  // lock released here
3246 
3247   LogFlush(immutable_db_options_.info_log);
3248   // remove files outside the db-lock
3249   if (job_context.HaveSomethingToDelete()) {
3250     // Call PurgeObsoleteFiles() without holding mutex.
3251     PurgeObsoleteFiles(job_context);
3252   }
3253   job_context.Clean();
3254   return status;
3255 }
3256 
DeleteFilesInRanges(ColumnFamilyHandle * column_family,const RangePtr * ranges,size_t n,bool include_end)3257 Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
3258                                    const RangePtr* ranges, size_t n,
3259                                    bool include_end) {
3260   Status status;
3261   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3262   ColumnFamilyData* cfd = cfh->cfd();
3263   VersionEdit edit;
3264   std::set<FileMetaData*> deleted_files;
3265   JobContext job_context(next_job_id_.fetch_add(1), true);
3266   {
3267     InstrumentedMutexLock l(&mutex_);
3268     Version* input_version = cfd->current();
3269 
3270     auto* vstorage = input_version->storage_info();
3271     for (size_t r = 0; r < n; r++) {
3272       auto begin = ranges[r].start, end = ranges[r].limit;
3273       for (int i = 1; i < cfd->NumberLevels(); i++) {
3274         if (vstorage->LevelFiles(i).empty() ||
3275             !vstorage->OverlapInLevel(i, begin, end)) {
3276           continue;
3277         }
3278         std::vector<FileMetaData*> level_files;
3279         InternalKey begin_storage, end_storage, *begin_key, *end_key;
3280         if (begin == nullptr) {
3281           begin_key = nullptr;
3282         } else {
3283           begin_storage.SetMinPossibleForUserKey(*begin);
3284           begin_key = &begin_storage;
3285         }
3286         if (end == nullptr) {
3287           end_key = nullptr;
3288         } else {
3289           end_storage.SetMaxPossibleForUserKey(*end);
3290           end_key = &end_storage;
3291         }
3292 
3293         vstorage->GetCleanInputsWithinInterval(
3294             i, begin_key, end_key, &level_files, -1 /* hint_index */,
3295             nullptr /* file_index */);
3296         FileMetaData* level_file;
3297         for (uint32_t j = 0; j < level_files.size(); j++) {
3298           level_file = level_files[j];
3299           if (level_file->being_compacted) {
3300             continue;
3301           }
3302           if (deleted_files.find(level_file) != deleted_files.end()) {
3303             continue;
3304           }
3305           if (!include_end && end != nullptr &&
3306               cfd->user_comparator()->Compare(level_file->largest.user_key(),
3307                                               *end) == 0) {
3308             continue;
3309           }
3310           edit.SetColumnFamily(cfd->GetID());
3311           edit.DeleteFile(i, level_file->fd.GetNumber());
3312           deleted_files.insert(level_file);
3313           level_file->being_compacted = true;
3314         }
3315       }
3316     }
3317     if (edit.GetDeletedFiles().empty()) {
3318       job_context.Clean();
3319       return Status::OK();
3320     }
3321     input_version->Ref();
3322     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3323                                     &edit, &mutex_, directories_.GetDbDir());
3324     if (status.ok()) {
3325       InstallSuperVersionAndScheduleWork(cfd,
3326                                          &job_context.superversion_contexts[0],
3327                                          *cfd->GetLatestMutableCFOptions());
3328     }
3329     for (auto* deleted_file : deleted_files) {
3330       deleted_file->being_compacted = false;
3331     }
3332     input_version->Unref();
3333     FindObsoleteFiles(&job_context, false);
3334   }  // lock released here
3335 
3336   LogFlush(immutable_db_options_.info_log);
3337   // remove files outside the db-lock
3338   if (job_context.HaveSomethingToDelete()) {
3339     // Call PurgeObsoleteFiles() without holding mutex.
3340     PurgeObsoleteFiles(job_context);
3341   }
3342   job_context.Clean();
3343   return status;
3344 }
3345 
GetLiveFilesMetaData(std::vector<LiveFileMetaData> * metadata)3346 void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
3347   InstrumentedMutexLock l(&mutex_);
3348   versions_->GetLiveFilesMetaData(metadata);
3349 }
3350 
GetColumnFamilyMetaData(ColumnFamilyHandle * column_family,ColumnFamilyMetaData * cf_meta)3351 void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
3352                                      ColumnFamilyMetaData* cf_meta) {
3353   assert(column_family);
3354   auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
3355   auto* sv = GetAndRefSuperVersion(cfd);
3356   {
3357     // Without mutex, Version::GetColumnFamilyMetaData will have data race with
3358     // Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but
3359     // this may cause regression. An alternative is to make
3360     // FileMetaData::being_compacted atomic, but it will make FileMetaData
3361     // non-copy-able. Another option is to separate these variables from
3362     // original FileMetaData struct, and this requires re-organization of data
3363     // structures. For now, we take the easy approach. If
3364     // DB::GetColumnFamilyMetaData is not called frequently, the regression
3365     // should not be big. We still need to keep an eye on it.
3366     InstrumentedMutexLock l(&mutex_);
3367     sv->current->GetColumnFamilyMetaData(cf_meta);
3368   }
3369   ReturnAndCleanupSuperVersion(cfd, sv);
3370 }
3371 
3372 #endif  // ROCKSDB_LITE
3373 
CheckConsistency()3374 Status DBImpl::CheckConsistency() {
3375   mutex_.AssertHeld();
3376   std::vector<LiveFileMetaData> metadata;
3377   versions_->GetLiveFilesMetaData(&metadata);
3378   TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");
3379 
3380   std::string corruption_messages;
3381 
3382   if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
3383     // Instead of calling GetFileSize() for each expected file, call
3384     // GetChildren() for the DB directory and check that all expected files
3385     // are listed, without checking their sizes.
3386     // Since sst files might be in different directories, do it for each
3387     // directory separately.
3388     std::map<std::string, std::vector<std::string>> files_by_directory;
3389     for (const auto& md : metadata) {
3390       // md.name has a leading "/". Remove it.
3391       std::string fname = md.name;
3392       if (!fname.empty() && fname[0] == '/') {
3393         fname = fname.substr(1);
3394       }
3395       files_by_directory[md.db_path].push_back(fname);
3396     }
3397     for (const auto& dir_files : files_by_directory) {
3398       std::string directory = dir_files.first;
3399       std::vector<std::string> existing_files;
3400       Status s = env_->GetChildren(directory, &existing_files);
3401       if (!s.ok()) {
3402         corruption_messages +=
3403             "Can't list files in " + directory + ": " + s.ToString() + "\n";
3404         continue;
3405       }
3406       std::sort(existing_files.begin(), existing_files.end());
3407 
3408       for (const std::string& fname : dir_files.second) {
3409         if (!std::binary_search(existing_files.begin(), existing_files.end(),
3410                                 fname) &&
3411             !std::binary_search(existing_files.begin(), existing_files.end(),
3412                                 Rocks2LevelTableFileName(fname))) {
3413           corruption_messages +=
3414               "Missing sst file " + fname + " in " + directory + "\n";
3415         }
3416       }
3417     }
3418   } else {
3419     for (const auto& md : metadata) {
3420       // md.name has a leading "/".
3421       std::string file_path = md.db_path + md.name;
3422 
3423       uint64_t fsize = 0;
3424       TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
3425       Status s = env_->GetFileSize(file_path, &fsize);
3426       if (!s.ok() &&
3427           env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
3428         s = Status::OK();
3429       }
3430       if (!s.ok()) {
3431         corruption_messages +=
3432             "Can't access " + md.name + ": " + s.ToString() + "\n";
3433       } else if (fsize != md.size) {
3434         corruption_messages += "Sst file size mismatch: " + file_path +
3435                                ". Size recorded in manifest " +
3436                                ToString(md.size) + ", actual size " +
3437                                ToString(fsize) + "\n";
3438       }
3439     }
3440   }
3441 
3442   if (corruption_messages.size() == 0) {
3443     return Status::OK();
3444   } else {
3445     return Status::Corruption(corruption_messages);
3446   }
3447 }
3448 
GetDbIdentity(std::string & identity) const3449 Status DBImpl::GetDbIdentity(std::string& identity) const {
3450   identity.assign(db_id_);
3451   return Status::OK();
3452 }
3453 
GetDbIdentityFromIdentityFile(std::string * identity) const3454 Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
3455   std::string idfilename = IdentityFileName(dbname_);
3456   const FileOptions soptions;
3457 
3458   Status s = ReadFileToString(fs_.get(), idfilename, identity);
3459   if (!s.ok()) {
3460     return s;
3461   }
3462 
3463   // If last character is '\n' remove it from identity
3464   if (identity->size() > 0 && identity->back() == '\n') {
3465     identity->pop_back();
3466   }
3467   return s;
3468 }
3469 
3470 // Default implementation -- returns not supported status
CreateColumnFamily(const ColumnFamilyOptions &,const std::string &,ColumnFamilyHandle **)3471 Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
3472                               const std::string& /*column_family_name*/,
3473                               ColumnFamilyHandle** /*handle*/) {
3474   return Status::NotSupported("");
3475 }
3476 
CreateColumnFamilies(const ColumnFamilyOptions &,const std::vector<std::string> &,std::vector<ColumnFamilyHandle * > *)3477 Status DB::CreateColumnFamilies(
3478     const ColumnFamilyOptions& /*cf_options*/,
3479     const std::vector<std::string>& /*column_family_names*/,
3480     std::vector<ColumnFamilyHandle*>* /*handles*/) {
3481   return Status::NotSupported("");
3482 }
3483 
CreateColumnFamilies(const std::vector<ColumnFamilyDescriptor> &,std::vector<ColumnFamilyHandle * > *)3484 Status DB::CreateColumnFamilies(
3485     const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
3486     std::vector<ColumnFamilyHandle*>* /*handles*/) {
3487   return Status::NotSupported("");
3488 }
3489 
DropColumnFamily(ColumnFamilyHandle *)3490 Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
3491   return Status::NotSupported("");
3492 }
3493 
DropColumnFamilies(const std::vector<ColumnFamilyHandle * > &)3494 Status DB::DropColumnFamilies(
3495     const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
3496   return Status::NotSupported("");
3497 }
3498 
DestroyColumnFamilyHandle(ColumnFamilyHandle * column_family)3499 Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
3500   delete column_family;
3501   return Status::OK();
3502 }
3503 
~DB()3504 DB::~DB() {}
3505 
Close()3506 Status DBImpl::Close() {
3507   if (!closed_) {
3508     {
3509       InstrumentedMutexLock l(&mutex_);
3510       // If there is unreleased snapshot, fail the close call
3511       if (!snapshots_.empty()) {
3512         return Status::Aborted("Cannot close DB with unreleased snapshot.");
3513       }
3514     }
3515 
3516     closed_ = true;
3517     return CloseImpl();
3518   }
3519   return Status::OK();
3520 }
3521 
ListColumnFamilies(const DBOptions & db_options,const std::string & name,std::vector<std::string> * column_families)3522 Status DB::ListColumnFamilies(const DBOptions& db_options,
3523                               const std::string& name,
3524                               std::vector<std::string>* column_families) {
3525   const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
3526   return VersionSet::ListColumnFamilies(column_families, name, fs.get());
3527 }
3528 
~Snapshot()3529 Snapshot::~Snapshot() {}
3530 
DestroyDB(const std::string & dbname,const Options & options,const std::vector<ColumnFamilyDescriptor> & column_families)3531 Status DestroyDB(const std::string& dbname, const Options& options,
3532                  const std::vector<ColumnFamilyDescriptor>& column_families) {
3533   ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
3534   Env* env = soptions.env;
3535   std::vector<std::string> filenames;
3536   bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
3537 
3538   // Reset the logger because it holds a handle to the
3539   // log file and prevents cleanup and directory removal
3540   soptions.info_log.reset();
3541   // Ignore error in case directory does not exist
3542   env->GetChildren(dbname, &filenames);
3543 
3544   FileLock* lock;
3545   const std::string lockname = LockFileName(dbname);
3546   Status result = env->LockFile(lockname, &lock);
3547   if (result.ok()) {
3548     uint64_t number;
3549     FileType type;
3550     InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
3551     for (const auto& fname : filenames) {
3552       if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
3553           type != kDBLockFile) {  // Lock file will be deleted at end
3554         Status del;
3555         std::string path_to_delete = dbname + "/" + fname;
3556         if (type == kMetaDatabase) {
3557           del = DestroyDB(path_to_delete, options);
3558         } else if (type == kTableFile || type == kLogFile) {
3559           del = DeleteDBFile(&soptions, path_to_delete, dbname,
3560                              /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3561         } else {
3562           del = env->DeleteFile(path_to_delete);
3563         }
3564         if (result.ok() && !del.ok()) {
3565           result = del;
3566         }
3567       }
3568     }
3569 
3570     std::set<std::string> paths;
3571     for (const DbPath& db_path : options.db_paths) {
3572       paths.insert(db_path.path);
3573     }
3574     for (const ColumnFamilyDescriptor& cf : column_families) {
3575       for (const DbPath& cf_path : cf.options.cf_paths) {
3576         paths.insert(cf_path.path);
3577       }
3578     }
3579     for (const auto& path : paths) {
3580       if (env->GetChildren(path, &filenames).ok()) {
3581         for (const auto& fname : filenames) {
3582           if (ParseFileName(fname, &number, &type) &&
3583               type == kTableFile) {  // Lock file will be deleted at end
3584             std::string table_path = path + "/" + fname;
3585             Status del = DeleteDBFile(&soptions, table_path, dbname,
3586                                       /*force_bg=*/false, /*force_fg=*/false);
3587             if (result.ok() && !del.ok()) {
3588               result = del;
3589             }
3590           }
3591         }
3592         env->DeleteDir(path);
3593       }
3594     }
3595 
3596     std::vector<std::string> walDirFiles;
3597     std::string archivedir = ArchivalDirectory(dbname);
3598     bool wal_dir_exists = false;
3599     if (dbname != soptions.wal_dir) {
3600       wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
3601       archivedir = ArchivalDirectory(soptions.wal_dir);
3602     }
3603 
3604     // Archive dir may be inside wal dir or dbname and should be
3605     // processed and removed before those otherwise we have issues
3606     // removing them
3607     std::vector<std::string> archiveFiles;
3608     if (env->GetChildren(archivedir, &archiveFiles).ok()) {
3609       // Delete archival files.
3610       for (const auto& file : archiveFiles) {
3611         if (ParseFileName(file, &number, &type) && type == kLogFile) {
3612           Status del =
3613               DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
3614                            /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3615           if (result.ok() && !del.ok()) {
3616             result = del;
3617           }
3618         }
3619       }
3620       env->DeleteDir(archivedir);
3621     }
3622 
3623     // Delete log files in the WAL dir
3624     if (wal_dir_exists) {
3625       for (const auto& file : walDirFiles) {
3626         if (ParseFileName(file, &number, &type) && type == kLogFile) {
3627           Status del =
3628               DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
3629                            soptions.wal_dir, /*force_bg=*/false,
3630                            /*force_fg=*/!wal_in_db_path);
3631           if (result.ok() && !del.ok()) {
3632             result = del;
3633           }
3634         }
3635       }
3636       env->DeleteDir(soptions.wal_dir);
3637     }
3638 
3639     env->UnlockFile(lock);  // Ignore error since state is already gone
3640     env->DeleteFile(lockname);
3641 
3642     // sst_file_manager holds a ref to the logger. Make sure the logger is
3643     // gone before trying to remove the directory.
3644     soptions.sst_file_manager.reset();
3645 
3646     env->DeleteDir(dbname);  // Ignore error in case dir contains other files
3647   }
3648   return result;
3649 }
3650 
WriteOptionsFile(bool need_mutex_lock,bool need_enter_write_thread)3651 Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
3652                                 bool need_enter_write_thread) {
3653 #ifndef ROCKSDB_LITE
3654   WriteThread::Writer w;
3655   if (need_mutex_lock) {
3656     mutex_.Lock();
3657   } else {
3658     mutex_.AssertHeld();
3659   }
3660   if (need_enter_write_thread) {
3661     write_thread_.EnterUnbatched(&w, &mutex_);
3662   }
3663 
3664   std::vector<std::string> cf_names;
3665   std::vector<ColumnFamilyOptions> cf_opts;
3666 
3667   // This part requires mutex to protect the column family options
3668   for (auto cfd : *versions_->GetColumnFamilySet()) {
3669     if (cfd->IsDropped()) {
3670       continue;
3671     }
3672     cf_names.push_back(cfd->GetName());
3673     cf_opts.push_back(cfd->GetLatestCFOptions());
3674   }
3675 
3676   // Unlock during expensive operations.  New writes cannot get here
3677   // because the single write thread ensures all new writes get queued.
3678   DBOptions db_options =
3679       BuildDBOptions(immutable_db_options_, mutable_db_options_);
3680   mutex_.Unlock();
3681 
3682   TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
3683   TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
3684 
3685   std::string file_name =
3686       TempOptionsFileName(GetName(), versions_->NewFileNumber());
3687   Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
3688                                    GetFileSystem());
3689 
3690   if (s.ok()) {
3691     s = RenameTempFileToOptionsFile(file_name);
3692   }
3693   // restore lock
3694   if (!need_mutex_lock) {
3695     mutex_.Lock();
3696   }
3697   if (need_enter_write_thread) {
3698     write_thread_.ExitUnbatched(&w);
3699   }
3700   if (!s.ok()) {
3701     ROCKS_LOG_WARN(immutable_db_options_.info_log,
3702                    "Unnable to persist options -- %s", s.ToString().c_str());
3703     if (immutable_db_options_.fail_if_options_file_error) {
3704       return Status::IOError("Unable to persist options.",
3705                              s.ToString().c_str());
3706     }
3707   }
3708 #else
3709   (void)need_mutex_lock;
3710   (void)need_enter_write_thread;
3711 #endif  // !ROCKSDB_LITE
3712   return Status::OK();
3713 }
3714 
3715 #ifndef ROCKSDB_LITE
3716 namespace {
DeleteOptionsFilesHelper(const std::map<uint64_t,std::string> & filenames,const size_t num_files_to_keep,const std::shared_ptr<Logger> & info_log,Env * env)3717 void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
3718                               const size_t num_files_to_keep,
3719                               const std::shared_ptr<Logger>& info_log,
3720                               Env* env) {
3721   if (filenames.size() <= num_files_to_keep) {
3722     return;
3723   }
3724   for (auto iter = std::next(filenames.begin(), num_files_to_keep);
3725        iter != filenames.end(); ++iter) {
3726     if (!env->DeleteFile(iter->second).ok()) {
3727       ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
3728                      iter->second.c_str());
3729     }
3730   }
3731 }
3732 }  // namespace
3733 #endif  // !ROCKSDB_LITE
3734 
DeleteObsoleteOptionsFiles()3735 Status DBImpl::DeleteObsoleteOptionsFiles() {
3736 #ifndef ROCKSDB_LITE
3737   std::vector<std::string> filenames;
3738   // use ordered map to store keep the filenames sorted from the newest
3739   // to the oldest.
3740   std::map<uint64_t, std::string> options_filenames;
3741   Status s;
3742   s = GetEnv()->GetChildren(GetName(), &filenames);
3743   if (!s.ok()) {
3744     return s;
3745   }
3746   for (auto& filename : filenames) {
3747     uint64_t file_number;
3748     FileType type;
3749     if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
3750       options_filenames.insert(
3751           {std::numeric_limits<uint64_t>::max() - file_number,
3752            GetName() + "/" + filename});
3753     }
3754   }
3755 
3756   // Keeps the latest 2 Options file
3757   const size_t kNumOptionsFilesKept = 2;
3758   DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
3759                            immutable_db_options_.info_log, GetEnv());
3760   return Status::OK();
3761 #else
3762   return Status::OK();
3763 #endif  // !ROCKSDB_LITE
3764 }
3765 
RenameTempFileToOptionsFile(const std::string & file_name)3766 Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
3767 #ifndef ROCKSDB_LITE
3768   Status s;
3769 
3770   uint64_t options_file_number = versions_->NewFileNumber();
3771   std::string options_file_name =
3772       OptionsFileName(GetName(), options_file_number);
3773   // Retry if the file name happen to conflict with an existing one.
3774   s = GetEnv()->RenameFile(file_name, options_file_name);
3775   if (s.ok()) {
3776     InstrumentedMutexLock l(&mutex_);
3777     versions_->options_file_number_ = options_file_number;
3778   }
3779 
3780   if (0 == disable_delete_obsolete_files_) {
3781     DeleteObsoleteOptionsFiles();
3782   }
3783   return s;
3784 #else
3785   (void)file_name;
3786   return Status::OK();
3787 #endif  // !ROCKSDB_LITE
3788 }
3789 
3790 #ifdef ROCKSDB_USING_THREAD_STATUS
3791 
NewThreadStatusCfInfo(ColumnFamilyData * cfd) const3792 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
3793   if (immutable_db_options_.enable_thread_tracking) {
3794     ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
3795                                           cfd->ioptions()->env);
3796   }
3797 }
3798 
EraseThreadStatusCfInfo(ColumnFamilyData * cfd) const3799 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
3800   if (immutable_db_options_.enable_thread_tracking) {
3801     ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
3802   }
3803 }
3804 
EraseThreadStatusDbInfo() const3805 void DBImpl::EraseThreadStatusDbInfo() const {
3806   if (immutable_db_options_.enable_thread_tracking) {
3807     ThreadStatusUtil::EraseDatabaseInfo(this);
3808   }
3809 }
3810 
3811 #else
NewThreadStatusCfInfo(ColumnFamilyData *) const3812 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
3813 
EraseThreadStatusCfInfo(ColumnFamilyData *) const3814 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
3815 
EraseThreadStatusDbInfo() const3816 void DBImpl::EraseThreadStatusDbInfo() const {}
3817 #endif  // ROCKSDB_USING_THREAD_STATUS
3818 
3819 //
3820 // A global method that can dump out the build version
DumpRocksDBBuildVersion(Logger * log)3821 void DumpRocksDBBuildVersion(Logger* log) {
3822 #if !defined(IOS_CROSS_COMPILE)
3823   // if we compile with Xcode, we don't run build_detect_version, so we don't
3824   // generate util/build_version.cc
3825   ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR,
3826                    ROCKSDB_MINOR, ROCKSDB_PATCH);
3827   ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
3828   ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
3829 #else
3830   (void)log;  // ignore "-Wunused-parameter"
3831 #endif
3832 }
3833 
3834 #ifndef ROCKSDB_LITE
GetEarliestMemTableSequenceNumber(SuperVersion * sv,bool include_history)3835 SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
3836                                                          bool include_history) {
3837   // Find the earliest sequence number that we know we can rely on reading
3838   // from the memtable without needing to check sst files.
3839   SequenceNumber earliest_seq =
3840       sv->imm->GetEarliestSequenceNumber(include_history);
3841   if (earliest_seq == kMaxSequenceNumber) {
3842     earliest_seq = sv->mem->GetEarliestSequenceNumber();
3843   }
3844   assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
3845 
3846   return earliest_seq;
3847 }
3848 #endif  // ROCKSDB_LITE
3849 
3850 #ifndef ROCKSDB_LITE
GetLatestSequenceForKey(SuperVersion * sv,const Slice & key,bool cache_only,SequenceNumber lower_bound_seq,SequenceNumber * seq,bool * found_record_for_key,bool * is_blob_index)3851 Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
3852                                        bool cache_only,
3853                                        SequenceNumber lower_bound_seq,
3854                                        SequenceNumber* seq,
3855                                        bool* found_record_for_key,
3856                                        bool* is_blob_index) {
3857   Status s;
3858   MergeContext merge_context;
3859   SequenceNumber max_covering_tombstone_seq = 0;
3860 
3861   ReadOptions read_options;
3862   SequenceNumber current_seq = versions_->LastSequence();
3863   LookupKey lkey(key, current_seq);
3864 
3865   *seq = kMaxSequenceNumber;
3866   *found_record_for_key = false;
3867 
3868   // Check if there is a record for this key in the latest memtable
3869   sv->mem->Get(lkey, nullptr, nullptr, &s, &merge_context,
3870                &max_covering_tombstone_seq, seq, read_options,
3871                nullptr /*read_callback*/, is_blob_index);
3872 
3873   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3874     // unexpected error reading memtable.
3875     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3876                     "Unexpected status returned from MemTable::Get: %s\n",
3877                     s.ToString().c_str());
3878 
3879     return s;
3880   }
3881 
3882   if (*seq != kMaxSequenceNumber) {
3883     // Found a sequence number, no need to check immutable memtables
3884     *found_record_for_key = true;
3885     return Status::OK();
3886   }
3887 
3888   SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
3889   if (lower_bound_in_mem != kMaxSequenceNumber &&
3890       lower_bound_in_mem < lower_bound_seq) {
3891     *found_record_for_key = false;
3892     return Status::OK();
3893   }
3894 
3895   // Check if there is a record for this key in the immutable memtables
3896   sv->imm->Get(lkey, nullptr, nullptr, &s, &merge_context,
3897                &max_covering_tombstone_seq, seq, read_options,
3898                nullptr /*read_callback*/, is_blob_index);
3899 
3900   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3901     // unexpected error reading memtable.
3902     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3903                     "Unexpected status returned from MemTableList::Get: %s\n",
3904                     s.ToString().c_str());
3905 
3906     return s;
3907   }
3908 
3909   if (*seq != kMaxSequenceNumber) {
3910     // Found a sequence number, no need to check memtable history
3911     *found_record_for_key = true;
3912     return Status::OK();
3913   }
3914 
3915   SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
3916   if (lower_bound_in_imm != kMaxSequenceNumber &&
3917       lower_bound_in_imm < lower_bound_seq) {
3918     *found_record_for_key = false;
3919     return Status::OK();
3920   }
3921 
3922   // Check if there is a record for this key in the immutable memtables
3923   sv->imm->GetFromHistory(lkey, nullptr, nullptr, &s, &merge_context,
3924                           &max_covering_tombstone_seq, seq, read_options,
3925                           is_blob_index);
3926 
3927   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3928     // unexpected error reading memtable.
3929     ROCKS_LOG_ERROR(
3930         immutable_db_options_.info_log,
3931         "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
3932         s.ToString().c_str());
3933 
3934     return s;
3935   }
3936 
3937   if (*seq != kMaxSequenceNumber) {
3938     // Found a sequence number, no need to check SST files
3939     *found_record_for_key = true;
3940     return Status::OK();
3941   }
3942 
3943   // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
3944   // check here to skip the history if possible. But currently the caller
3945   // already does that. Maybe we should move the logic here later.
3946 
3947   // TODO(agiardullo): possible optimization: consider checking cached
3948   // SST files if cache_only=true?
3949   if (!cache_only) {
3950     // Check tables
3951     sv->current->Get(read_options, lkey, nullptr, nullptr, &s, &merge_context,
3952                      &max_covering_tombstone_seq, nullptr /* value_found */,
3953                      found_record_for_key, seq, nullptr /*read_callback*/,
3954                      is_blob_index);
3955 
3956     if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3957       // unexpected error reading SST files
3958       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3959                       "Unexpected status returned from Version::Get: %s\n",
3960                       s.ToString().c_str());
3961     }
3962   }
3963 
3964   return s;
3965 }
3966 
IngestExternalFile(ColumnFamilyHandle * column_family,const std::vector<std::string> & external_files,const IngestExternalFileOptions & ingestion_options)3967 Status DBImpl::IngestExternalFile(
3968     ColumnFamilyHandle* column_family,
3969     const std::vector<std::string>& external_files,
3970     const IngestExternalFileOptions& ingestion_options) {
3971   IngestExternalFileArg arg;
3972   arg.column_family = column_family;
3973   arg.external_files = external_files;
3974   arg.options = ingestion_options;
3975   return IngestExternalFiles({arg});
3976 }
3977 
IngestExternalFiles(const std::vector<IngestExternalFileArg> & args)3978 Status DBImpl::IngestExternalFiles(
3979     const std::vector<IngestExternalFileArg>& args) {
3980   if (args.empty()) {
3981     return Status::InvalidArgument("ingestion arg list is empty");
3982   }
3983   {
3984     std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
3985     for (const auto& arg : args) {
3986       if (arg.column_family == nullptr) {
3987         return Status::InvalidArgument("column family handle is null");
3988       } else if (unique_cfhs.count(arg.column_family) > 0) {
3989         return Status::InvalidArgument(
3990             "ingestion args have duplicate column families");
3991       }
3992       unique_cfhs.insert(arg.column_family);
3993     }
3994   }
3995   // Ingest multiple external SST files atomically.
3996   size_t num_cfs = args.size();
3997   for (size_t i = 0; i != num_cfs; ++i) {
3998     if (args[i].external_files.empty()) {
3999       char err_msg[128] = {0};
4000       snprintf(err_msg, 128, "external_files[%zu] is empty", i);
4001       return Status::InvalidArgument(err_msg);
4002     }
4003   }
4004   for (const auto& arg : args) {
4005     const IngestExternalFileOptions& ingest_opts = arg.options;
4006     if (ingest_opts.ingest_behind &&
4007         !immutable_db_options_.allow_ingest_behind) {
4008       return Status::InvalidArgument(
4009           "can't ingest_behind file in DB with allow_ingest_behind=false");
4010     }
4011   }
4012 
4013   // TODO (yanqin) maybe handle the case in which column_families have
4014   // duplicates
4015   std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4016   size_t total = 0;
4017   for (const auto& arg : args) {
4018     total += arg.external_files.size();
4019   }
4020   uint64_t next_file_number = 0;
4021   Status status = ReserveFileNumbersBeforeIngestion(
4022       static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
4023       pending_output_elem, &next_file_number);
4024   if (!status.ok()) {
4025     InstrumentedMutexLock l(&mutex_);
4026     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4027     return status;
4028   }
4029 
4030   std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
4031   for (const auto& arg : args) {
4032     auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
4033     ingestion_jobs.emplace_back(
4034         env_, versions_.get(), cfd, immutable_db_options_, file_options_,
4035         &snapshots_, arg.options, &directories_, &event_logger_);
4036   }
4037   std::vector<std::pair<bool, Status>> exec_results;
4038   for (size_t i = 0; i != num_cfs; ++i) {
4039     exec_results.emplace_back(false, Status::OK());
4040   }
4041   // TODO(yanqin) maybe make jobs run in parallel
4042   uint64_t start_file_number = next_file_number;
4043   for (size_t i = 1; i != num_cfs; ++i) {
4044     start_file_number += args[i - 1].external_files.size();
4045     auto* cfd =
4046         static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4047     SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
4048     exec_results[i].second = ingestion_jobs[i].Prepare(
4049         args[i].external_files, start_file_number, super_version);
4050     exec_results[i].first = true;
4051     CleanupSuperVersion(super_version);
4052   }
4053   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
4054   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
4055   {
4056     auto* cfd =
4057         static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
4058     SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
4059     exec_results[0].second = ingestion_jobs[0].Prepare(
4060         args[0].external_files, next_file_number, super_version);
4061     exec_results[0].first = true;
4062     CleanupSuperVersion(super_version);
4063   }
4064   for (const auto& exec_result : exec_results) {
4065     if (!exec_result.second.ok()) {
4066       status = exec_result.second;
4067       break;
4068     }
4069   }
4070   if (!status.ok()) {
4071     for (size_t i = 0; i != num_cfs; ++i) {
4072       if (exec_results[i].first) {
4073         ingestion_jobs[i].Cleanup(status);
4074       }
4075     }
4076     InstrumentedMutexLock l(&mutex_);
4077     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4078     return status;
4079   }
4080 
4081   std::vector<SuperVersionContext> sv_ctxs;
4082   for (size_t i = 0; i != num_cfs; ++i) {
4083     sv_ctxs.emplace_back(true /* create_superversion */);
4084   }
4085   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
4086   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
4087   TEST_SYNC_POINT("DBImpl::AddFile:Start");
4088   {
4089     InstrumentedMutexLock l(&mutex_);
4090     TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
4091 
4092     // Stop writes to the DB by entering both write threads
4093     WriteThread::Writer w;
4094     write_thread_.EnterUnbatched(&w, &mutex_);
4095     WriteThread::Writer nonmem_w;
4096     if (two_write_queues_) {
4097       nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4098     }
4099 
4100     // When unordered_write is enabled, the keys are writing to memtable in an
4101     // unordered way. If the ingestion job checks memtable key range before the
4102     // key landing in memtable, the ingestion job may skip the necessary
4103     // memtable flush.
4104     // So wait here to ensure there is no pending write to memtable.
4105     WaitForPendingWrites();
4106 
4107     num_running_ingest_file_ += static_cast<int>(num_cfs);
4108     TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
4109 
4110     bool at_least_one_cf_need_flush = false;
4111     std::vector<bool> need_flush(num_cfs, false);
4112     for (size_t i = 0; i != num_cfs; ++i) {
4113       auto* cfd =
4114           static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4115       if (cfd->IsDropped()) {
4116         // TODO (yanqin) investigate whether we should abort ingestion or
4117         // proceed with other non-dropped column families.
4118         status = Status::InvalidArgument(
4119             "cannot ingest an external file into a dropped CF");
4120         break;
4121       }
4122       bool tmp = false;
4123       status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
4124       need_flush[i] = tmp;
4125       at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
4126       if (!status.ok()) {
4127         break;
4128       }
4129     }
4130     TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
4131                              &at_least_one_cf_need_flush);
4132 
4133     if (status.ok() && at_least_one_cf_need_flush) {
4134       FlushOptions flush_opts;
4135       flush_opts.allow_write_stall = true;
4136       if (immutable_db_options_.atomic_flush) {
4137         autovector<ColumnFamilyData*> cfds_to_flush;
4138         SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
4139         mutex_.Unlock();
4140         status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
4141                                       FlushReason::kExternalFileIngestion,
4142                                       true /* writes_stopped */);
4143         mutex_.Lock();
4144       } else {
4145         for (size_t i = 0; i != num_cfs; ++i) {
4146           if (need_flush[i]) {
4147             mutex_.Unlock();
4148             auto* cfd =
4149                 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
4150                     ->cfd();
4151             status = FlushMemTable(cfd, flush_opts,
4152                                    FlushReason::kExternalFileIngestion,
4153                                    true /* writes_stopped */);
4154             mutex_.Lock();
4155             if (!status.ok()) {
4156               break;
4157             }
4158           }
4159         }
4160       }
4161     }
4162     // Run ingestion jobs.
4163     if (status.ok()) {
4164       for (size_t i = 0; i != num_cfs; ++i) {
4165         status = ingestion_jobs[i].Run();
4166         if (!status.ok()) {
4167           break;
4168         }
4169       }
4170     }
4171     if (status.ok()) {
4172       int consumed_seqno_count =
4173           ingestion_jobs[0].ConsumedSequenceNumbersCount();
4174 #ifndef NDEBUG
4175       for (size_t i = 1; i != num_cfs; ++i) {
4176         assert(!!consumed_seqno_count ==
4177                !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
4178         consumed_seqno_count +=
4179             ingestion_jobs[i].ConsumedSequenceNumbersCount();
4180       }
4181 #endif
4182       if (consumed_seqno_count > 0) {
4183         const SequenceNumber last_seqno = versions_->LastSequence();
4184         versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
4185         versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
4186         versions_->SetLastSequence(last_seqno + consumed_seqno_count);
4187       }
4188       autovector<ColumnFamilyData*> cfds_to_commit;
4189       autovector<const MutableCFOptions*> mutable_cf_options_list;
4190       autovector<autovector<VersionEdit*>> edit_lists;
4191       uint32_t num_entries = 0;
4192       for (size_t i = 0; i != num_cfs; ++i) {
4193         auto* cfd =
4194             static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4195         if (cfd->IsDropped()) {
4196           continue;
4197         }
4198         cfds_to_commit.push_back(cfd);
4199         mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
4200         autovector<VersionEdit*> edit_list;
4201         edit_list.push_back(ingestion_jobs[i].edit());
4202         edit_lists.push_back(edit_list);
4203         ++num_entries;
4204       }
4205       // Mark the version edits as an atomic group if the number of version
4206       // edits exceeds 1.
4207       if (cfds_to_commit.size() > 1) {
4208         for (auto& edits : edit_lists) {
4209           assert(edits.size() == 1);
4210           edits[0]->MarkAtomicGroup(--num_entries);
4211         }
4212         assert(0 == num_entries);
4213       }
4214       status =
4215           versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
4216                                  edit_lists, &mutex_, directories_.GetDbDir());
4217     }
4218 
4219     if (status.ok()) {
4220       for (size_t i = 0; i != num_cfs; ++i) {
4221         auto* cfd =
4222             static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4223         if (!cfd->IsDropped()) {
4224           InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
4225                                              *cfd->GetLatestMutableCFOptions());
4226 #ifndef NDEBUG
4227           if (0 == i && num_cfs > 1) {
4228             TEST_SYNC_POINT(
4229                 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
4230             TEST_SYNC_POINT(
4231                 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
4232           }
4233 #endif  // !NDEBUG
4234         }
4235       }
4236     }
4237 
4238     // Resume writes to the DB
4239     if (two_write_queues_) {
4240       nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4241     }
4242     write_thread_.ExitUnbatched(&w);
4243 
4244     if (status.ok()) {
4245       for (auto& job : ingestion_jobs) {
4246         job.UpdateStats();
4247       }
4248     }
4249     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4250     num_running_ingest_file_ -= static_cast<int>(num_cfs);
4251     if (0 == num_running_ingest_file_) {
4252       bg_cv_.SignalAll();
4253     }
4254     TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
4255   }
4256   // mutex_ is unlocked here
4257 
4258   // Cleanup
4259   for (size_t i = 0; i != num_cfs; ++i) {
4260     sv_ctxs[i].Clean();
4261     // This may rollback jobs that have completed successfully. This is
4262     // intended for atomicity.
4263     ingestion_jobs[i].Cleanup(status);
4264   }
4265   if (status.ok()) {
4266     for (size_t i = 0; i != num_cfs; ++i) {
4267       auto* cfd =
4268           static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4269       if (!cfd->IsDropped()) {
4270         NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
4271       }
4272     }
4273   }
4274   return status;
4275 }
4276 
CreateColumnFamilyWithImport(const ColumnFamilyOptions & options,const std::string & column_family_name,const ImportColumnFamilyOptions & import_options,const ExportImportFilesMetaData & metadata,ColumnFamilyHandle ** handle)4277 Status DBImpl::CreateColumnFamilyWithImport(
4278     const ColumnFamilyOptions& options, const std::string& column_family_name,
4279     const ImportColumnFamilyOptions& import_options,
4280     const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
4281   assert(handle != nullptr);
4282   assert(*handle == nullptr);
4283   std::string cf_comparator_name = options.comparator->Name();
4284   if (cf_comparator_name != metadata.db_comparator_name) {
4285     return Status::InvalidArgument("Comparator name mismatch");
4286   }
4287 
4288   // Create column family.
4289   auto status = CreateColumnFamily(options, column_family_name, handle);
4290   if (!status.ok()) {
4291     return status;
4292   }
4293 
4294   // Import sst files from metadata.
4295   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
4296   auto cfd = cfh->cfd();
4297   ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
4298                                    immutable_db_options_, file_options_,
4299                                    import_options, metadata.files);
4300 
4301   SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
4302   VersionEdit dummy_edit;
4303   uint64_t next_file_number = 0;
4304   std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4305   {
4306     // Lock db mutex
4307     InstrumentedMutexLock l(&mutex_);
4308     if (error_handler_.IsDBStopped()) {
4309       // Don't import files when there is a bg_error
4310       status = error_handler_.GetBGError();
4311     }
4312 
4313     // Make sure that bg cleanup wont delete the files that we are importing
4314     pending_output_elem.reset(new std::list<uint64_t>::iterator(
4315         CaptureCurrentFileNumberInPendingOutputs()));
4316 
4317     if (status.ok()) {
4318       // If crash happen after a hard link established, Recover function may
4319       // reuse the file number that has already assigned to the internal file,
4320       // and this will overwrite the external file. To protect the external
4321       // file, we have to make sure the file number will never being reused.
4322       next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
4323       auto cf_options = cfd->GetLatestMutableCFOptions();
4324       status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4325                                       directories_.GetDbDir());
4326       if (status.ok()) {
4327         InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4328       }
4329     }
4330   }
4331   dummy_sv_ctx.Clean();
4332 
4333   if (status.ok()) {
4334     SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
4335     status = import_job.Prepare(next_file_number, sv);
4336     CleanupSuperVersion(sv);
4337   }
4338 
4339   if (status.ok()) {
4340     SuperVersionContext sv_context(true /*create_superversion*/);
4341     {
4342       // Lock db mutex
4343       InstrumentedMutexLock l(&mutex_);
4344 
4345       // Stop writes to the DB by entering both write threads
4346       WriteThread::Writer w;
4347       write_thread_.EnterUnbatched(&w, &mutex_);
4348       WriteThread::Writer nonmem_w;
4349       if (two_write_queues_) {
4350         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4351       }
4352 
4353       num_running_ingest_file_++;
4354       assert(!cfd->IsDropped());
4355       status = import_job.Run();
4356 
4357       // Install job edit [Mutex will be unlocked here]
4358       if (status.ok()) {
4359         auto cf_options = cfd->GetLatestMutableCFOptions();
4360         status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
4361                                         &mutex_, directories_.GetDbDir());
4362         if (status.ok()) {
4363           InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
4364         }
4365       }
4366 
4367       // Resume writes to the DB
4368       if (two_write_queues_) {
4369         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4370       }
4371       write_thread_.ExitUnbatched(&w);
4372 
4373       num_running_ingest_file_--;
4374       if (num_running_ingest_file_ == 0) {
4375         bg_cv_.SignalAll();
4376       }
4377     }
4378     // mutex_ is unlocked here
4379 
4380     sv_context.Clean();
4381   }
4382 
4383   {
4384     InstrumentedMutexLock l(&mutex_);
4385     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4386   }
4387 
4388   import_job.Cleanup(status);
4389   if (!status.ok()) {
4390     DropColumnFamily(*handle);
4391     DestroyColumnFamilyHandle(*handle);
4392     *handle = nullptr;
4393   }
4394   return status;
4395 }
4396 
VerifyChecksum(const ReadOptions & read_options)4397 Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
4398   Status s;
4399   std::vector<ColumnFamilyData*> cfd_list;
4400   {
4401     InstrumentedMutexLock l(&mutex_);
4402     for (auto cfd : *versions_->GetColumnFamilySet()) {
4403       if (!cfd->IsDropped() && cfd->initialized()) {
4404         cfd->Ref();
4405         cfd_list.push_back(cfd);
4406       }
4407     }
4408   }
4409   std::vector<SuperVersion*> sv_list;
4410   for (auto cfd : cfd_list) {
4411     sv_list.push_back(cfd->GetReferencedSuperVersion(this));
4412   }
4413   for (auto& sv : sv_list) {
4414     VersionStorageInfo* vstorage = sv->current->storage_info();
4415     ColumnFamilyData* cfd = sv->current->cfd();
4416     Options opts;
4417     {
4418       InstrumentedMutexLock l(&mutex_);
4419       opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
4420                      cfd->GetLatestCFOptions());
4421     }
4422     for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
4423       for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
4424            j++) {
4425         const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd;
4426         std::string fname = TableFileName(cfd->ioptions()->cf_paths,
4427                                           fd.GetNumber(), fd.GetPathId());
4428         s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(opts, file_options_,
4429                                                      read_options, fname);
4430       }
4431     }
4432     if (!s.ok()) {
4433       break;
4434     }
4435   }
4436   bool defer_purge =
4437           immutable_db_options().avoid_unnecessary_blocking_io;
4438   {
4439     InstrumentedMutexLock l(&mutex_);
4440     for (auto sv : sv_list) {
4441       if (sv && sv->Unref()) {
4442         sv->Cleanup();
4443         if (defer_purge) {
4444           AddSuperVersionsToFreeQueue(sv);
4445         } else {
4446           delete sv;
4447         }
4448       }
4449     }
4450     if (defer_purge) {
4451       SchedulePurge();
4452     }
4453     for (auto cfd : cfd_list) {
4454       cfd->UnrefAndTryDelete();
4455     }
4456   }
4457   return s;
4458 }
4459 
NotifyOnExternalFileIngested(ColumnFamilyData * cfd,const ExternalSstFileIngestionJob & ingestion_job)4460 void DBImpl::NotifyOnExternalFileIngested(
4461     ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
4462   if (immutable_db_options_.listeners.empty()) {
4463     return;
4464   }
4465 
4466   for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
4467     ExternalFileIngestionInfo info;
4468     info.cf_name = cfd->GetName();
4469     info.external_file_path = f.external_file_path;
4470     info.internal_file_path = f.internal_file_path;
4471     info.global_seqno = f.assigned_seqno;
4472     info.table_properties = f.table_properties;
4473     for (auto listener : immutable_db_options_.listeners) {
4474       listener->OnExternalFileIngested(this, info);
4475     }
4476   }
4477 }
4478 
WaitForIngestFile()4479 void DBImpl::WaitForIngestFile() {
4480   mutex_.AssertHeld();
4481   while (num_running_ingest_file_ > 0) {
4482     bg_cv_.Wait();
4483   }
4484 }
4485 
StartTrace(const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)4486 Status DBImpl::StartTrace(const TraceOptions& trace_options,
4487                           std::unique_ptr<TraceWriter>&& trace_writer) {
4488   InstrumentedMutexLock lock(&trace_mutex_);
4489   tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer)));
4490   return Status::OK();
4491 }
4492 
EndTrace()4493 Status DBImpl::EndTrace() {
4494   InstrumentedMutexLock lock(&trace_mutex_);
4495   Status s;
4496   if (tracer_ != nullptr) {
4497     s = tracer_->Close();
4498     tracer_.reset();
4499   } else {
4500     return Status::IOError("No trace file to close");
4501   }
4502   return s;
4503 }
4504 
StartBlockCacheTrace(const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)4505 Status DBImpl::StartBlockCacheTrace(
4506     const TraceOptions& trace_options,
4507     std::unique_ptr<TraceWriter>&& trace_writer) {
4508   return block_cache_tracer_.StartTrace(env_, trace_options,
4509                                         std::move(trace_writer));
4510 }
4511 
EndBlockCacheTrace()4512 Status DBImpl::EndBlockCacheTrace() {
4513   block_cache_tracer_.EndTrace();
4514   return Status::OK();
4515 }
4516 
TraceIteratorSeek(const uint32_t & cf_id,const Slice & key)4517 Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
4518   Status s;
4519   if (tracer_) {
4520     InstrumentedMutexLock lock(&trace_mutex_);
4521     if (tracer_) {
4522       s = tracer_->IteratorSeek(cf_id, key);
4523     }
4524   }
4525   return s;
4526 }
4527 
TraceIteratorSeekForPrev(const uint32_t & cf_id,const Slice & key)4528 Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
4529                                         const Slice& key) {
4530   Status s;
4531   if (tracer_) {
4532     InstrumentedMutexLock lock(&trace_mutex_);
4533     if (tracer_) {
4534       s = tracer_->IteratorSeekForPrev(cf_id, key);
4535     }
4536   }
4537   return s;
4538 }
4539 
ReserveFileNumbersBeforeIngestion(ColumnFamilyData * cfd,uint64_t num,std::unique_ptr<std::list<uint64_t>::iterator> & pending_output_elem,uint64_t * next_file_number)4540 Status DBImpl::ReserveFileNumbersBeforeIngestion(
4541     ColumnFamilyData* cfd, uint64_t num,
4542     std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
4543     uint64_t* next_file_number) {
4544   Status s;
4545   SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
4546   assert(nullptr != next_file_number);
4547   InstrumentedMutexLock l(&mutex_);
4548   if (error_handler_.IsDBStopped()) {
4549     // Do not ingest files when there is a bg_error
4550     return error_handler_.GetBGError();
4551   }
4552   pending_output_elem.reset(new std::list<uint64_t>::iterator(
4553       CaptureCurrentFileNumberInPendingOutputs()));
4554   *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
4555   auto cf_options = cfd->GetLatestMutableCFOptions();
4556   VersionEdit dummy_edit;
4557   // If crash happen after a hard link established, Recover function may
4558   // reuse the file number that has already assigned to the internal file,
4559   // and this will overwrite the external file. To protect the external
4560   // file, we have to make sure the file number will never being reused.
4561   s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4562                              directories_.GetDbDir());
4563   if (s.ok()) {
4564     InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4565   }
4566   dummy_sv_ctx.Clean();
4567   return s;
4568 }
4569 
GetCreationTimeOfOldestFile(uint64_t * creation_time)4570 Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
4571   if (mutable_db_options_.max_open_files == -1) {
4572     uint64_t oldest_time = port::kMaxUint64;
4573     for (auto cfd : *versions_->GetColumnFamilySet()) {
4574       if (!cfd->IsDropped()) {
4575         uint64_t ctime;
4576         {
4577           SuperVersion* sv = GetAndRefSuperVersion(cfd);
4578           Version* version = sv->current;
4579           version->GetCreationTimeOfOldestFile(&ctime);
4580           ReturnAndCleanupSuperVersion(cfd, sv);
4581         }
4582 
4583         if (ctime < oldest_time) {
4584           oldest_time = ctime;
4585         }
4586         if (oldest_time == 0) {
4587           break;
4588         }
4589       }
4590     }
4591     *creation_time = oldest_time;
4592     return Status::OK();
4593   } else {
4594     return Status::NotSupported("This API only works if max_open_files = -1");
4595   }
4596 }
4597 #endif  // ROCKSDB_LITE
4598 
4599 }  // namespace ROCKSDB_NAMESPACE
4600