1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/compaction/compaction_picker_fifo.h"
11 #ifndef ROCKSDB_LITE
12 
13 #include <cinttypes>
14 #include <string>
15 #include <vector>
16 #include "db/column_family.h"
17 #include "logging/log_buffer.h"
18 #include "util/string_util.h"
19 
20 namespace ROCKSDB_NAMESPACE {
21 namespace {
GetTotalFilesSize(const std::vector<FileMetaData * > & files)22 uint64_t GetTotalFilesSize(const std::vector<FileMetaData*>& files) {
23   uint64_t total_size = 0;
24   for (const auto& f : files) {
25     total_size += f->fd.file_size;
26   }
27   return total_size;
28 }
29 }  // anonymous namespace
30 
NeedsCompaction(const VersionStorageInfo * vstorage) const31 bool FIFOCompactionPicker::NeedsCompaction(
32     const VersionStorageInfo* vstorage) const {
33   const int kLevel0 = 0;
34   return vstorage->CompactionScore(kLevel0) >= 1;
35 }
36 
PickTTLCompaction(const std::string & cf_name,const MutableCFOptions & mutable_cf_options,VersionStorageInfo * vstorage,LogBuffer * log_buffer)37 Compaction* FIFOCompactionPicker::PickTTLCompaction(
38     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
39     VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
40   assert(mutable_cf_options.ttl > 0);
41 
42   const int kLevel0 = 0;
43   const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
44   uint64_t total_size = GetTotalFilesSize(level_files);
45 
46   int64_t _current_time;
47   auto status = ioptions_.env->GetCurrentTime(&_current_time);
48   if (!status.ok()) {
49     ROCKS_LOG_BUFFER(log_buffer,
50                      "[%s] FIFO compaction: Couldn't get current time: %s. "
51                      "Not doing compactions based on TTL. ",
52                      cf_name.c_str(), status.ToString().c_str());
53     return nullptr;
54   }
55   const uint64_t current_time = static_cast<uint64_t>(_current_time);
56 
57   if (!level0_compactions_in_progress_.empty()) {
58     ROCKS_LOG_BUFFER(
59         log_buffer,
60         "[%s] FIFO compaction: Already executing compaction. No need "
61         "to run parallel compactions since compactions are very fast",
62         cf_name.c_str());
63     return nullptr;
64   }
65 
66   std::vector<CompactionInputFiles> inputs;
67   inputs.emplace_back();
68   inputs[0].level = 0;
69 
70   // avoid underflow
71   if (current_time > mutable_cf_options.ttl) {
72     for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
73       FileMetaData* f = *ritr;
74       uint64_t creation_time = f->TryGetFileCreationTime();
75       if (creation_time == kUnknownFileCreationTime ||
76           creation_time >= (current_time - mutable_cf_options.ttl)) {
77         break;
78       }
79       total_size -= f->compensated_file_size;
80       inputs[0].files.push_back(f);
81     }
82   }
83 
84   // Return a nullptr and proceed to size-based FIFO compaction if:
85   // 1. there are no files older than ttl OR
86   // 2. there are a few files older than ttl, but deleting them will not bring
87   //    the total size to be less than max_table_files_size threshold.
88   if (inputs[0].files.empty() ||
89       total_size >
90           mutable_cf_options.compaction_options_fifo.max_table_files_size) {
91     return nullptr;
92   }
93 
94   for (const auto& f : inputs[0].files) {
95     ROCKS_LOG_BUFFER(log_buffer,
96                      "[%s] FIFO compaction: picking file %" PRIu64
97                      " with creation time %" PRIu64 " for deletion",
98                      cf_name.c_str(), f->fd.GetNumber(),
99                      f->TryGetFileCreationTime());
100   }
101 
102   Compaction* c = new Compaction(
103       vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
104       kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
105       {}, /* is manual */ false, vstorage->CompactionScore(0),
106       /* is deletion compaction */ true, CompactionReason::kFIFOTtl);
107   return c;
108 }
109 
PickSizeCompaction(const std::string & cf_name,const MutableCFOptions & mutable_cf_options,VersionStorageInfo * vstorage,LogBuffer * log_buffer)110 Compaction* FIFOCompactionPicker::PickSizeCompaction(
111     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
112     VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
113   const int kLevel0 = 0;
114   const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
115   uint64_t total_size = GetTotalFilesSize(level_files);
116 
117   if (total_size <=
118           mutable_cf_options.compaction_options_fifo.max_table_files_size ||
119       level_files.size() == 0) {
120     // total size not exceeded
121     if (mutable_cf_options.compaction_options_fifo.allow_compaction &&
122         level_files.size() > 0) {
123       CompactionInputFiles comp_inputs;
124       // try to prevent same files from being compacted multiple times, which
125       // could produce large files that may never TTL-expire. Achieve this by
126       // disallowing compactions with files larger than memtable (inflate its
127       // size by 10% to account for uncompressed L0 files that may have size
128       // slightly greater than memtable size limit).
129       size_t max_compact_bytes_per_del_file =
130           static_cast<size_t>(MultiplyCheckOverflow(
131               static_cast<uint64_t>(mutable_cf_options.write_buffer_size),
132               1.1));
133       if (FindIntraL0Compaction(
134               level_files,
135               mutable_cf_options
136                   .level0_file_num_compaction_trigger /* min_files_to_compact */
137               ,
138               max_compact_bytes_per_del_file,
139               mutable_cf_options.max_compaction_bytes, &comp_inputs)) {
140         Compaction* c = new Compaction(
141             vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0,
142             16 * 1024 * 1024 /* output file size limit */,
143             0 /* max compaction bytes, not applicable */,
144             0 /* output path ID */, mutable_cf_options.compression,
145             ioptions_.compression_opts, 0 /* max_subcompactions */, {},
146             /* is manual */ false, vstorage->CompactionScore(0),
147             /* is deletion compaction */ false,
148             CompactionReason::kFIFOReduceNumFiles);
149         return c;
150       }
151     }
152 
153     ROCKS_LOG_BUFFER(
154         log_buffer,
155         "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
156         ", max size %" PRIu64 "\n",
157         cf_name.c_str(), total_size,
158         mutable_cf_options.compaction_options_fifo.max_table_files_size);
159     return nullptr;
160   }
161 
162   if (!level0_compactions_in_progress_.empty()) {
163     ROCKS_LOG_BUFFER(
164         log_buffer,
165         "[%s] FIFO compaction: Already executing compaction. No need "
166         "to run parallel compactions since compactions are very fast",
167         cf_name.c_str());
168     return nullptr;
169   }
170 
171   std::vector<CompactionInputFiles> inputs;
172   inputs.emplace_back();
173   inputs[0].level = 0;
174 
175   for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
176     auto f = *ritr;
177     total_size -= f->compensated_file_size;
178     inputs[0].files.push_back(f);
179     char tmp_fsize[16];
180     AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
181     ROCKS_LOG_BUFFER(log_buffer,
182                      "[%s] FIFO compaction: picking file %" PRIu64
183                      " with size %s for deletion",
184                      cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
185     if (total_size <=
186         mutable_cf_options.compaction_options_fifo.max_table_files_size) {
187       break;
188     }
189   }
190 
191   Compaction* c = new Compaction(
192       vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
193       kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
194       {}, /* is manual */ false, vstorage->CompactionScore(0),
195       /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
196   return c;
197 }
198 
PickCompaction(const std::string & cf_name,const MutableCFOptions & mutable_cf_options,VersionStorageInfo * vstorage,LogBuffer * log_buffer,SequenceNumber)199 Compaction* FIFOCompactionPicker::PickCompaction(
200     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
201     VersionStorageInfo* vstorage, LogBuffer* log_buffer,
202     SequenceNumber /*earliest_memtable_seqno*/) {
203   assert(vstorage->num_levels() == 1);
204 
205   Compaction* c = nullptr;
206   if (mutable_cf_options.ttl > 0) {
207     c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
208   }
209   if (c == nullptr) {
210     c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
211   }
212   RegisterCompaction(c);
213   return c;
214 }
215 
CompactRange(const std::string & cf_name,const MutableCFOptions & mutable_cf_options,VersionStorageInfo * vstorage,int input_level,int output_level,const CompactRangeOptions &,const InternalKey *,const InternalKey *,InternalKey ** compaction_end,bool *,uint64_t)216 Compaction* FIFOCompactionPicker::CompactRange(
217     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
218     VersionStorageInfo* vstorage, int input_level, int output_level,
219     const CompactRangeOptions& /*compact_range_options*/,
220     const InternalKey* /*begin*/, const InternalKey* /*end*/,
221     InternalKey** compaction_end, bool* /*manual_conflict*/,
222     uint64_t /*max_file_num_to_ignore*/) {
223 #ifdef NDEBUG
224   (void)input_level;
225   (void)output_level;
226 #endif
227   assert(input_level == 0);
228   assert(output_level == 0);
229   *compaction_end = nullptr;
230   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
231   Compaction* c =
232       PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
233   log_buffer.FlushBufferToLog();
234   return c;
235 }
236 
237 }  // namespace ROCKSDB_NAMESPACE
238 #endif  // !ROCKSDB_LITE
239