1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 //
7 // The goal of this tool is to be a simple stress test with focus on catching:
8 // * bugs in compaction/flush processes, especially the ones that cause
9 // assertion errors
10 // * bugs in the code that deletes obsolete files
11 //
12 // There are two parts of the test:
13 // * write_stress, a binary that writes to the database
14 // * write_stress_runner.py, a script that invokes and kills write_stress
15 //
16 // Here are some interesting parts of write_stress:
17 // * Runs with very high concurrency of compactions and flushes (32 threads
18 // total) and tries to create a huge amount of small files
// * The keys written to the database are not uniformly distributed -- there is
// a 3-character prefix that mutates occasionally (in the prefix mutator
// thread), in such a way that the first character mutates more slowly than the
// second, which mutates more slowly than the third. That way, the test
// exercises some interesting compaction features like trivial moves and
// bottommost level calculation
// * There is a thread that creates an iterator, holds it for a couple of
// seconds and then iterates over all keys. This is supposed to test RocksDB's
// ability to keep files alive while there are references to them.
28 // * Some writes trigger WAL sync. This is stress testing our WAL sync code.
29 // * At the end of the run, we make sure that we didn't leak any of the sst
30 // files
31 //
32 // write_stress_runner.py changes the mode in which we run write_stress and also
33 // kills and restarts it. There are some interesting characteristics:
// * At the beginning we divide the full test runtime into smaller parts --
// shorter runtimes (a couple of seconds) and longer runtimes (100 and 1000
// seconds)
// * The first time we run write_stress, we destroy the old DB. Every
// subsequent run during the test reuses the same DB.
38 // * We can run in kill mode or clean-restart mode. Kill mode kills the
39 // write_stress violently.
40 // * We can run in mode where delete_obsolete_files_with_fullscan is true or
41 // false
42 // * We can run with low_open_files mode turned on or off. When it's turned on,
43 // we configure table cache to only hold a couple of files -- that way we need
44 // to reopen files every time we access them.
45 //
46 // Another goal was to create a stress test without a lot of parameters. So
47 // tools/write_stress_runner.py should only take one parameter -- runtime_sec
48 // and it should figure out everything else on its own.
49
50 #include <cstdio>
51
52 #ifndef GFLAGS
// Fallback entry point for builds without gflags: this tool cannot parse its
// command-line flags, so it only reports the missing dependency and fails.
int main() {
  fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
57 #else
58
59 #include <atomic>
60 #include <cinttypes>
61 #include <random>
62 #include <set>
63 #include <string>
64 #include <thread>
65
66 #include "file/filename.h"
67 #include "port/port.h"
68 #include "rocksdb/db.h"
69 #include "rocksdb/env.h"
70 #include "rocksdb/options.h"
71 #include "rocksdb/slice.h"
72 #include "util/gflags_compat.h"
73
74 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
75 using GFLAGS_NAMESPACE::RegisterFlagValidator;
76 using GFLAGS_NAMESPACE::SetUsageMessage;
77
// Key/value shape. Each key is the current mutating prefix padded with random
// lowercase letters up to --key_size characters; each value is --value_size
// random lowercase letters (see WriteStress::WriteThread()).
DEFINE_int32(key_size, 10, "Key size");
DEFINE_int32(value_size, 100, "Value size");
// DB location; when empty, WriteStress uses <test directory>/write_stress.
DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_bool(destroy_db, true,
            "Destroy the existing DB before running the test");

// Test duration. The special value -1 makes WriteStress::Run() loop forever,
// so the process runs until it is killed externally.
DEFINE_int32(runtime_sec, 10 * 60, "How long are we running for, in seconds");
// Seeds the random generators of the writer and prefix mutator threads.
DEFINE_int32(seed, 139, "Random seed");

// Prefix mutation: once per period, each of the three prefix characters is
// independently replaced by a random letter with its own probability. The
// defaults make later characters change more often than earlier ones.
DEFINE_double(prefix_mutate_period_sec, 1.0,
              "How often are we going to mutate the prefix");
DEFINE_double(first_char_mutate_probability, 0.1,
              "How likely are we to mutate the first char every period");
DEFINE_double(second_char_mutate_probability, 0.2,
              "How likely are we to mutate the second char every period");
DEFINE_double(third_char_mutate_probability, 0.5,
              "How likely are we to mutate the third char every period");

// How long the iterator thread keeps an iterator open before scanning with it.
DEFINE_int32(iterator_hold_sec, 5,
             "How long will the iterator hold files before it gets destroyed");

// Fraction of writes issued with WriteOptions::sync set.
DEFINE_double(sync_probability, 0.01, "How often are we syncing writes");
// When true, the DB is opened with delete_obsolete_files_period_micros = 0.
DEFINE_bool(delete_obsolete_files_with_fullscan, false,
            "If true, we delete obsolete files after each compaction/flush "
            "using GetChildren() API");
// When true, the DB is opened with max_open_files = 20 instead of -1.
DEFINE_bool(low_open_files_mode, false,
            "If true, we set max_open_files to 20, so that every file access "
            "needs to reopen it");
106
107 namespace ROCKSDB_NAMESPACE {
108
109 static const int kPrefixSize = 3;
110
111 class WriteStress {
112 public:
WriteStress()113 WriteStress() : stop_(false) {
114 // initialize key_prefix
115 for (int i = 0; i < kPrefixSize; ++i) {
116 key_prefix_[i].store('a');
117 }
118
119 // Choose a location for the test database if none given with --db=<path>
120 if (FLAGS_db.empty()) {
121 std::string default_db_path;
122 Env::Default()->GetTestDirectory(&default_db_path);
123 default_db_path += "/write_stress";
124 FLAGS_db = default_db_path;
125 }
126
127 Options options;
128 if (FLAGS_destroy_db) {
129 DestroyDB(FLAGS_db, options); // ignore
130 }
131
132 // make the LSM tree deep, so that we have many concurrent flushes and
133 // compactions
134 options.create_if_missing = true;
135 options.write_buffer_size = 256 * 1024; // 256k
136 options.max_bytes_for_level_base = 1 * 1024 * 1024; // 1MB
137 options.target_file_size_base = 100 * 1024; // 100k
138 options.max_write_buffer_number = 16;
139 options.max_background_compactions = 16;
140 options.max_background_flushes = 16;
141 options.max_open_files = FLAGS_low_open_files_mode ? 20 : -1;
142 if (FLAGS_delete_obsolete_files_with_fullscan) {
143 options.delete_obsolete_files_period_micros = 0;
144 }
145
146 // open DB
147 DB* db;
148 Status s = DB::Open(options, FLAGS_db, &db);
149 if (!s.ok()) {
150 fprintf(stderr, "Can't open database: %s\n", s.ToString().c_str());
151 std::abort();
152 }
153 db_.reset(db);
154 }
155
WriteThread()156 void WriteThread() {
157 std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed));
158 std::uniform_real_distribution<double> dist(0, 1);
159
160 auto random_string = [](std::mt19937& r, int len) {
161 std::uniform_int_distribution<int> char_dist('a', 'z');
162 std::string ret;
163 for (int i = 0; i < len; ++i) {
164 ret += static_cast<char>(char_dist(r));
165 }
166 return ret;
167 };
168
169 while (!stop_.load(std::memory_order_relaxed)) {
170 std::string prefix;
171 prefix.resize(kPrefixSize);
172 for (int i = 0; i < kPrefixSize; ++i) {
173 prefix[i] = key_prefix_[i].load(std::memory_order_relaxed);
174 }
175 auto key = prefix + random_string(rng, FLAGS_key_size - kPrefixSize);
176 auto value = random_string(rng, FLAGS_value_size);
177 WriteOptions woptions;
178 woptions.sync = dist(rng) < FLAGS_sync_probability;
179 auto s = db_->Put(woptions, key, value);
180 if (!s.ok()) {
181 fprintf(stderr, "Write to DB failed: %s\n", s.ToString().c_str());
182 std::abort();
183 }
184 }
185 }
186
IteratorHoldThread()187 void IteratorHoldThread() {
188 while (!stop_.load(std::memory_order_relaxed)) {
189 std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
190 Env::Default()->SleepForMicroseconds(FLAGS_iterator_hold_sec * 1000 *
191 1000LL);
192 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
193 }
194 if (!iterator->status().ok()) {
195 fprintf(stderr, "Iterator statuts not OK: %s\n",
196 iterator->status().ToString().c_str());
197 std::abort();
198 }
199 }
200 }
201
PrefixMutatorThread()202 void PrefixMutatorThread() {
203 std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed));
204 std::uniform_real_distribution<double> dist(0, 1);
205 std::uniform_int_distribution<int> char_dist('a', 'z');
206 while (!stop_.load(std::memory_order_relaxed)) {
207 Env::Default()->SleepForMicroseconds(static_cast<int>(
208 FLAGS_prefix_mutate_period_sec *
209 1000 * 1000LL));
210 if (dist(rng) < FLAGS_first_char_mutate_probability) {
211 key_prefix_[0].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
212 }
213 if (dist(rng) < FLAGS_second_char_mutate_probability) {
214 key_prefix_[1].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
215 }
216 if (dist(rng) < FLAGS_third_char_mutate_probability) {
217 key_prefix_[2].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
218 }
219 }
220 }
221
Run()222 int Run() {
223 threads_.emplace_back([&]() { WriteThread(); });
224 threads_.emplace_back([&]() { PrefixMutatorThread(); });
225 threads_.emplace_back([&]() { IteratorHoldThread(); });
226
227 if (FLAGS_runtime_sec == -1) {
228 // infinite runtime, until we get killed
229 while (true) {
230 Env::Default()->SleepForMicroseconds(1000 * 1000);
231 }
232 }
233
234 Env::Default()->SleepForMicroseconds(FLAGS_runtime_sec * 1000 * 1000);
235
236 stop_.store(true, std::memory_order_relaxed);
237 for (auto& t : threads_) {
238 t.join();
239 }
240 threads_.clear();
241
242 // Skip checking for leaked files in ROCKSDB_LITE since we don't have access to
243 // function GetLiveFilesMetaData
244 #ifndef ROCKSDB_LITE
245 // let's see if we leaked some files
246 db_->PauseBackgroundWork();
247 std::vector<LiveFileMetaData> metadata;
248 db_->GetLiveFilesMetaData(&metadata);
249 std::set<uint64_t> sst_file_numbers;
250 for (const auto& file : metadata) {
251 uint64_t number;
252 FileType type;
253 if (!ParseFileName(file.name, &number, "LOG", &type)) {
254 continue;
255 }
256 if (type == kTableFile) {
257 sst_file_numbers.insert(number);
258 }
259 }
260
261 std::vector<std::string> children;
262 Env::Default()->GetChildren(FLAGS_db, &children);
263 for (const auto& child : children) {
264 uint64_t number;
265 FileType type;
266 if (!ParseFileName(child, &number, "LOG", &type)) {
267 continue;
268 }
269 if (type == kTableFile) {
270 if (sst_file_numbers.find(number) == sst_file_numbers.end()) {
271 fprintf(stderr,
272 "Found a table file in DB path that should have been "
273 "deleted: %s\n",
274 child.c_str());
275 std::abort();
276 }
277 }
278 }
279 db_->ContinueBackgroundWork();
280 #endif // !ROCKSDB_LITE
281
282 return 0;
283 }
284
285 private:
286 // each key is prepended with this prefix. we occasionally change it. third
287 // letter is changed more frequently than second, which is changed more
288 // frequently than the first one.
289 std::atomic<char> key_prefix_[kPrefixSize];
290 std::atomic<bool> stop_;
291 std::vector<port::Thread> threads_;
292 std::unique_ptr<DB> db_;
293 };
294
295 } // namespace ROCKSDB_NAMESPACE
296
main(int argc,char ** argv)297 int main(int argc, char** argv) {
298 SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
299 " [OPTIONS]...");
300 ParseCommandLineFlags(&argc, &argv, true);
301 ROCKSDB_NAMESPACE::WriteStress write_stress;
302 return write_stress.Run();
303 }
304
305 #endif // GFLAGS
306