1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/version_edit_handler.h"
11
12 #include "monitoring/persistent_stats_history.h"
13
14 namespace ROCKSDB_NAMESPACE {
15
VersionEditHandler(bool read_only,const std::vector<ColumnFamilyDescriptor> & column_families,VersionSet * version_set,bool track_missing_files,bool no_error_if_table_files_missing)16 VersionEditHandler::VersionEditHandler(
17 bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
18 VersionSet* version_set, bool track_missing_files,
19 bool no_error_if_table_files_missing)
20 : read_only_(read_only),
21 column_families_(column_families),
22 status_(),
23 version_set_(version_set),
24 track_missing_files_(track_missing_files),
25 no_error_if_table_files_missing_(no_error_if_table_files_missing),
26 initialized_(false) {
27 assert(version_set_ != nullptr);
28 }
29
Iterate(log::Reader & reader,std::string * db_id)30 Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
31 Slice record;
32 std::string scratch;
33 size_t recovered_edits = 0;
34 Status s = Initialize();
35 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
36 VersionEdit edit;
37 s = edit.DecodeFrom(record);
38 if (!s.ok()) {
39 break;
40 }
41 if (edit.has_db_id_) {
42 version_set_->db_id_ = edit.GetDbId();
43 if (db_id != nullptr) {
44 *db_id = version_set_->db_id_;
45 }
46 }
47 s = read_buffer_.AddEdit(&edit);
48 if (!s.ok()) {
49 break;
50 }
51 ColumnFamilyData* cfd = nullptr;
52 if (edit.is_in_atomic_group_) {
53 if (read_buffer_.IsFull()) {
54 for (auto& e : read_buffer_.replay_buffer()) {
55 s = ApplyVersionEdit(e, &cfd);
56 if (!s.ok()) {
57 break;
58 }
59 ++recovered_edits;
60 }
61 if (!s.ok()) {
62 break;
63 }
64 read_buffer_.Clear();
65 }
66 } else {
67 s = ApplyVersionEdit(edit, &cfd);
68 if (s.ok()) {
69 ++recovered_edits;
70 }
71 }
72 }
73
74 CheckIterationResult(reader, &s);
75
76 if (!s.ok()) {
77 status_ = s;
78 }
79 return s;
80 }
81
Initialize()82 Status VersionEditHandler::Initialize() {
83 Status s;
84 if (!initialized_) {
85 for (const auto& cf_desc : column_families_) {
86 name_to_options_.emplace(cf_desc.name, cf_desc.options);
87 }
88 auto default_cf_iter = name_to_options_.find(kDefaultColumnFamilyName);
89 if (default_cf_iter == name_to_options_.end()) {
90 s = Status::InvalidArgument("Default column family not specified");
91 }
92 if (s.ok()) {
93 VersionEdit default_cf_edit;
94 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
95 default_cf_edit.SetColumnFamily(0);
96 ColumnFamilyData* cfd =
97 CreateCfAndInit(default_cf_iter->second, default_cf_edit);
98 assert(cfd != nullptr);
99 #ifdef NDEBUG
100 (void)cfd;
101 #endif
102 initialized_ = true;
103 }
104 }
105 return s;
106 }
107
ApplyVersionEdit(VersionEdit & edit,ColumnFamilyData ** cfd)108 Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit,
109 ColumnFamilyData** cfd) {
110 Status s;
111 if (edit.is_column_family_add_) {
112 s = OnColumnFamilyAdd(edit, cfd);
113 } else if (edit.is_column_family_drop_) {
114 s = OnColumnFamilyDrop(edit, cfd);
115 } else {
116 s = OnNonCfOperation(edit, cfd);
117 }
118 if (s.ok()) {
119 assert(cfd != nullptr);
120 s = ExtractInfoFromVersionEdit(*cfd, edit);
121 }
122 return s;
123 }
124
OnColumnFamilyAdd(VersionEdit & edit,ColumnFamilyData ** cfd)125 Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit,
126 ColumnFamilyData** cfd) {
127 bool cf_in_not_found = false;
128 bool cf_in_builders = false;
129 CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
130
131 assert(cfd != nullptr);
132 *cfd = nullptr;
133 Status s;
134 if (cf_in_builders || cf_in_not_found) {
135 s = Status::Corruption("MANIFEST adding the same column family twice: " +
136 edit.column_family_name_);
137 }
138 if (s.ok()) {
139 auto cf_options = name_to_options_.find(edit.column_family_name_);
140 // implicitly add persistent_stats column family without requiring user
141 // to specify
142 ColumnFamilyData* tmp_cfd = nullptr;
143 bool is_persistent_stats_column_family =
144 edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
145 if (cf_options == name_to_options_.end() &&
146 !is_persistent_stats_column_family) {
147 column_families_not_found_.emplace(edit.column_family_,
148 edit.column_family_name_);
149 } else {
150 if (is_persistent_stats_column_family) {
151 ColumnFamilyOptions cfo;
152 OptimizeForPersistentStats(&cfo);
153 tmp_cfd = CreateCfAndInit(cfo, edit);
154 } else {
155 tmp_cfd = CreateCfAndInit(cf_options->second, edit);
156 }
157 *cfd = tmp_cfd;
158 }
159 }
160 return s;
161 }
162
OnColumnFamilyDrop(VersionEdit & edit,ColumnFamilyData ** cfd)163 Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit,
164 ColumnFamilyData** cfd) {
165 bool cf_in_not_found = false;
166 bool cf_in_builders = false;
167 CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
168
169 assert(cfd != nullptr);
170 *cfd = nullptr;
171 ColumnFamilyData* tmp_cfd = nullptr;
172 Status s;
173 if (cf_in_builders) {
174 tmp_cfd = DestroyCfAndCleanup(edit);
175 } else if (cf_in_not_found) {
176 column_families_not_found_.erase(edit.column_family_);
177 } else {
178 s = Status::Corruption("MANIFEST - dropping non-existing column family");
179 }
180 *cfd = tmp_cfd;
181 return s;
182 }
183
OnNonCfOperation(VersionEdit & edit,ColumnFamilyData ** cfd)184 Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
185 ColumnFamilyData** cfd) {
186 bool cf_in_not_found = false;
187 bool cf_in_builders = false;
188 CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
189
190 assert(cfd != nullptr);
191 *cfd = nullptr;
192 Status s;
193 if (!cf_in_not_found) {
194 if (!cf_in_builders) {
195 s = Status::Corruption(
196 "MANIFEST record referencing unknown column family");
197 }
198 ColumnFamilyData* tmp_cfd = nullptr;
199 if (s.ok()) {
200 auto builder_iter = builders_.find(edit.column_family_);
201 assert(builder_iter != builders_.end());
202 tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
203 edit.column_family_);
204 assert(tmp_cfd != nullptr);
205 s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false);
206 if (s.ok()) {
207 s = builder_iter->second->version_builder()->Apply(&edit);
208 }
209 }
210 *cfd = tmp_cfd;
211 }
212 return s;
213 }
214
215 // TODO maybe cache the computation result
HasMissingFiles() const216 bool VersionEditHandler::HasMissingFiles() const {
217 bool ret = false;
218 for (const auto& elem : cf_to_missing_files_) {
219 const auto& missing_files = elem.second;
220 if (!missing_files.empty()) {
221 ret = true;
222 break;
223 }
224 }
225 return ret;
226 }
227
CheckColumnFamilyId(const VersionEdit & edit,bool * cf_in_not_found,bool * cf_in_builders) const228 void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit,
229 bool* cf_in_not_found,
230 bool* cf_in_builders) const {
231 assert(cf_in_not_found != nullptr);
232 assert(cf_in_builders != nullptr);
233 // Not found means that user didn't supply that column
234 // family option AND we encountered column family add
235 // record. Once we encounter column family drop record,
236 // we will delete the column family from
237 // column_families_not_found.
238 bool in_not_found = column_families_not_found_.find(edit.column_family_) !=
239 column_families_not_found_.end();
240 // in builders means that user supplied that column family
241 // option AND that we encountered column family add record
242 bool in_builders = builders_.find(edit.column_family_) != builders_.end();
243 // They cannot both be true
244 assert(!(in_not_found && in_builders));
245 *cf_in_not_found = in_not_found;
246 *cf_in_builders = in_builders;
247 }
248
CheckIterationResult(const log::Reader & reader,Status * s)249 void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
250 Status* s) {
251 assert(s != nullptr);
252 if (!s->ok()) {
253 read_buffer_.Clear();
254 } else if (!version_edit_params_.has_log_number_ ||
255 !version_edit_params_.has_next_file_number_ ||
256 !version_edit_params_.has_last_sequence_) {
257 std::string msg("no ");
258 if (!version_edit_params_.has_log_number_) {
259 msg.append("log_file_number, ");
260 }
261 if (!version_edit_params_.has_next_file_number_) {
262 msg.append("next_file_number, ");
263 }
264 if (!version_edit_params_.has_last_sequence_) {
265 msg.append("last_sequence, ");
266 }
267 msg = msg.substr(0, msg.size() - 2);
268 msg.append(" entry in MANIFEST");
269 *s = Status::Corruption(msg);
270 }
271 if (s->ok() && !read_only_ && !column_families_not_found_.empty()) {
272 std::string msg;
273 for (const auto& cf : column_families_not_found_) {
274 msg.append(", ");
275 msg.append(cf.second);
276 }
277 msg = msg.substr(2);
278 *s = Status::InvalidArgument("Column families not opened: " + msg);
279 }
280 if (s->ok()) {
281 version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
282 version_edit_params_.max_column_family_);
283 version_set_->MarkMinLogNumberToKeep2PC(
284 version_edit_params_.min_log_number_to_keep_);
285 version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_);
286 version_set_->MarkFileNumberUsed(version_edit_params_.log_number_);
287 for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
288 auto builder_iter = builders_.find(cfd->GetID());
289 assert(builder_iter != builders_.end());
290 auto* builder = builder_iter->second->version_builder();
291 if (!builder->CheckConsistencyForNumLevels()) {
292 *s = Status::InvalidArgument(
293 "db has more levels than options.num_levels");
294 break;
295 }
296 }
297 }
298 if (s->ok()) {
299 for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
300 if (cfd->IsDropped()) {
301 continue;
302 }
303 if (read_only_) {
304 cfd->table_cache()->SetTablesAreImmortal();
305 }
306 *s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false,
307 /*is_initial_load=*/true);
308 if (!s->ok()) {
309 break;
310 }
311 }
312 }
313 if (s->ok()) {
314 for (auto* cfd : *(version_set_->column_family_set_)) {
315 if (cfd->IsDropped()) {
316 continue;
317 }
318 assert(cfd->initialized());
319 VersionEdit edit;
320 *s = MaybeCreateVersion(edit, cfd, /*force_create_version=*/true);
321 if (!s->ok()) {
322 break;
323 }
324 }
325 }
326 if (s->ok()) {
327 version_set_->manifest_file_size_ = reader.GetReadOffset();
328 assert(version_set_->manifest_file_size_ > 0);
329 version_set_->next_file_number_.store(
330 version_edit_params_.next_file_number_ + 1);
331 version_set_->last_allocated_sequence_ =
332 version_edit_params_.last_sequence_;
333 version_set_->last_published_sequence_ =
334 version_edit_params_.last_sequence_;
335 version_set_->last_sequence_ = version_edit_params_.last_sequence_;
336 version_set_->prev_log_number_ = version_edit_params_.prev_log_number_;
337 }
338 }
339
CreateCfAndInit(const ColumnFamilyOptions & cf_options,const VersionEdit & edit)340 ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
341 const ColumnFamilyOptions& cf_options, const VersionEdit& edit) {
342 ColumnFamilyData* cfd = version_set_->CreateColumnFamily(cf_options, &edit);
343 assert(cfd != nullptr);
344 cfd->set_initialized();
345 assert(builders_.find(edit.column_family_) == builders_.end());
346 builders_.emplace(edit.column_family_,
347 VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd)));
348 if (track_missing_files_) {
349 cf_to_missing_files_.emplace(edit.column_family_,
350 std::unordered_set<uint64_t>());
351 }
352 return cfd;
353 }
354
DestroyCfAndCleanup(const VersionEdit & edit)355 ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
356 const VersionEdit& edit) {
357 auto builder_iter = builders_.find(edit.column_family_);
358 assert(builder_iter != builders_.end());
359 builders_.erase(builder_iter);
360 if (track_missing_files_) {
361 auto missing_files_iter = cf_to_missing_files_.find(edit.column_family_);
362 assert(missing_files_iter != cf_to_missing_files_.end());
363 cf_to_missing_files_.erase(missing_files_iter);
364 }
365 ColumnFamilyData* ret =
366 version_set_->GetColumnFamilySet()->GetColumnFamily(edit.column_family_);
367 assert(ret != nullptr);
368 if (ret->UnrefAndTryDelete()) {
369 ret = nullptr;
370 } else {
371 assert(false);
372 }
373 return ret;
374 }
375
MaybeCreateVersion(const VersionEdit &,ColumnFamilyData * cfd,bool force_create_version)376 Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
377 ColumnFamilyData* cfd,
378 bool force_create_version) {
379 assert(cfd->initialized());
380 if (force_create_version) {
381 auto builder_iter = builders_.find(cfd->GetID());
382 assert(builder_iter != builders_.end());
383 auto* builder = builder_iter->second->version_builder();
384 auto* v = new Version(cfd, version_set_, version_set_->file_options_,
385 *cfd->GetLatestMutableCFOptions(),
386 version_set_->current_version_number_++);
387 builder->SaveTo(v->storage_info());
388 // Install new version
389 v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
390 !(version_set_->db_options_->skip_stats_update_on_db_open));
391 version_set_->AppendVersion(cfd, v);
392 }
393 return Status::OK();
394 }
395
LoadTables(ColumnFamilyData * cfd,bool prefetch_index_and_filter_in_cache,bool is_initial_load)396 Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
397 bool prefetch_index_and_filter_in_cache,
398 bool is_initial_load) {
399 assert(cfd != nullptr);
400 assert(!cfd->IsDropped());
401 Status s;
402 auto builder_iter = builders_.find(cfd->GetID());
403 assert(builder_iter != builders_.end());
404 assert(builder_iter->second != nullptr);
405 VersionBuilder* builder = builder_iter->second->version_builder();
406 assert(builder);
407 s = builder->LoadTableHandlers(
408 cfd->internal_stats(),
409 version_set_->db_options_->max_file_opening_threads,
410 prefetch_index_and_filter_in_cache, is_initial_load,
411 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
412 if (s.IsPathNotFound() && no_error_if_table_files_missing_) {
413 s = Status::OK();
414 }
415 if (!s.ok() && !version_set_->db_options_->paranoid_checks) {
416 s = Status::OK();
417 }
418 return s;
419 }
420
ExtractInfoFromVersionEdit(ColumnFamilyData * cfd,const VersionEdit & edit)421 Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
422 const VersionEdit& edit) {
423 Status s;
424 if (cfd != nullptr) {
425 if (edit.has_db_id_) {
426 version_edit_params_.SetDBId(edit.db_id_);
427 }
428 if (edit.has_log_number_) {
429 if (cfd->GetLogNumber() > edit.log_number_) {
430 ROCKS_LOG_WARN(
431 version_set_->db_options()->info_log,
432 "MANIFEST corruption detected, but ignored - Log numbers in "
433 "records NOT monotonically increasing");
434 } else {
435 cfd->SetLogNumber(edit.log_number_);
436 version_edit_params_.SetLogNumber(edit.log_number_);
437 }
438 }
439 if (edit.has_comparator_ &&
440 edit.comparator_ != cfd->user_comparator()->Name()) {
441 s = Status::InvalidArgument(
442 cfd->user_comparator()->Name(),
443 "does not match existing comparator " + edit.comparator_);
444 }
445 }
446
447 if (s.ok()) {
448 if (edit.has_prev_log_number_) {
449 version_edit_params_.SetPrevLogNumber(edit.prev_log_number_);
450 }
451 if (edit.has_next_file_number_) {
452 version_edit_params_.SetNextFile(edit.next_file_number_);
453 }
454 if (edit.has_max_column_family_) {
455 version_edit_params_.SetMaxColumnFamily(edit.max_column_family_);
456 }
457 if (edit.has_min_log_number_to_keep_) {
458 version_edit_params_.min_log_number_to_keep_ =
459 std::max(version_edit_params_.min_log_number_to_keep_,
460 edit.min_log_number_to_keep_);
461 }
462 if (edit.has_last_sequence_) {
463 version_edit_params_.SetLastSequence(edit.last_sequence_);
464 }
465 if (!version_edit_params_.has_prev_log_number_) {
466 version_edit_params_.SetPrevLogNumber(0);
467 }
468 }
469 return s;
470 }
471
VersionEditHandlerPointInTime(bool read_only,const std::vector<ColumnFamilyDescriptor> & column_families,VersionSet * version_set)472 VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
473 bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
474 VersionSet* version_set)
475 : VersionEditHandler(read_only, column_families, version_set,
476 /*track_missing_files=*/true,
477 /*no_error_if_table_files_missing=*/true) {}
478
~VersionEditHandlerPointInTime()479 VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
480 for (const auto& elem : versions_) {
481 delete elem.second;
482 }
483 versions_.clear();
484 }
485
CheckIterationResult(const log::Reader & reader,Status * s)486 void VersionEditHandlerPointInTime::CheckIterationResult(
487 const log::Reader& reader, Status* s) {
488 VersionEditHandler::CheckIterationResult(reader, s);
489 assert(s != nullptr);
490 if (s->ok()) {
491 for (auto* cfd : *(version_set_->column_family_set_)) {
492 if (cfd->IsDropped()) {
493 continue;
494 }
495 assert(cfd->initialized());
496 auto v_iter = versions_.find(cfd->GetID());
497 if (v_iter != versions_.end()) {
498 assert(v_iter->second != nullptr);
499
500 version_set_->AppendVersion(cfd, v_iter->second);
501 versions_.erase(v_iter);
502 }
503 }
504 }
505 }
506
DestroyCfAndCleanup(const VersionEdit & edit)507 ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup(
508 const VersionEdit& edit) {
509 ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit);
510 auto v_iter = versions_.find(edit.column_family_);
511 if (v_iter != versions_.end()) {
512 delete v_iter->second;
513 versions_.erase(v_iter);
514 }
515 return cfd;
516 }
517
MaybeCreateVersion(const VersionEdit & edit,ColumnFamilyData * cfd,bool force_create_version)518 Status VersionEditHandlerPointInTime::MaybeCreateVersion(
519 const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
520 assert(cfd != nullptr);
521 if (!force_create_version) {
522 assert(edit.column_family_ == cfd->GetID());
523 }
524 auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID());
525 assert(missing_files_iter != cf_to_missing_files_.end());
526 std::unordered_set<uint64_t>& missing_files = missing_files_iter->second;
527 const bool prev_has_missing_files = !missing_files.empty();
528 for (const auto& file : edit.GetDeletedFiles()) {
529 uint64_t file_num = file.second;
530 auto fiter = missing_files.find(file_num);
531 if (fiter != missing_files.end()) {
532 missing_files.erase(fiter);
533 }
534 }
535 Status s;
536 for (const auto& elem : edit.GetNewFiles()) {
537 const FileMetaData& meta = elem.second;
538 const FileDescriptor& fd = meta.fd;
539 uint64_t file_num = fd.GetNumber();
540 const std::string fpath =
541 MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num);
542 s = version_set_->VerifyFileMetadata(fpath, meta);
543 if (s.IsPathNotFound() || s.IsNotFound()) {
544 missing_files.insert(file_num);
545 s = Status::OK();
546 }
547 }
548 bool missing_info = !version_edit_params_.has_log_number_ ||
549 !version_edit_params_.has_next_file_number_ ||
550 !version_edit_params_.has_last_sequence_;
551
552 // Create version before apply edit
553 if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) ||
554 (missing_files.empty() && force_create_version))) {
555 auto builder_iter = builders_.find(cfd->GetID());
556 assert(builder_iter != builders_.end());
557 auto* builder = builder_iter->second->version_builder();
558 auto* version = new Version(cfd, version_set_, version_set_->file_options_,
559 *cfd->GetLatestMutableCFOptions(),
560 version_set_->current_version_number_++);
561 builder->SaveTo(version->storage_info());
562 version->PrepareApply(
563 *cfd->GetLatestMutableCFOptions(),
564 !version_set_->db_options_->skip_stats_update_on_db_open);
565 auto v_iter = versions_.find(cfd->GetID());
566 if (v_iter != versions_.end()) {
567 delete v_iter->second;
568 v_iter->second = version;
569 } else {
570 versions_.emplace(cfd->GetID(), version);
571 }
572 }
573 return s;
574 }
575
576 } // namespace ROCKSDB_NAMESPACE
577