1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 //
7 // The goal of this tool is to be a simple stress test with focus on catching:
8 // * bugs in compaction/flush processes, especially the ones that cause
9 // assertion errors
10 // * bugs in the code that deletes obsolete files
11 //
12 // There are two parts of the test:
13 // * write_stress, a binary that writes to the database
14 // * write_stress_runner.py, a script that invokes and kills write_stress
15 //
16 // Here are some interesting parts of write_stress:
17 // * Runs with very high concurrency of compactions and flushes (32 threads
18 // total) and tries to create a huge amount of small files
19 // * The keys written to the database are not uniformly distributed -- there is
20 // a 3-character prefix that mutates occasionally (in prefix mutator thread), in
21 // such a way that the first character mutates slower than second, which mutates
22 // slower than third character. That way, the compaction stress tests some
23 // interesting compaction features like trivial moves and bottommost level
24 // calculation
// * There is a thread that creates an iterator, holds it for a couple of
// seconds and then iterates over all keys. This is supposed to test RocksDB's
// ability to keep the files alive when there are references to them.
28 // * Some writes trigger WAL sync. This is stress testing our WAL sync code.
29 // * At the end of the run, we make sure that we didn't leak any of the sst
30 // files
31 //
32 // write_stress_runner.py changes the mode in which we run write_stress and also
33 // kills and restarts it. There are some interesting characteristics:
// * At the beginning we divide the full test runtime into smaller parts --
// shorter runtimes (a couple of seconds) and longer runtimes (100 or 1000
// seconds)
36 // * The first time we run write_stress, we destroy the old DB. Every next time
37 // during the test, we use the same DB.
38 // * We can run in kill mode or clean-restart mode. Kill mode kills the
39 // write_stress violently.
40 // * We can run in mode where delete_obsolete_files_with_fullscan is true or
41 // false
42 // * We can run with low_open_files mode turned on or off. When it's turned on,
43 // we configure table cache to only hold a couple of files -- that way we need
44 // to reopen files every time we access them.
45 //
46 // Another goal was to create a stress test without a lot of parameters. So
47 // tools/write_stress_runner.py should only take one parameter -- runtime_sec
48 // and it should figure out everything else on its own.
49 
50 #include <cstdio>
51 
52 #ifndef GFLAGS
// Fallback entry point compiled when GFLAGS is not defined: the tool cannot
// parse its options, so it only prints a hint and reports failure.
int main() {
  std::fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
57 #else
58 
59 #include <atomic>
60 #include <cinttypes>
61 #include <random>
62 #include <set>
63 #include <string>
64 #include <thread>
65 
66 #include "file/filename.h"
67 #include "port/port.h"
68 #include "rocksdb/db.h"
69 #include "rocksdb/env.h"
70 #include "rocksdb/options.h"
71 #include "rocksdb/slice.h"
72 #include "util/gflags_compat.h"
73 
74 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
75 using GFLAGS_NAMESPACE::RegisterFlagValidator;
76 using GFLAGS_NAMESPACE::SetUsageMessage;
77 
// Total key length in characters. The first kPrefixSize characters are the
// shared mutating prefix; the remainder is filled with random letters.
DEFINE_int32(key_size, 10, "Key size");
// Length of the random value written with every key.
DEFINE_int32(value_size, 100, "Value size");
// Database path. When empty, WriteStress uses "<test directory>/write_stress".
DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_bool(destroy_db, true,
            "Destroy the existing DB before running the test");

// A value of -1 makes WriteStress::Run() loop forever, until the process is
// killed.
DEFINE_int32(runtime_sec, 10 * 60, "How long are we running for, in seconds");
// Seeds the random generators of both the writer and the prefix mutator
// threads.
DEFINE_int32(seed, 139, "Random seed");

// The prefix mutator thread wakes up once per period and then rolls a separate
// die for each of the three prefix characters.
DEFINE_double(prefix_mutate_period_sec, 1.0,
              "How often are we going to mutate the prefix");
DEFINE_double(first_char_mutate_probability, 0.1,
              "How likely are we to mutate the first char every period");
DEFINE_double(second_char_mutate_probability, 0.2,
              "How likely are we to mutate the second char every period");
DEFINE_double(third_char_mutate_probability, 0.5,
              "How likely are we to mutate the third char every period");

// The iterator thread sleeps this long after creating an iterator and before
// scanning all keys with it.
DEFINE_int32(iterator_hold_sec, 5,
             "How long will the iterator hold files before it gets destroyed");

// Probability that an individual Put() is issued with WriteOptions::sync set.
DEFINE_double(sync_probability, 0.01, "How often are we syncing writes");
// When true, options.delete_obsolete_files_period_micros is set to 0.
DEFINE_bool(delete_obsolete_files_with_fullscan, false,
            "If true, we delete obsolete files after each compaction/flush "
            "using GetChildren() API");
// When true, options.max_open_files is 20; otherwise it is -1.
DEFINE_bool(low_open_files_mode, false,
            "If true, we set max_open_files to 20, so that every file access "
            "needs to reopen it");
106 
107 namespace ROCKSDB_NAMESPACE {
108 
// Number of leading key characters that form the shared, occasionally mutated
// key prefix (see WriteStress::key_prefix_).
static const int kPrefixSize = 3;
110 
111 class WriteStress {
112  public:
WriteStress()113   WriteStress() : stop_(false) {
114     // initialize key_prefix
115     for (int i = 0; i < kPrefixSize; ++i) {
116       key_prefix_[i].store('a');
117     }
118 
119     // Choose a location for the test database if none given with --db=<path>
120     if (FLAGS_db.empty()) {
121       std::string default_db_path;
122       Env::Default()->GetTestDirectory(&default_db_path);
123       default_db_path += "/write_stress";
124       FLAGS_db = default_db_path;
125     }
126 
127     Options options;
128     if (FLAGS_destroy_db) {
129       DestroyDB(FLAGS_db, options);  // ignore
130     }
131 
132     // make the LSM tree deep, so that we have many concurrent flushes and
133     // compactions
134     options.create_if_missing = true;
135     options.write_buffer_size = 256 * 1024;              // 256k
136     options.max_bytes_for_level_base = 1 * 1024 * 1024;  // 1MB
137     options.target_file_size_base = 100 * 1024;          // 100k
138     options.max_write_buffer_number = 16;
139     options.max_background_compactions = 16;
140     options.max_background_flushes = 16;
141     options.max_open_files = FLAGS_low_open_files_mode ? 20 : -1;
142     if (FLAGS_delete_obsolete_files_with_fullscan) {
143       options.delete_obsolete_files_period_micros = 0;
144     }
145 
146     // open DB
147     DB* db;
148     Status s = DB::Open(options, FLAGS_db, &db);
149     if (!s.ok()) {
150       fprintf(stderr, "Can't open database: %s\n", s.ToString().c_str());
151       std::abort();
152     }
153     db_.reset(db);
154   }
155 
WriteThread()156   void WriteThread() {
157     std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed));
158     std::uniform_real_distribution<double> dist(0, 1);
159 
160     auto random_string = [](std::mt19937& r, int len) {
161       std::uniform_int_distribution<int> char_dist('a', 'z');
162       std::string ret;
163       for (int i = 0; i < len; ++i) {
164         ret += static_cast<char>(char_dist(r));
165       }
166       return ret;
167     };
168 
169     while (!stop_.load(std::memory_order_relaxed)) {
170       std::string prefix;
171       prefix.resize(kPrefixSize);
172       for (int i = 0; i < kPrefixSize; ++i) {
173         prefix[i] = key_prefix_[i].load(std::memory_order_relaxed);
174       }
175       auto key = prefix + random_string(rng, FLAGS_key_size - kPrefixSize);
176       auto value = random_string(rng, FLAGS_value_size);
177       WriteOptions woptions;
178       woptions.sync = dist(rng) < FLAGS_sync_probability;
179       auto s = db_->Put(woptions, key, value);
180       if (!s.ok()) {
181         fprintf(stderr, "Write to DB failed: %s\n", s.ToString().c_str());
182         std::abort();
183       }
184     }
185   }
186 
IteratorHoldThread()187   void IteratorHoldThread() {
188     while (!stop_.load(std::memory_order_relaxed)) {
189       std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
190       Env::Default()->SleepForMicroseconds(FLAGS_iterator_hold_sec * 1000 *
191                                            1000LL);
192       for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
193       }
194       if (!iterator->status().ok()) {
195         fprintf(stderr, "Iterator statuts not OK: %s\n",
196                 iterator->status().ToString().c_str());
197         std::abort();
198       }
199     }
200   }
201 
PrefixMutatorThread()202   void PrefixMutatorThread() {
203     std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed));
204     std::uniform_real_distribution<double> dist(0, 1);
205     std::uniform_int_distribution<int> char_dist('a', 'z');
206     while (!stop_.load(std::memory_order_relaxed)) {
207       Env::Default()->SleepForMicroseconds(static_cast<int>(
208                                            FLAGS_prefix_mutate_period_sec *
209                                            1000 * 1000LL));
210       if (dist(rng) < FLAGS_first_char_mutate_probability) {
211         key_prefix_[0].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
212       }
213       if (dist(rng) < FLAGS_second_char_mutate_probability) {
214         key_prefix_[1].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
215       }
216       if (dist(rng) < FLAGS_third_char_mutate_probability) {
217         key_prefix_[2].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
218       }
219     }
220   }
221 
Run()222   int Run() {
223     threads_.emplace_back([&]() { WriteThread(); });
224     threads_.emplace_back([&]() { PrefixMutatorThread(); });
225     threads_.emplace_back([&]() { IteratorHoldThread(); });
226 
227     if (FLAGS_runtime_sec == -1) {
228       // infinite runtime, until we get killed
229       while (true) {
230         Env::Default()->SleepForMicroseconds(1000 * 1000);
231       }
232     }
233 
234     Env::Default()->SleepForMicroseconds(FLAGS_runtime_sec * 1000 * 1000);
235 
236     stop_.store(true, std::memory_order_relaxed);
237     for (auto& t : threads_) {
238       t.join();
239     }
240     threads_.clear();
241 
242 // Skip checking for leaked files in ROCKSDB_LITE since we don't have access to
243 // function GetLiveFilesMetaData
244 #ifndef ROCKSDB_LITE
245     // let's see if we leaked some files
246     db_->PauseBackgroundWork();
247     std::vector<LiveFileMetaData> metadata;
248     db_->GetLiveFilesMetaData(&metadata);
249     std::set<uint64_t> sst_file_numbers;
250     for (const auto& file : metadata) {
251       uint64_t number;
252       FileType type;
253       if (!ParseFileName(file.name, &number, "LOG", &type)) {
254         continue;
255       }
256       if (type == kTableFile) {
257         sst_file_numbers.insert(number);
258       }
259     }
260 
261     std::vector<std::string> children;
262     Env::Default()->GetChildren(FLAGS_db, &children);
263     for (const auto& child : children) {
264       uint64_t number;
265       FileType type;
266       if (!ParseFileName(child, &number, "LOG", &type)) {
267         continue;
268       }
269       if (type == kTableFile) {
270         if (sst_file_numbers.find(number) == sst_file_numbers.end()) {
271           fprintf(stderr,
272                   "Found a table file in DB path that should have been "
273                   "deleted: %s\n",
274                   child.c_str());
275           std::abort();
276         }
277       }
278     }
279     db_->ContinueBackgroundWork();
280 #endif  // !ROCKSDB_LITE
281 
282     return 0;
283   }
284 
285  private:
286   // each key is prepended with this prefix. we occasionally change it. third
287   // letter is changed more frequently than second, which is changed more
288   // frequently than the first one.
289   std::atomic<char> key_prefix_[kPrefixSize];
290   std::atomic<bool> stop_;
291   std::vector<port::Thread> threads_;
292   std::unique_ptr<DB> db_;
293 };
294 
295 }  // namespace ROCKSDB_NAMESPACE
296 
main(int argc,char ** argv)297 int main(int argc, char** argv) {
298   SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
299                   " [OPTIONS]...");
300   ParseCommandLineFlags(&argc, &argv, true);
301   ROCKSDB_NAMESPACE::WriteStress write_stress;
302   return write_stress.Run();
303 }
304 
305 #endif  // GFLAGS
306