1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/db_impl/db_impl.h"
11 
12 #include <cinttypes>
13 #include <vector>
14 
15 #include "db/column_family.h"
16 #include "db/job_context.h"
17 #include "db/version_set.h"
18 #include "rocksdb/status.h"
19 
20 namespace ROCKSDB_NAMESPACE {
21 
22 #ifndef ROCKSDB_LITE
SuggestCompactRange(ColumnFamilyHandle * column_family,const Slice * begin,const Slice * end)23 Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
24                                    const Slice* begin, const Slice* end) {
25   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
26   auto cfd = cfh->cfd();
27   InternalKey start_key, end_key;
28   if (begin != nullptr) {
29     start_key.SetMinPossibleForUserKey(*begin);
30   }
31   if (end != nullptr) {
32     end_key.SetMaxPossibleForUserKey(*end);
33   }
34   {
35     InstrumentedMutexLock l(&mutex_);
36     auto vstorage = cfd->current()->storage_info();
37     for (int level = 0; level < vstorage->num_non_empty_levels() - 1; ++level) {
38       std::vector<FileMetaData*> inputs;
39       vstorage->GetOverlappingInputs(
40           level, begin == nullptr ? nullptr : &start_key,
41           end == nullptr ? nullptr : &end_key, &inputs);
42       for (auto f : inputs) {
43         f->marked_for_compaction = true;
44       }
45     }
46     // Since we have some more files to compact, we should also recompute
47     // compaction score
48     vstorage->ComputeCompactionScore(*cfd->ioptions(),
49                                      *cfd->GetLatestMutableCFOptions());
50     SchedulePendingCompaction(cfd);
51     MaybeScheduleFlushOrCompaction();
52   }
53   return Status::OK();
54 }
55 
PromoteL0(ColumnFamilyHandle * column_family,int target_level)56 Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
57   assert(column_family);
58 
59   if (target_level < 1) {
60     ROCKS_LOG_INFO(immutable_db_options_.info_log,
61                    "PromoteL0 FAILED. Invalid target level %d\n", target_level);
62     return Status::InvalidArgument("Invalid target level");
63   }
64 
65   Status status;
66   VersionEdit edit;
67   JobContext job_context(next_job_id_.fetch_add(1), true);
68   {
69     InstrumentedMutexLock l(&mutex_);
70     auto* cfd = static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
71     const auto* vstorage = cfd->current()->storage_info();
72 
73     if (target_level >= vstorage->num_levels()) {
74       ROCKS_LOG_INFO(immutable_db_options_.info_log,
75                      "PromoteL0 FAILED. Target level %d does not exist\n",
76                      target_level);
77       job_context.Clean();
78       return Status::InvalidArgument("Target level does not exist");
79     }
80 
81     // Sort L0 files by range.
82     const InternalKeyComparator* icmp = &cfd->internal_comparator();
83     auto l0_files = vstorage->LevelFiles(0);
84     std::sort(l0_files.begin(), l0_files.end(),
85               [icmp](FileMetaData* f1, FileMetaData* f2) {
86                 return icmp->Compare(f1->largest, f2->largest) < 0;
87               });
88 
89     // Check that no L0 file is being compacted and that they have
90     // non-overlapping ranges.
91     for (size_t i = 0; i < l0_files.size(); ++i) {
92       auto f = l0_files[i];
93       if (f->being_compacted) {
94         ROCKS_LOG_INFO(immutable_db_options_.info_log,
95                        "PromoteL0 FAILED. File %" PRIu64 " being compacted\n",
96                        f->fd.GetNumber());
97         job_context.Clean();
98         return Status::InvalidArgument("PromoteL0 called during L0 compaction");
99       }
100 
101       if (i == 0) continue;
102       auto prev_f = l0_files[i - 1];
103       if (icmp->Compare(prev_f->largest, f->smallest) >= 0) {
104         ROCKS_LOG_INFO(immutable_db_options_.info_log,
105                        "PromoteL0 FAILED. Files %" PRIu64 " and %" PRIu64
106                        " have overlapping ranges\n",
107                        prev_f->fd.GetNumber(), f->fd.GetNumber());
108         job_context.Clean();
109         return Status::InvalidArgument("L0 has overlapping files");
110       }
111     }
112 
113     // Check that all levels up to target_level are empty.
114     for (int level = 1; level <= target_level; ++level) {
115       if (vstorage->NumLevelFiles(level) > 0) {
116         ROCKS_LOG_INFO(immutable_db_options_.info_log,
117                        "PromoteL0 FAILED. Level %d not empty\n", level);
118         job_context.Clean();
119         return Status::InvalidArgument(
120             "All levels up to target_level "
121             "must be empty");
122       }
123     }
124 
125     edit.SetColumnFamily(cfd->GetID());
126     for (const auto& f : l0_files) {
127       edit.DeleteFile(0, f->fd.GetNumber());
128       edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
129                    f->fd.GetFileSize(), f->smallest, f->largest,
130                    f->fd.smallest_seqno, f->fd.largest_seqno,
131                    f->marked_for_compaction, f->oldest_blob_file_number,
132                    f->oldest_ancester_time, f->file_creation_time,
133                    f->file_checksum, f->file_checksum_func_name);
134     }
135 
136     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
137                                     &edit, &mutex_, directories_.GetDbDir());
138     if (status.ok()) {
139       InstallSuperVersionAndScheduleWork(cfd,
140                                          &job_context.superversion_contexts[0],
141                                          *cfd->GetLatestMutableCFOptions());
142     }
143   }  // lock released here
144   LogFlush(immutable_db_options_.info_log);
145   job_context.Clean();
146 
147   return status;
148 }
149 #endif  // ROCKSDB_LITE
150 
151 }  // namespace ROCKSDB_NAMESPACE
152