1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/version_edit_handler.h"
11 
12 #include "monitoring/persistent_stats_history.h"
13 
14 namespace ROCKSDB_NAMESPACE {
15 
VersionEditHandler(bool read_only,const std::vector<ColumnFamilyDescriptor> & column_families,VersionSet * version_set,bool track_missing_files,bool no_error_if_table_files_missing)16 VersionEditHandler::VersionEditHandler(
17     bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
18     VersionSet* version_set, bool track_missing_files,
19     bool no_error_if_table_files_missing)
20     : read_only_(read_only),
21       column_families_(column_families),
22       status_(),
23       version_set_(version_set),
24       track_missing_files_(track_missing_files),
25       no_error_if_table_files_missing_(no_error_if_table_files_missing),
26       initialized_(false) {
27   assert(version_set_ != nullptr);
28 }
29 
Iterate(log::Reader & reader,std::string * db_id)30 Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
31   Slice record;
32   std::string scratch;
33   size_t recovered_edits = 0;
34   Status s = Initialize();
35   while (reader.ReadRecord(&record, &scratch) && s.ok()) {
36     VersionEdit edit;
37     s = edit.DecodeFrom(record);
38     if (!s.ok()) {
39       break;
40     }
41     if (edit.has_db_id_) {
42       version_set_->db_id_ = edit.GetDbId();
43       if (db_id != nullptr) {
44         *db_id = version_set_->db_id_;
45       }
46     }
47     s = read_buffer_.AddEdit(&edit);
48     if (!s.ok()) {
49       break;
50     }
51     ColumnFamilyData* cfd = nullptr;
52     if (edit.is_in_atomic_group_) {
53       if (read_buffer_.IsFull()) {
54         for (auto& e : read_buffer_.replay_buffer()) {
55           s = ApplyVersionEdit(e, &cfd);
56           if (!s.ok()) {
57             break;
58           }
59           ++recovered_edits;
60         }
61         if (!s.ok()) {
62           break;
63         }
64         read_buffer_.Clear();
65       }
66     } else {
67       s = ApplyVersionEdit(edit, &cfd);
68       if (s.ok()) {
69         ++recovered_edits;
70       }
71     }
72   }
73 
74   CheckIterationResult(reader, &s);
75 
76   if (!s.ok()) {
77     status_ = s;
78   }
79   return s;
80 }
81 
Initialize()82 Status VersionEditHandler::Initialize() {
83   Status s;
84   if (!initialized_) {
85     for (const auto& cf_desc : column_families_) {
86       name_to_options_.emplace(cf_desc.name, cf_desc.options);
87     }
88     auto default_cf_iter = name_to_options_.find(kDefaultColumnFamilyName);
89     if (default_cf_iter == name_to_options_.end()) {
90       s = Status::InvalidArgument("Default column family not specified");
91     }
92     if (s.ok()) {
93       VersionEdit default_cf_edit;
94       default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
95       default_cf_edit.SetColumnFamily(0);
96       ColumnFamilyData* cfd =
97           CreateCfAndInit(default_cf_iter->second, default_cf_edit);
98       assert(cfd != nullptr);
99 #ifdef NDEBUG
100       (void)cfd;
101 #endif
102       initialized_ = true;
103     }
104   }
105   return s;
106 }
107 
ApplyVersionEdit(VersionEdit & edit,ColumnFamilyData ** cfd)108 Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit,
109                                             ColumnFamilyData** cfd) {
110   Status s;
111   if (edit.is_column_family_add_) {
112     s = OnColumnFamilyAdd(edit, cfd);
113   } else if (edit.is_column_family_drop_) {
114     s = OnColumnFamilyDrop(edit, cfd);
115   } else {
116     s = OnNonCfOperation(edit, cfd);
117   }
118   if (s.ok()) {
119     assert(cfd != nullptr);
120     s = ExtractInfoFromVersionEdit(*cfd, edit);
121   }
122   return s;
123 }
124 
OnColumnFamilyAdd(VersionEdit & edit,ColumnFamilyData ** cfd)125 Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit,
126                                              ColumnFamilyData** cfd) {
127   bool cf_in_not_found = false;
128   bool cf_in_builders = false;
129   CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
130 
131   assert(cfd != nullptr);
132   *cfd = nullptr;
133   Status s;
134   if (cf_in_builders || cf_in_not_found) {
135     s = Status::Corruption("MANIFEST adding the same column family twice: " +
136                            edit.column_family_name_);
137   }
138   if (s.ok()) {
139     auto cf_options = name_to_options_.find(edit.column_family_name_);
140     // implicitly add persistent_stats column family without requiring user
141     // to specify
142     ColumnFamilyData* tmp_cfd = nullptr;
143     bool is_persistent_stats_column_family =
144         edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
145     if (cf_options == name_to_options_.end() &&
146         !is_persistent_stats_column_family) {
147       column_families_not_found_.emplace(edit.column_family_,
148                                          edit.column_family_name_);
149     } else {
150       if (is_persistent_stats_column_family) {
151         ColumnFamilyOptions cfo;
152         OptimizeForPersistentStats(&cfo);
153         tmp_cfd = CreateCfAndInit(cfo, edit);
154       } else {
155         tmp_cfd = CreateCfAndInit(cf_options->second, edit);
156       }
157       *cfd = tmp_cfd;
158     }
159   }
160   return s;
161 }
162 
OnColumnFamilyDrop(VersionEdit & edit,ColumnFamilyData ** cfd)163 Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit,
164                                               ColumnFamilyData** cfd) {
165   bool cf_in_not_found = false;
166   bool cf_in_builders = false;
167   CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
168 
169   assert(cfd != nullptr);
170   *cfd = nullptr;
171   ColumnFamilyData* tmp_cfd = nullptr;
172   Status s;
173   if (cf_in_builders) {
174     tmp_cfd = DestroyCfAndCleanup(edit);
175   } else if (cf_in_not_found) {
176     column_families_not_found_.erase(edit.column_family_);
177   } else {
178     s = Status::Corruption("MANIFEST - dropping non-existing column family");
179   }
180   *cfd = tmp_cfd;
181   return s;
182 }
183 
OnNonCfOperation(VersionEdit & edit,ColumnFamilyData ** cfd)184 Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
185                                             ColumnFamilyData** cfd) {
186   bool cf_in_not_found = false;
187   bool cf_in_builders = false;
188   CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
189 
190   assert(cfd != nullptr);
191   *cfd = nullptr;
192   Status s;
193   if (!cf_in_not_found) {
194     if (!cf_in_builders) {
195       s = Status::Corruption(
196           "MANIFEST record referencing unknown column family");
197     }
198     ColumnFamilyData* tmp_cfd = nullptr;
199     if (s.ok()) {
200       auto builder_iter = builders_.find(edit.column_family_);
201       assert(builder_iter != builders_.end());
202       tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
203           edit.column_family_);
204       assert(tmp_cfd != nullptr);
205       s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false);
206       if (s.ok()) {
207         s = builder_iter->second->version_builder()->Apply(&edit);
208       }
209     }
210     *cfd = tmp_cfd;
211   }
212   return s;
213 }
214 
215 // TODO maybe cache the computation result
HasMissingFiles() const216 bool VersionEditHandler::HasMissingFiles() const {
217   bool ret = false;
218   for (const auto& elem : cf_to_missing_files_) {
219     const auto& missing_files = elem.second;
220     if (!missing_files.empty()) {
221       ret = true;
222       break;
223     }
224   }
225   return ret;
226 }
227 
CheckColumnFamilyId(const VersionEdit & edit,bool * cf_in_not_found,bool * cf_in_builders) const228 void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit,
229                                              bool* cf_in_not_found,
230                                              bool* cf_in_builders) const {
231   assert(cf_in_not_found != nullptr);
232   assert(cf_in_builders != nullptr);
233   // Not found means that user didn't supply that column
234   // family option AND we encountered column family add
235   // record. Once we encounter column family drop record,
236   // we will delete the column family from
237   // column_families_not_found.
238   bool in_not_found = column_families_not_found_.find(edit.column_family_) !=
239                       column_families_not_found_.end();
240   // in builders means that user supplied that column family
241   // option AND that we encountered column family add record
242   bool in_builders = builders_.find(edit.column_family_) != builders_.end();
243   // They cannot both be true
244   assert(!(in_not_found && in_builders));
245   *cf_in_not_found = in_not_found;
246   *cf_in_builders = in_builders;
247 }
248 
CheckIterationResult(const log::Reader & reader,Status * s)249 void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
250                                               Status* s) {
251   assert(s != nullptr);
252   if (!s->ok()) {
253     read_buffer_.Clear();
254   } else if (!version_edit_params_.has_log_number_ ||
255              !version_edit_params_.has_next_file_number_ ||
256              !version_edit_params_.has_last_sequence_) {
257     std::string msg("no ");
258     if (!version_edit_params_.has_log_number_) {
259       msg.append("log_file_number, ");
260     }
261     if (!version_edit_params_.has_next_file_number_) {
262       msg.append("next_file_number, ");
263     }
264     if (!version_edit_params_.has_last_sequence_) {
265       msg.append("last_sequence, ");
266     }
267     msg = msg.substr(0, msg.size() - 2);
268     msg.append(" entry in MANIFEST");
269     *s = Status::Corruption(msg);
270   }
271   if (s->ok() && !read_only_ && !column_families_not_found_.empty()) {
272     std::string msg;
273     for (const auto& cf : column_families_not_found_) {
274       msg.append(", ");
275       msg.append(cf.second);
276     }
277     msg = msg.substr(2);
278     *s = Status::InvalidArgument("Column families not opened: " + msg);
279   }
280   if (s->ok()) {
281     version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
282         version_edit_params_.max_column_family_);
283     version_set_->MarkMinLogNumberToKeep2PC(
284         version_edit_params_.min_log_number_to_keep_);
285     version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_);
286     version_set_->MarkFileNumberUsed(version_edit_params_.log_number_);
287     for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
288       auto builder_iter = builders_.find(cfd->GetID());
289       assert(builder_iter != builders_.end());
290       auto* builder = builder_iter->second->version_builder();
291       if (!builder->CheckConsistencyForNumLevels()) {
292         *s = Status::InvalidArgument(
293             "db has more levels than options.num_levels");
294         break;
295       }
296     }
297   }
298   if (s->ok()) {
299     for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
300       if (cfd->IsDropped()) {
301         continue;
302       }
303       if (read_only_) {
304         cfd->table_cache()->SetTablesAreImmortal();
305       }
306       *s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false,
307                       /*is_initial_load=*/true);
308       if (!s->ok()) {
309         break;
310       }
311     }
312   }
313   if (s->ok()) {
314     for (auto* cfd : *(version_set_->column_family_set_)) {
315       if (cfd->IsDropped()) {
316         continue;
317       }
318       assert(cfd->initialized());
319       VersionEdit edit;
320       *s = MaybeCreateVersion(edit, cfd, /*force_create_version=*/true);
321       if (!s->ok()) {
322         break;
323       }
324     }
325   }
326   if (s->ok()) {
327     version_set_->manifest_file_size_ = reader.GetReadOffset();
328     assert(version_set_->manifest_file_size_ > 0);
329     version_set_->next_file_number_.store(
330         version_edit_params_.next_file_number_ + 1);
331     version_set_->last_allocated_sequence_ =
332         version_edit_params_.last_sequence_;
333     version_set_->last_published_sequence_ =
334         version_edit_params_.last_sequence_;
335     version_set_->last_sequence_ = version_edit_params_.last_sequence_;
336     version_set_->prev_log_number_ = version_edit_params_.prev_log_number_;
337   }
338 }
339 
CreateCfAndInit(const ColumnFamilyOptions & cf_options,const VersionEdit & edit)340 ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
341     const ColumnFamilyOptions& cf_options, const VersionEdit& edit) {
342   ColumnFamilyData* cfd = version_set_->CreateColumnFamily(cf_options, &edit);
343   assert(cfd != nullptr);
344   cfd->set_initialized();
345   assert(builders_.find(edit.column_family_) == builders_.end());
346   builders_.emplace(edit.column_family_,
347                     VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd)));
348   if (track_missing_files_) {
349     cf_to_missing_files_.emplace(edit.column_family_,
350                                  std::unordered_set<uint64_t>());
351   }
352   return cfd;
353 }
354 
DestroyCfAndCleanup(const VersionEdit & edit)355 ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
356     const VersionEdit& edit) {
357   auto builder_iter = builders_.find(edit.column_family_);
358   assert(builder_iter != builders_.end());
359   builders_.erase(builder_iter);
360   if (track_missing_files_) {
361     auto missing_files_iter = cf_to_missing_files_.find(edit.column_family_);
362     assert(missing_files_iter != cf_to_missing_files_.end());
363     cf_to_missing_files_.erase(missing_files_iter);
364   }
365   ColumnFamilyData* ret =
366       version_set_->GetColumnFamilySet()->GetColumnFamily(edit.column_family_);
367   assert(ret != nullptr);
368   if (ret->UnrefAndTryDelete()) {
369     ret = nullptr;
370   } else {
371     assert(false);
372   }
373   return ret;
374 }
375 
MaybeCreateVersion(const VersionEdit &,ColumnFamilyData * cfd,bool force_create_version)376 Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
377                                               ColumnFamilyData* cfd,
378                                               bool force_create_version) {
379   assert(cfd->initialized());
380   if (force_create_version) {
381     auto builder_iter = builders_.find(cfd->GetID());
382     assert(builder_iter != builders_.end());
383     auto* builder = builder_iter->second->version_builder();
384     auto* v = new Version(cfd, version_set_, version_set_->file_options_,
385                           *cfd->GetLatestMutableCFOptions(),
386                           version_set_->current_version_number_++);
387     builder->SaveTo(v->storage_info());
388     // Install new version
389     v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
390                     !(version_set_->db_options_->skip_stats_update_on_db_open));
391     version_set_->AppendVersion(cfd, v);
392   }
393   return Status::OK();
394 }
395 
LoadTables(ColumnFamilyData * cfd,bool prefetch_index_and_filter_in_cache,bool is_initial_load)396 Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
397                                       bool prefetch_index_and_filter_in_cache,
398                                       bool is_initial_load) {
399   assert(cfd != nullptr);
400   assert(!cfd->IsDropped());
401   Status s;
402   auto builder_iter = builders_.find(cfd->GetID());
403   assert(builder_iter != builders_.end());
404   assert(builder_iter->second != nullptr);
405   VersionBuilder* builder = builder_iter->second->version_builder();
406   assert(builder);
407   s = builder->LoadTableHandlers(
408       cfd->internal_stats(),
409       version_set_->db_options_->max_file_opening_threads,
410       prefetch_index_and_filter_in_cache, is_initial_load,
411       cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
412   if (s.IsPathNotFound() && no_error_if_table_files_missing_) {
413     s = Status::OK();
414   }
415   if (!s.ok() && !version_set_->db_options_->paranoid_checks) {
416     s = Status::OK();
417   }
418   return s;
419 }
420 
ExtractInfoFromVersionEdit(ColumnFamilyData * cfd,const VersionEdit & edit)421 Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
422                                                       const VersionEdit& edit) {
423   Status s;
424   if (cfd != nullptr) {
425     if (edit.has_db_id_) {
426       version_edit_params_.SetDBId(edit.db_id_);
427     }
428     if (edit.has_log_number_) {
429       if (cfd->GetLogNumber() > edit.log_number_) {
430         ROCKS_LOG_WARN(
431             version_set_->db_options()->info_log,
432             "MANIFEST corruption detected, but ignored - Log numbers in "
433             "records NOT monotonically increasing");
434       } else {
435         cfd->SetLogNumber(edit.log_number_);
436         version_edit_params_.SetLogNumber(edit.log_number_);
437       }
438     }
439     if (edit.has_comparator_ &&
440         edit.comparator_ != cfd->user_comparator()->Name()) {
441       s = Status::InvalidArgument(
442           cfd->user_comparator()->Name(),
443           "does not match existing comparator " + edit.comparator_);
444     }
445   }
446 
447   if (s.ok()) {
448     if (edit.has_prev_log_number_) {
449       version_edit_params_.SetPrevLogNumber(edit.prev_log_number_);
450     }
451     if (edit.has_next_file_number_) {
452       version_edit_params_.SetNextFile(edit.next_file_number_);
453     }
454     if (edit.has_max_column_family_) {
455       version_edit_params_.SetMaxColumnFamily(edit.max_column_family_);
456     }
457     if (edit.has_min_log_number_to_keep_) {
458       version_edit_params_.min_log_number_to_keep_ =
459           std::max(version_edit_params_.min_log_number_to_keep_,
460                    edit.min_log_number_to_keep_);
461     }
462     if (edit.has_last_sequence_) {
463       version_edit_params_.SetLastSequence(edit.last_sequence_);
464     }
465     if (!version_edit_params_.has_prev_log_number_) {
466       version_edit_params_.SetPrevLogNumber(0);
467     }
468   }
469   return s;
470 }
471 
VersionEditHandlerPointInTime(bool read_only,const std::vector<ColumnFamilyDescriptor> & column_families,VersionSet * version_set)472 VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
473     bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
474     VersionSet* version_set)
475     : VersionEditHandler(read_only, column_families, version_set,
476                          /*track_missing_files=*/true,
477                          /*no_error_if_table_files_missing=*/true) {}
478 
~VersionEditHandlerPointInTime()479 VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
480   for (const auto& elem : versions_) {
481     delete elem.second;
482   }
483   versions_.clear();
484 }
485 
CheckIterationResult(const log::Reader & reader,Status * s)486 void VersionEditHandlerPointInTime::CheckIterationResult(
487     const log::Reader& reader, Status* s) {
488   VersionEditHandler::CheckIterationResult(reader, s);
489   assert(s != nullptr);
490   if (s->ok()) {
491     for (auto* cfd : *(version_set_->column_family_set_)) {
492       if (cfd->IsDropped()) {
493         continue;
494       }
495       assert(cfd->initialized());
496       auto v_iter = versions_.find(cfd->GetID());
497       if (v_iter != versions_.end()) {
498         assert(v_iter->second != nullptr);
499 
500         version_set_->AppendVersion(cfd, v_iter->second);
501         versions_.erase(v_iter);
502       }
503     }
504   }
505 }
506 
DestroyCfAndCleanup(const VersionEdit & edit)507 ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup(
508     const VersionEdit& edit) {
509   ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit);
510   auto v_iter = versions_.find(edit.column_family_);
511   if (v_iter != versions_.end()) {
512     delete v_iter->second;
513     versions_.erase(v_iter);
514   }
515   return cfd;
516 }
517 
MaybeCreateVersion(const VersionEdit & edit,ColumnFamilyData * cfd,bool force_create_version)518 Status VersionEditHandlerPointInTime::MaybeCreateVersion(
519     const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
520   assert(cfd != nullptr);
521   if (!force_create_version) {
522     assert(edit.column_family_ == cfd->GetID());
523   }
524   auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID());
525   assert(missing_files_iter != cf_to_missing_files_.end());
526   std::unordered_set<uint64_t>& missing_files = missing_files_iter->second;
527   const bool prev_has_missing_files = !missing_files.empty();
528   for (const auto& file : edit.GetDeletedFiles()) {
529     uint64_t file_num = file.second;
530     auto fiter = missing_files.find(file_num);
531     if (fiter != missing_files.end()) {
532       missing_files.erase(fiter);
533     }
534   }
535   Status s;
536   for (const auto& elem : edit.GetNewFiles()) {
537     const FileMetaData& meta = elem.second;
538     const FileDescriptor& fd = meta.fd;
539     uint64_t file_num = fd.GetNumber();
540     const std::string fpath =
541         MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num);
542     s = version_set_->VerifyFileMetadata(fpath, meta);
543     if (s.IsPathNotFound() || s.IsNotFound()) {
544       missing_files.insert(file_num);
545       s = Status::OK();
546     }
547   }
548   bool missing_info = !version_edit_params_.has_log_number_ ||
549                       !version_edit_params_.has_next_file_number_ ||
550                       !version_edit_params_.has_last_sequence_;
551 
552   // Create version before apply edit
553   if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) ||
554                         (missing_files.empty() && force_create_version))) {
555     auto builder_iter = builders_.find(cfd->GetID());
556     assert(builder_iter != builders_.end());
557     auto* builder = builder_iter->second->version_builder();
558     auto* version = new Version(cfd, version_set_, version_set_->file_options_,
559                                 *cfd->GetLatestMutableCFOptions(),
560                                 version_set_->current_version_number_++);
561     builder->SaveTo(version->storage_info());
562     version->PrepareApply(
563         *cfd->GetLatestMutableCFOptions(),
564         !version_set_->db_options_->skip_stats_update_on_db_open);
565     auto v_iter = versions_.find(cfd->GetID());
566     if (v_iter != versions_.end()) {
567       delete v_iter->second;
568       v_iter->second = version;
569     } else {
570       versions_.emplace(cfd->GetID(), version);
571     }
572   }
573   return s;
574 }
575 
576 }  // namespace ROCKSDB_NAMESPACE
577