1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/db_impl/db_impl_readonly.h"
7 #include "db/arena_wrapped_db_iter.h"
8 
9 #include "db/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "monitoring/perf_context_imp.h"
14 
15 namespace ROCKSDB_NAMESPACE {
16 
17 #ifndef ROCKSDB_LITE
18 
DBImplReadOnly(const DBOptions & db_options,const std::string & dbname)19 DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
20                                const std::string& dbname)
21     : DBImpl(db_options, dbname) {
22   ROCKS_LOG_INFO(immutable_db_options_.info_log,
23                  "Opening the db in read only mode");
24   LogFlush(immutable_db_options_.info_log);
25 }
26 
~DBImplReadOnly()27 DBImplReadOnly::~DBImplReadOnly() {}
28 
29 // Implementations of the DB interface
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val)30 Status DBImplReadOnly::Get(const ReadOptions& read_options,
31                            ColumnFamilyHandle* column_family, const Slice& key,
32                            PinnableSlice* pinnable_val) {
33   assert(pinnable_val != nullptr);
34   // TODO: stopwatch DB_GET needed?, perf timer needed?
35   PERF_TIMER_GUARD(get_snapshot_time);
36   Status s;
37   SequenceNumber snapshot = versions_->LastSequence();
38   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
39   auto cfd = cfh->cfd();
40   if (tracer_) {
41     InstrumentedMutexLock lock(&trace_mutex_);
42     if (tracer_) {
43       tracer_->Get(column_family, key);
44     }
45   }
46   SuperVersion* super_version = cfd->GetSuperVersion();
47   MergeContext merge_context;
48   SequenceNumber max_covering_tombstone_seq = 0;
49   LookupKey lkey(key, snapshot);
50   PERF_TIMER_STOP(get_snapshot_time);
51   if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
52                               /*timestamp=*/nullptr, &s, &merge_context,
53                               &max_covering_tombstone_seq, read_options)) {
54     pinnable_val->PinSelf();
55     RecordTick(stats_, MEMTABLE_HIT);
56   } else {
57     PERF_TIMER_GUARD(get_from_output_files_time);
58     super_version->current->Get(read_options, lkey, pinnable_val,
59                                 /*timestamp=*/nullptr, &s, &merge_context,
60                                 &max_covering_tombstone_seq);
61     RecordTick(stats_, MEMTABLE_MISS);
62   }
63   RecordTick(stats_, NUMBER_KEYS_READ);
64   size_t size = pinnable_val->size();
65   RecordTick(stats_, BYTES_READ, size);
66   RecordInHistogram(stats_, BYTES_PER_READ, size);
67   PERF_COUNTER_ADD(get_read_bytes, size);
68   return s;
69 }
70 
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)71 Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
72                                       ColumnFamilyHandle* column_family) {
73   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
74   auto cfd = cfh->cfd();
75   SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
76   SequenceNumber latest_snapshot = versions_->LastSequence();
77   SequenceNumber read_seq =
78       read_options.snapshot != nullptr
79           ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
80                 ->number_
81           : latest_snapshot;
82   ReadCallback* read_callback = nullptr;  // No read callback provided.
83   auto db_iter = NewArenaWrappedDbIterator(
84       env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
85       read_seq,
86       super_version->mutable_cf_options.max_sequential_skip_in_iterations,
87       super_version->version_number, read_callback);
88   auto internal_iter =
89       NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
90                           db_iter->GetRangeDelAggregator(), read_seq);
91   db_iter->SetIterUnderDBIter(internal_iter);
92   return db_iter;
93 }
94 
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)95 Status DBImplReadOnly::NewIterators(
96     const ReadOptions& read_options,
97     const std::vector<ColumnFamilyHandle*>& column_families,
98     std::vector<Iterator*>* iterators) {
99   ReadCallback* read_callback = nullptr;  // No read callback provided.
100   if (iterators == nullptr) {
101     return Status::InvalidArgument("iterators not allowed to be nullptr");
102   }
103   iterators->clear();
104   iterators->reserve(column_families.size());
105   SequenceNumber latest_snapshot = versions_->LastSequence();
106   SequenceNumber read_seq =
107       read_options.snapshot != nullptr
108           ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
109                 ->number_
110           : latest_snapshot;
111 
112   for (auto cfh : column_families) {
113     auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
114     auto* sv = cfd->GetSuperVersion()->Ref();
115     auto* db_iter = NewArenaWrappedDbIterator(
116         env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
117         sv->mutable_cf_options.max_sequential_skip_in_iterations,
118         sv->version_number, read_callback);
119     auto* internal_iter =
120         NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
121                             db_iter->GetRangeDelAggregator(), read_seq);
122     db_iter->SetIterUnderDBIter(internal_iter);
123     iterators->push_back(db_iter);
124   }
125 
126   return Status::OK();
127 }
128 
OpenForReadOnly(const Options & options,const std::string & dbname,DB ** dbptr,bool)129 Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
130                            DB** dbptr, bool /*error_if_log_file_exist*/) {
131   *dbptr = nullptr;
132 
133   // Try to first open DB as fully compacted DB
134   Status s;
135   s = CompactedDBImpl::Open(options, dbname, dbptr);
136   if (s.ok()) {
137     return s;
138   }
139 
140   DBOptions db_options(options);
141   ColumnFamilyOptions cf_options(options);
142   std::vector<ColumnFamilyDescriptor> column_families;
143   column_families.push_back(
144       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
145   std::vector<ColumnFamilyHandle*> handles;
146 
147   s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr);
148   if (s.ok()) {
149     assert(handles.size() == 1);
150     // i can delete the handle since DBImpl is always holding a
151     // reference to default column family
152     delete handles[0];
153   }
154   return s;
155 }
156 
OpenForReadOnly(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,bool error_if_log_file_exist)157 Status DB::OpenForReadOnly(
158     const DBOptions& db_options, const std::string& dbname,
159     const std::vector<ColumnFamilyDescriptor>& column_families,
160     std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
161     bool error_if_log_file_exist) {
162   *dbptr = nullptr;
163   handles->clear();
164 
165   SuperVersionContext sv_context(/* create_superversion */ true);
166   DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
167   impl->mutex_.Lock();
168   Status s = impl->Recover(column_families, true /* read only */,
169                            error_if_log_file_exist);
170   if (s.ok()) {
171     // set column family handles
172     for (auto cf : column_families) {
173       auto cfd =
174           impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
175       if (cfd == nullptr) {
176         s = Status::InvalidArgument("Column family not found: ", cf.name);
177         break;
178       }
179       handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
180     }
181   }
182   if (s.ok()) {
183     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
184       sv_context.NewSuperVersion();
185       cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
186     }
187   }
188   impl->mutex_.Unlock();
189   sv_context.Clean();
190   if (s.ok()) {
191     *dbptr = impl;
192     for (auto* h : *handles) {
193       impl->NewThreadStatusCfInfo(
194           reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
195     }
196   } else {
197     for (auto h : *handles) {
198       delete h;
199     }
200     handles->clear();
201     delete impl;
202   }
203   return s;
204 }
205 
206 #else   // !ROCKSDB_LITE
207 
208 Status DB::OpenForReadOnly(const Options& /*options*/,
209                            const std::string& /*dbname*/, DB** /*dbptr*/,
210                            bool /*error_if_log_file_exist*/) {
211   return Status::NotSupported("Not supported in ROCKSDB_LITE.");
212 }
213 
214 Status DB::OpenForReadOnly(
215     const DBOptions& /*db_options*/, const std::string& /*dbname*/,
216     const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
217     std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
218     bool /*error_if_log_file_exist*/) {
219   return Status::NotSupported("Not supported in ROCKSDB_LITE.");
220 }
221 #endif  // !ROCKSDB_LITE
222 
223 }  // namespace ROCKSDB_NAMESPACE
224