1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_impl/db_impl_readonly.h"
7 #include "db/arena_wrapped_db_iter.h"
8
9 #include "db/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "monitoring/perf_context_imp.h"
14
15 namespace ROCKSDB_NAMESPACE {
16
17 #ifndef ROCKSDB_LITE
18
DBImplReadOnly(const DBOptions & db_options,const std::string & dbname)19 DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
20 const std::string& dbname)
21 : DBImpl(db_options, dbname) {
22 ROCKS_LOG_INFO(immutable_db_options_.info_log,
23 "Opening the db in read only mode");
24 LogFlush(immutable_db_options_.info_log);
25 }
26
~DBImplReadOnly()27 DBImplReadOnly::~DBImplReadOnly() {}
28
29 // Implementations of the DB interface
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val)30 Status DBImplReadOnly::Get(const ReadOptions& read_options,
31 ColumnFamilyHandle* column_family, const Slice& key,
32 PinnableSlice* pinnable_val) {
33 assert(pinnable_val != nullptr);
34 // TODO: stopwatch DB_GET needed?, perf timer needed?
35 PERF_TIMER_GUARD(get_snapshot_time);
36 Status s;
37 SequenceNumber snapshot = versions_->LastSequence();
38 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
39 auto cfd = cfh->cfd();
40 if (tracer_) {
41 InstrumentedMutexLock lock(&trace_mutex_);
42 if (tracer_) {
43 tracer_->Get(column_family, key);
44 }
45 }
46 SuperVersion* super_version = cfd->GetSuperVersion();
47 MergeContext merge_context;
48 SequenceNumber max_covering_tombstone_seq = 0;
49 LookupKey lkey(key, snapshot);
50 PERF_TIMER_STOP(get_snapshot_time);
51 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
52 /*timestamp=*/nullptr, &s, &merge_context,
53 &max_covering_tombstone_seq, read_options)) {
54 pinnable_val->PinSelf();
55 RecordTick(stats_, MEMTABLE_HIT);
56 } else {
57 PERF_TIMER_GUARD(get_from_output_files_time);
58 super_version->current->Get(read_options, lkey, pinnable_val,
59 /*timestamp=*/nullptr, &s, &merge_context,
60 &max_covering_tombstone_seq);
61 RecordTick(stats_, MEMTABLE_MISS);
62 }
63 RecordTick(stats_, NUMBER_KEYS_READ);
64 size_t size = pinnable_val->size();
65 RecordTick(stats_, BYTES_READ, size);
66 RecordInHistogram(stats_, BYTES_PER_READ, size);
67 PERF_COUNTER_ADD(get_read_bytes, size);
68 return s;
69 }
70
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)71 Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
72 ColumnFamilyHandle* column_family) {
73 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
74 auto cfd = cfh->cfd();
75 SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
76 SequenceNumber latest_snapshot = versions_->LastSequence();
77 SequenceNumber read_seq =
78 read_options.snapshot != nullptr
79 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
80 ->number_
81 : latest_snapshot;
82 ReadCallback* read_callback = nullptr; // No read callback provided.
83 auto db_iter = NewArenaWrappedDbIterator(
84 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
85 read_seq,
86 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
87 super_version->version_number, read_callback);
88 auto internal_iter =
89 NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
90 db_iter->GetRangeDelAggregator(), read_seq);
91 db_iter->SetIterUnderDBIter(internal_iter);
92 return db_iter;
93 }
94
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)95 Status DBImplReadOnly::NewIterators(
96 const ReadOptions& read_options,
97 const std::vector<ColumnFamilyHandle*>& column_families,
98 std::vector<Iterator*>* iterators) {
99 ReadCallback* read_callback = nullptr; // No read callback provided.
100 if (iterators == nullptr) {
101 return Status::InvalidArgument("iterators not allowed to be nullptr");
102 }
103 iterators->clear();
104 iterators->reserve(column_families.size());
105 SequenceNumber latest_snapshot = versions_->LastSequence();
106 SequenceNumber read_seq =
107 read_options.snapshot != nullptr
108 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
109 ->number_
110 : latest_snapshot;
111
112 for (auto cfh : column_families) {
113 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
114 auto* sv = cfd->GetSuperVersion()->Ref();
115 auto* db_iter = NewArenaWrappedDbIterator(
116 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
117 sv->mutable_cf_options.max_sequential_skip_in_iterations,
118 sv->version_number, read_callback);
119 auto* internal_iter =
120 NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
121 db_iter->GetRangeDelAggregator(), read_seq);
122 db_iter->SetIterUnderDBIter(internal_iter);
123 iterators->push_back(db_iter);
124 }
125
126 return Status::OK();
127 }
128
OpenForReadOnly(const Options & options,const std::string & dbname,DB ** dbptr,bool)129 Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
130 DB** dbptr, bool /*error_if_log_file_exist*/) {
131 *dbptr = nullptr;
132
133 // Try to first open DB as fully compacted DB
134 Status s;
135 s = CompactedDBImpl::Open(options, dbname, dbptr);
136 if (s.ok()) {
137 return s;
138 }
139
140 DBOptions db_options(options);
141 ColumnFamilyOptions cf_options(options);
142 std::vector<ColumnFamilyDescriptor> column_families;
143 column_families.push_back(
144 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
145 std::vector<ColumnFamilyHandle*> handles;
146
147 s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr);
148 if (s.ok()) {
149 assert(handles.size() == 1);
150 // i can delete the handle since DBImpl is always holding a
151 // reference to default column family
152 delete handles[0];
153 }
154 return s;
155 }
156
OpenForReadOnly(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,bool error_if_log_file_exist)157 Status DB::OpenForReadOnly(
158 const DBOptions& db_options, const std::string& dbname,
159 const std::vector<ColumnFamilyDescriptor>& column_families,
160 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
161 bool error_if_log_file_exist) {
162 *dbptr = nullptr;
163 handles->clear();
164
165 SuperVersionContext sv_context(/* create_superversion */ true);
166 DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
167 impl->mutex_.Lock();
168 Status s = impl->Recover(column_families, true /* read only */,
169 error_if_log_file_exist);
170 if (s.ok()) {
171 // set column family handles
172 for (auto cf : column_families) {
173 auto cfd =
174 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
175 if (cfd == nullptr) {
176 s = Status::InvalidArgument("Column family not found: ", cf.name);
177 break;
178 }
179 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
180 }
181 }
182 if (s.ok()) {
183 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
184 sv_context.NewSuperVersion();
185 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
186 }
187 }
188 impl->mutex_.Unlock();
189 sv_context.Clean();
190 if (s.ok()) {
191 *dbptr = impl;
192 for (auto* h : *handles) {
193 impl->NewThreadStatusCfInfo(
194 reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
195 }
196 } else {
197 for (auto h : *handles) {
198 delete h;
199 }
200 handles->clear();
201 delete impl;
202 }
203 return s;
204 }
205
206 #else // !ROCKSDB_LITE
207
208 Status DB::OpenForReadOnly(const Options& /*options*/,
209 const std::string& /*dbname*/, DB** /*dbptr*/,
210 bool /*error_if_log_file_exist*/) {
211 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
212 }
213
214 Status DB::OpenForReadOnly(
215 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
216 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
217 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
218 bool /*error_if_log_file_exist*/) {
219 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
220 }
221 #endif // !ROCKSDB_LITE
222
223 } // namespace ROCKSDB_NAMESPACE
224