//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

class CompactFilesTest : public testing::Test {
 public:
  CompactFilesTest() {
    env_ = Env::Default();
    db_name_ = test::PerThreadDBPath("compact_files_test");
  }

  std::string db_name_;
  Env* env_;
};

// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
 public:
  FlushedFileCollector() {}
  ~FlushedFileCollector() override {}

  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
    std::lock_guard<std::mutex> lock(mutex_);
    flushed_files_.push_back(info.file_path);
  }

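  // Return a copy of the collected file paths, taken while holding the lock
  // so callers can iterate without racing against OnFlushCompleted().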
  std::vector<std::string> GetFlushedFiles() {
    std::lock_guard<std::mutex> lock(mutex_);
    std::vector<std::string> result;
    for (auto fname : flushed_files_) {
      result.push_back(fname);
    }
    return result;
  }
  void ClearFlushedFiles() {
    std::lock_guard<std::mutex> lock(mutex_);
    flushed_files_.clear();
  }

 private:
  std::vector<std::string> flushed_files_;
  std::mutex mutex_;
};

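// Verify that a CompactFiles() call on L0 files and an automatic L0
// compaction scheduled in the background can interleave without conflicting.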
TEST_F(CompactFilesTest, L0ConflictsFiles) {
  Options options;
  // to trigger compaction more easily
  const int kWriteBufferSize = 10000;
  const int kLevel0Trigger = 2;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  // Small slowdown and stop triggers for experimental purposes.
  options.level0_slowdown_writes_trigger = 20;
  options.level0_stop_writes_trigger = 20;
  options.write_buffer_size = kWriteBufferSize;
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  options.compression = kNoCompression;

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
      {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Create a couple of files.
  // Background compaction starts and waits in BackgroundCallCompaction:0.
  for (int i = 0; i < kLevel0Trigger * 4; ++i) {
    db->Put(WriteOptions(), ToString(i), "");
    db->Put(WriteOptions(), ToString(100 - i), "");
    db->Flush(FlushOptions());
  }

  ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
  db->GetColumnFamilyMetaData(&meta);
  std::string file1;
  for (auto& file : meta.levels[0].files) {
    ASSERT_EQ(0, meta.levels[0].level);
    if (file1 == "") {
      file1 = file.db_path + "/" + file.name;
    } else {
      std::string file2 = file.db_path + "/" + file.name;
      // Another thread starts a CompactFiles call, which creates an L0
      // compaction. The background compaction then notices that an L0
      // compaction is already in progress and skips its own L0 compaction.
      // Once the background compaction finishes, the CompactFiles call
      // finishes as well.
      ASSERT_OK(db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
                                 {file1, file2}, 0));
      break;
    }
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  delete db;
}

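// Verify that the input files of a CompactFiles() call are deleted once the
// compaction finishes and they have become obsolete.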
TEST_F(CompactFilesTest, ObsoleteFiles) {
  Options options;
  // to trigger compaction more easily
  const int kWriteBufferSize = 65536;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.write_buffer_size = kWriteBufferSize;
  options.max_write_buffer_number = 2;
  options.compression = kNoCompression;

  // Add listener
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  // Create a couple of files.
  for (int i = 1000; i < 2000; ++i) {
    db->Put(WriteOptions(), ToString(i),
            std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
  }

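  // Each value is roughly kWriteBufferSize / 10 bytes, so the loop above
  // fills and flushes the memtable repeatedly; the listener records the
  // resulting L0 files, which are used as the compaction inputs below.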
  auto l0_files = collector->GetFlushedFiles();
  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
  reinterpret_cast<DBImpl*>(db)->TEST_WaitForCompact();

  // verify all compaction input files are deleted
  for (auto fname : l0_files) {
    ASSERT_EQ(Status::NotFound(), env_->FileExists(fname));
  }
  delete db;
}

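// Verify that CompactFiles() with output level 0 does not cut its output into
// multiple files even though max_compaction_bytes is small; the test passes
// as long as no internal assertion fires.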
TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
  Options options;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  options.level0_slowdown_writes_trigger = 1000;
  options.level0_stop_writes_trigger = 1000;
  options.write_buffer_size = 65536;
  options.max_write_buffer_number = 2;
  options.compression = kNoCompression;
  options.max_compaction_bytes = 5000;

  // Add listener
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  // Create a couple of files.
  for (int i = 0; i < 500; ++i) {
    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
  }
  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
  auto l0_files_1 = collector->GetFlushedFiles();
  collector->ClearFlushedFiles();
  for (int i = 0; i < 500; ++i) {
    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
  }
  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
  auto l0_files_2 = collector->GetFlushedFiles();
  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
  // no assertion failure
  delete db;
}

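// Verify that a file flushed while a CompactFiles() call is in flight is not
// mistakenly treated as obsolete during the full obsolete-file scan, and that
// the DB can still be reopened afterwards.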
TEST_F(CompactFilesTest, CapturingPendingFiles) {
  Options options;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  // Always do full scans for obsolete files (needed to reproduce the issue).
  options.delete_obsolete_files_period_micros = 0;

  // Add listener.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  // Create 5 files.
  for (int i = 0; i < 5; ++i) {
    db->Put(WriteOptions(), "key" + ToString(i), "value");
    db->Flush(FlushOptions());
  }

  auto l0_files = collector->GetFlushedFiles();
  EXPECT_EQ(5, l0_files.size());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
      {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Start compacting files.
  ROCKSDB_NAMESPACE::port::Thread compaction_thread(
      [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });

  // In the meantime flush another file.
  TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
  db->Put(WriteOptions(), "key5", "value");
  db->Flush(FlushOptions());
  TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");

  compaction_thread.join();

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  delete db;

  // Make sure we can reopen the DB.
  s = DB::Open(options, db_name_, &db);
  ASSERT_TRUE(s.ok());
  assert(db);
  delete db;
}

TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
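  // A compaction filter that calls DB::Get() from inside Filter(), so running
  // it through CompactFiles() exercises reading from the DB while the manual
  // compaction is in progress.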
  class FilterWithGet : public CompactionFilter {
   public:
    bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
                std::string* /*new_value*/,
                bool* /*value_changed*/) const override {
      if (db_ == nullptr) {
        return true;
      }
      std::string res;
      db_->Get(ReadOptions(), "", &res);
      return true;
    }

    void SetDB(DB* db) {
      db_ = db;
    }

    const char* Name() const override { return "FilterWithGet"; }

   private:
    // Initialized to nullptr so Filter() can safely check whether SetDB()
    // has been called yet.
    DB* db_ = nullptr;
  };


  std::shared_ptr<FilterWithGet> cf(new FilterWithGet());

  Options options;
  options.create_if_missing = true;
  options.compaction_filter = cf.get();

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  ASSERT_OK(s);

  cf->SetDB(db);

  // Write one L0 file
  db->Put(WriteOptions(), "K1", "V1");
  db->Flush(FlushOptions());

  // Compact all L0 files using CompactFiles
  ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
  db->GetColumnFamilyMetaData(&meta);
  for (auto& file : meta.levels[0].files) {
    std::string fname = file.db_path + "/" + file.name;
    ASSERT_OK(
        db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0));
  }


  delete db;
}

TEST_F(CompactFilesTest, SentinelCompressionType) {
  if (!Zlib_Supported()) {
    fprintf(stderr, "zlib compression not supported, skip this test\n");
    return;
  }
  if (!Snappy_Supported()) {
    fprintf(stderr, "snappy compression not supported, skip this test\n");
    return;
  }
  // Check that passing `CompressionType::kDisableCompressionOption` to
  // `CompactFiles` causes it to use the column family compression options.
  for (auto compaction_style :
       {CompactionStyle::kCompactionStyleLevel,
        CompactionStyle::kCompactionStyleUniversal,
        CompactionStyle::kCompactionStyleNone}) {
    DestroyDB(db_name_, Options());
    Options options;
    options.compaction_style = compaction_style;
    // L0: Snappy, L1: Zlib, L2: Snappy
    options.compression_per_level = {CompressionType::kSnappyCompression,
                                     CompressionType::kZlibCompression,
                                     CompressionType::kSnappyCompression};
    options.create_if_missing = true;
    FlushedFileCollector* collector = new FlushedFileCollector();
    options.listeners.emplace_back(collector);
    DB* db = nullptr;
    ASSERT_OK(DB::Open(options, db_name_, &db));

    db->Put(WriteOptions(), "key", "val");
    db->Flush(FlushOptions());

    auto l0_files = collector->GetFlushedFiles();
    ASSERT_EQ(1, l0_files.size());

    // L0->L1 compaction, so the output should be Zlib-compressed
    CompactionOptions compaction_opts;
    compaction_opts.compression = CompressionType::kDisableCompressionOption;
    ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1));

    ROCKSDB_NAMESPACE::TablePropertiesCollection all_tables_props;
    ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props));
    for (const auto& name_and_table_props : all_tables_props) {
      ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression),
                name_and_table_props.second->compression_name);
    }
    delete db;
  }
}

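// Verify that the CompactionJobInfo filled in by CompactFiles() reports the
// expected column family, input/output levels, compaction reason, compression
// type, and status.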
TEST_F(CompactFilesTest, GetCompactionJobInfo) {
  Options options;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  options.level0_slowdown_writes_trigger = 1000;
  options.level0_stop_writes_trigger = 1000;
  options.write_buffer_size = 65536;
  options.max_write_buffer_number = 2;
  options.compression = kNoCompression;
  options.max_compaction_bytes = 5000;

  // Add listener
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  // Create a couple of files.
  for (int i = 0; i < 500; ++i) {
    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
  }
  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
  auto l0_files_1 = collector->GetFlushedFiles();
  CompactionOptions co;
  co.compression = CompressionType::kLZ4Compression;
  CompactionJobInfo compaction_job_info{};
  ASSERT_OK(
      db->CompactFiles(co, l0_files_1, 0, -1, nullptr, &compaction_job_info));
  ASSERT_EQ(compaction_job_info.base_input_level, 0);
  ASSERT_EQ(compaction_job_info.cf_id, db->DefaultColumnFamily()->GetID());
  ASSERT_EQ(compaction_job_info.cf_name, db->DefaultColumnFamily()->GetName());
  ASSERT_EQ(compaction_job_info.compaction_reason,
            CompactionReason::kManualCompaction);
  ASSERT_EQ(compaction_job_info.compression, CompressionType::kLZ4Compression);
  ASSERT_EQ(compaction_job_info.output_level, 0);
  ASSERT_OK(compaction_job_info.status);
  // no assertion failure
  delete db;
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr,
          "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE