1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // The test uses an array to compare against values written to the database.
11 // Keys written to the array are in 1:1 correspondence to the actual values in
12 // the database according to the formula in the function GenerateValue.
13
14 // Space is reserved in the array from 0 to FLAGS_max_key and values are
15 // randomly written/deleted/read from those positions. During verification we
16 // compare all the positions in the array. To shorten/elongate the running
17 // time, you could change the settings: FLAGS_max_key, FLAGS_ops_per_thread,
18 // (sometimes also FLAGS_threads).
19 //
20 // NOTE that if FLAGS_test_batches_snapshots is set, the test will have
21 // different behavior. See comment of the flag for details.
22
23 #ifdef GFLAGS
24 #pragma once
25 #include <fcntl.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <sys/types.h>
29 #include <algorithm>
30 #include <array>
31 #include <chrono>
32 #include <cinttypes>
33 #include <exception>
34 #include <queue>
35 #include <thread>
36
37 #include "db/db_impl/db_impl.h"
38 #include "db/version_set.h"
39 #include "db_stress_tool/db_stress_env_wrapper.h"
40 #include "db_stress_tool/db_stress_listener.h"
41 #include "db_stress_tool/db_stress_shared_state.h"
42 #include "db_stress_tool/db_stress_test_base.h"
43 #include "hdfs/env_hdfs.h"
44 #include "logging/logging.h"
45 #include "monitoring/histogram.h"
46 #include "options/options_helper.h"
47 #include "port/port.h"
48 #include "rocksdb/cache.h"
49 #include "rocksdb/env.h"
50 #include "rocksdb/slice.h"
51 #include "rocksdb/slice_transform.h"
52 #include "rocksdb/statistics.h"
53 #include "rocksdb/utilities/backupable_db.h"
54 #include "rocksdb/utilities/checkpoint.h"
55 #include "rocksdb/utilities/db_ttl.h"
56 #include "rocksdb/utilities/debug.h"
57 #include "rocksdb/utilities/options_util.h"
58 #include "rocksdb/utilities/transaction.h"
59 #include "rocksdb/utilities/transaction_db.h"
60 #include "rocksdb/write_batch.h"
61 #include "util/coding.h"
62 #include "util/compression.h"
63 #include "util/crc32c.h"
64 #include "util/gflags_compat.h"
65 #include "util/mutexlock.h"
66 #include "util/random.h"
67 #include "util/string_util.h"
68 #include "utilities/blob_db/blob_db.h"
69 // SyncPoint is not supported in Released Windows Mode.
70 #if !(defined NDEBUG) || !defined(OS_WIN)
71 #include "test_util/sync_point.h"
72 #endif // !(defined NDEBUG) || !defined(OS_WIN)
73 #include "test_util/testutil.h"
74
75 #include "utilities/merge_operators.h"
76
77 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
78 using GFLAGS_NAMESPACE::RegisterFlagValidator;
79 using GFLAGS_NAMESPACE::SetUsageMessage;
80
81 DECLARE_uint64(seed);
82 DECLARE_bool(read_only);
83 DECLARE_int64(max_key);
84 DECLARE_double(hot_key_alpha);
85 DECLARE_int32(max_key_len);
86 DECLARE_string(key_len_percent_dist);
87 DECLARE_int32(key_window_scale_factor);
88 DECLARE_int32(column_families);
89 DECLARE_string(options_file);
90 DECLARE_int64(active_width);
91 DECLARE_bool(test_batches_snapshots);
92 DECLARE_bool(atomic_flush);
93 DECLARE_bool(test_cf_consistency);
94 DECLARE_int32(threads);
95 DECLARE_int32(ttl);
96 DECLARE_int32(value_size_mult);
97 DECLARE_int32(compaction_readahead_size);
98 DECLARE_bool(enable_pipelined_write);
99 DECLARE_bool(verify_before_write);
100 DECLARE_bool(histogram);
101 DECLARE_bool(destroy_db_initially);
102 DECLARE_bool(verbose);
103 DECLARE_bool(progress_reports);
104 DECLARE_uint64(db_write_buffer_size);
105 DECLARE_int32(write_buffer_size);
106 DECLARE_int32(max_write_buffer_number);
107 DECLARE_int32(min_write_buffer_number_to_merge);
108 DECLARE_int32(max_write_buffer_number_to_maintain);
109 DECLARE_int64(max_write_buffer_size_to_maintain);
110 DECLARE_double(memtable_prefix_bloom_size_ratio);
111 DECLARE_bool(memtable_whole_key_filtering);
112 DECLARE_int32(open_files);
113 DECLARE_int64(compressed_cache_size);
114 DECLARE_int32(compaction_style);
115 DECLARE_int32(level0_file_num_compaction_trigger);
116 DECLARE_int32(level0_slowdown_writes_trigger);
117 DECLARE_int32(level0_stop_writes_trigger);
118 DECLARE_int32(block_size);
119 DECLARE_int32(format_version);
120 DECLARE_int32(index_block_restart_interval);
121 DECLARE_int32(max_background_compactions);
122 DECLARE_int32(num_bottom_pri_threads);
123 DECLARE_int32(compaction_thread_pool_adjust_interval);
124 DECLARE_int32(compaction_thread_pool_variations);
125 DECLARE_int32(max_background_flushes);
126 DECLARE_int32(universal_size_ratio);
127 DECLARE_int32(universal_min_merge_width);
128 DECLARE_int32(universal_max_merge_width);
129 DECLARE_int32(universal_max_size_amplification_percent);
130 DECLARE_int32(clear_column_family_one_in);
131 DECLARE_int32(get_live_files_one_in);
132 DECLARE_int32(get_sorted_wal_files_one_in);
133 DECLARE_int32(get_current_wal_file_one_in);
134 DECLARE_int32(set_options_one_in);
135 DECLARE_int32(set_in_place_one_in);
136 DECLARE_int64(cache_size);
137 DECLARE_bool(cache_index_and_filter_blocks);
138 DECLARE_bool(use_clock_cache);
139 DECLARE_uint64(subcompactions);
140 DECLARE_uint64(periodic_compaction_seconds);
141 DECLARE_uint64(compaction_ttl);
142 DECLARE_bool(allow_concurrent_memtable_write);
143 DECLARE_bool(enable_write_thread_adaptive_yield);
144 DECLARE_int32(reopen);
145 DECLARE_double(bloom_bits);
146 DECLARE_bool(use_block_based_filter);
147 DECLARE_bool(partition_filters);
148 DECLARE_int32(index_type);
149 DECLARE_string(db);
150 DECLARE_string(secondaries_base);
151 DECLARE_bool(test_secondary);
152 DECLARE_string(expected_values_path);
153 DECLARE_bool(verify_checksum);
154 DECLARE_bool(mmap_read);
155 DECLARE_bool(mmap_write);
156 DECLARE_bool(use_direct_reads);
157 DECLARE_bool(use_direct_io_for_flush_and_compaction);
158 DECLARE_bool(statistics);
159 DECLARE_bool(sync);
160 DECLARE_bool(use_fsync);
161 DECLARE_int32(kill_random_test);
162 DECLARE_string(kill_prefix_blacklist);
163 DECLARE_bool(disable_wal);
164 DECLARE_uint64(recycle_log_file_num);
165 DECLARE_int64(target_file_size_base);
166 DECLARE_int32(target_file_size_multiplier);
167 DECLARE_uint64(max_bytes_for_level_base);
168 DECLARE_double(max_bytes_for_level_multiplier);
169 DECLARE_int32(range_deletion_width);
170 DECLARE_uint64(rate_limiter_bytes_per_sec);
171 DECLARE_bool(rate_limit_bg_reads);
172 DECLARE_uint64(sst_file_manager_bytes_per_sec);
173 DECLARE_uint64(sst_file_manager_bytes_per_truncate);
174 DECLARE_bool(use_txn);
175 DECLARE_uint64(txn_write_policy);
176 DECLARE_bool(unordered_write);
177 DECLARE_int32(backup_one_in);
178 DECLARE_int32(checkpoint_one_in);
179 DECLARE_int32(ingest_external_file_one_in);
180 DECLARE_int32(ingest_external_file_width);
181 DECLARE_int32(compact_files_one_in);
182 DECLARE_int32(compact_range_one_in);
183 DECLARE_int32(flush_one_in);
184 DECLARE_int32(pause_background_one_in);
185 DECLARE_int32(compact_range_width);
186 DECLARE_int32(acquire_snapshot_one_in);
187 DECLARE_bool(compare_full_db_state_snapshot);
188 DECLARE_uint64(snapshot_hold_ops);
189 DECLARE_bool(long_running_snapshots);
190 DECLARE_bool(use_multiget);
191 DECLARE_int32(readpercent);
192 DECLARE_int32(prefixpercent);
193 DECLARE_int32(writepercent);
194 DECLARE_int32(delpercent);
195 DECLARE_int32(delrangepercent);
196 DECLARE_int32(nooverwritepercent);
197 DECLARE_int32(iterpercent);
198 DECLARE_uint64(num_iterations);
199 DECLARE_string(compression_type);
200 DECLARE_string(bottommost_compression_type);
201 DECLARE_int32(compression_max_dict_bytes);
202 DECLARE_int32(compression_zstd_max_train_bytes);
203 DECLARE_string(checksum_type);
204 DECLARE_string(hdfs);
205 DECLARE_string(env_uri);
206 DECLARE_uint64(ops_per_thread);
207 DECLARE_uint64(log2_keys_per_lock);
208 DECLARE_uint64(max_manifest_file_size);
209 DECLARE_bool(in_place_update);
210 DECLARE_int32(secondary_catch_up_one_in);
211 DECLARE_string(memtablerep);
212 DECLARE_int32(prefix_size);
213 DECLARE_bool(use_merge);
214 DECLARE_bool(use_full_merge_v1);
215 DECLARE_int32(sync_wal_one_in);
216 DECLARE_bool(avoid_unnecessary_blocking_io);
217 DECLARE_bool(write_dbid_to_manifest);
218 DECLARE_uint64(max_write_batch_group_size_bytes);
219 DECLARE_bool(level_compaction_dynamic_level_bytes);
220 DECLARE_int32(verify_checksum_one_in);
221 DECLARE_int32(verify_db_one_in);
222 DECLARE_int32(continuous_verification_interval);
223
224 #ifndef ROCKSDB_LITE
225 DECLARE_bool(use_blob_db);
226 DECLARE_uint64(blob_db_min_blob_size);
227 DECLARE_uint64(blob_db_bytes_per_sync);
228 DECLARE_uint64(blob_db_file_size);
229 DECLARE_bool(blob_db_enable_gc);
230 DECLARE_double(blob_db_gc_cutoff);
231 #endif // !ROCKSDB_LITE
232 DECLARE_int32(approximate_size_one_in);
233
234 const long KB = 1024;
235 const int kRandomValueMaxFactor = 3;
236 const int kValueMaxLen = 100;
237
238 // wrapped posix or hdfs environment
239 extern ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env;
240
241 extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e;
242 extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e;
243 extern enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e;
244
245 enum RepFactory { kSkipList, kHashSkipList, kVectorRep };
246
StringToRepFactory(const char * ctype)247 inline enum RepFactory StringToRepFactory(const char* ctype) {
248 assert(ctype);
249
250 if (!strcasecmp(ctype, "skip_list"))
251 return kSkipList;
252 else if (!strcasecmp(ctype, "prefix_hash"))
253 return kHashSkipList;
254 else if (!strcasecmp(ctype, "vector"))
255 return kVectorRep;
256
257 fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
258 return kSkipList;
259 }
260
261 extern enum RepFactory FLAGS_rep_factory;
262
263 namespace ROCKSDB_NAMESPACE {
StringToCompressionType(const char * ctype)264 inline enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
265 const char* ctype) {
266 assert(ctype);
267
268 ROCKSDB_NAMESPACE::CompressionType ret_compression_type;
269
270 if (!strcasecmp(ctype, "disable")) {
271 ret_compression_type = ROCKSDB_NAMESPACE::kDisableCompressionOption;
272 } else if (!strcasecmp(ctype, "none")) {
273 ret_compression_type = ROCKSDB_NAMESPACE::kNoCompression;
274 } else if (!strcasecmp(ctype, "snappy")) {
275 ret_compression_type = ROCKSDB_NAMESPACE::kSnappyCompression;
276 } else if (!strcasecmp(ctype, "zlib")) {
277 ret_compression_type = ROCKSDB_NAMESPACE::kZlibCompression;
278 } else if (!strcasecmp(ctype, "bzip2")) {
279 ret_compression_type = ROCKSDB_NAMESPACE::kBZip2Compression;
280 } else if (!strcasecmp(ctype, "lz4")) {
281 ret_compression_type = ROCKSDB_NAMESPACE::kLZ4Compression;
282 } else if (!strcasecmp(ctype, "lz4hc")) {
283 ret_compression_type = ROCKSDB_NAMESPACE::kLZ4HCCompression;
284 } else if (!strcasecmp(ctype, "xpress")) {
285 ret_compression_type = ROCKSDB_NAMESPACE::kXpressCompression;
286 } else if (!strcasecmp(ctype, "zstd")) {
287 ret_compression_type = ROCKSDB_NAMESPACE::kZSTD;
288 } else {
289 fprintf(stderr, "Cannot parse compression type '%s'\n", ctype);
290 ret_compression_type =
291 ROCKSDB_NAMESPACE::kSnappyCompression; // default value
292 }
293 if (ret_compression_type != ROCKSDB_NAMESPACE::kDisableCompressionOption &&
294 !CompressionTypeSupported(ret_compression_type)) {
295 // Use no compression will be more portable but considering this is
296 // only a stress test and snappy is widely available. Use snappy here.
297 ret_compression_type = ROCKSDB_NAMESPACE::kSnappyCompression;
298 }
299 return ret_compression_type;
300 }
301
StringToChecksumType(const char * ctype)302 inline enum ROCKSDB_NAMESPACE::ChecksumType StringToChecksumType(
303 const char* ctype) {
304 assert(ctype);
305 auto iter = ROCKSDB_NAMESPACE::checksum_type_string_map.find(ctype);
306 if (iter != ROCKSDB_NAMESPACE::checksum_type_string_map.end()) {
307 return iter->second;
308 }
309 fprintf(stderr, "Cannot parse checksum type '%s'\n", ctype);
310 return ROCKSDB_NAMESPACE::kCRC32c;
311 }
312
ChecksumTypeToString(ROCKSDB_NAMESPACE::ChecksumType ctype)313 inline std::string ChecksumTypeToString(ROCKSDB_NAMESPACE::ChecksumType ctype) {
314 auto iter = std::find_if(
315 ROCKSDB_NAMESPACE::checksum_type_string_map.begin(),
316 ROCKSDB_NAMESPACE::checksum_type_string_map.end(),
317 [&](const std::pair<std::string, ROCKSDB_NAMESPACE::ChecksumType>&
318 name_and_enum_val) { return name_and_enum_val.second == ctype; });
319 assert(iter != ROCKSDB_NAMESPACE::checksum_type_string_map.end());
320 return iter->first;
321 }
322
SplitString(std::string src)323 inline std::vector<std::string> SplitString(std::string src) {
324 std::vector<std::string> ret;
325 if (src.empty()) {
326 return ret;
327 }
328 size_t pos = 0;
329 size_t pos_comma;
330 while ((pos_comma = src.find(',', pos)) != std::string::npos) {
331 ret.push_back(src.substr(pos, pos_comma - pos));
332 pos = pos_comma + 1;
333 }
334 ret.push_back(src.substr(pos, src.length()));
335 return ret;
336 }
337
338 #ifdef _MSC_VER
339 #pragma warning(push)
340 // truncation of constant value on static_cast
341 #pragma warning(disable : 4309)
342 #endif
GetNextPrefix(const ROCKSDB_NAMESPACE::Slice & src,std::string * v)343 inline bool GetNextPrefix(const ROCKSDB_NAMESPACE::Slice& src, std::string* v) {
344 std::string ret = src.ToString();
345 for (int i = static_cast<int>(ret.size()) - 1; i >= 0; i--) {
346 if (ret[i] != static_cast<char>(255)) {
347 ret[i] = ret[i] + 1;
348 break;
349 } else if (i != 0) {
350 ret[i] = 0;
351 } else {
352 // all FF. No next prefix
353 return false;
354 }
355 }
356 *v = ret;
357 return true;
358 }
359 #ifdef _MSC_VER
360 #pragma warning(pop)
361 #endif
362
363 // convert long to a big-endian slice key
GetStringFromInt(int64_t val)364 extern inline std::string GetStringFromInt(int64_t val) {
365 std::string little_endian_key;
366 std::string big_endian_key;
367 PutFixed64(&little_endian_key, val);
368 assert(little_endian_key.size() == sizeof(val));
369 big_endian_key.resize(sizeof(val));
370 for (size_t i = 0; i < sizeof(val); ++i) {
371 big_endian_key[i] = little_endian_key[sizeof(val) - 1 - i];
372 }
373 return big_endian_key;
374 }
375
376 // A struct for maintaining the parameters for generating variable length keys
377 struct KeyGenContext {
378 // Number of adjacent keys in one cycle of key lengths
379 uint64_t window;
380 // Number of keys of each possible length in a given window
381 std::vector<uint64_t> weights;
382 };
383 extern KeyGenContext key_gen_ctx;
384
385 // Generate a variable length key string from the given int64 val. The
386 // order of the keys is preserved. The key could be anywhere from 8 to
387 // max_key_len * 8 bytes.
388 // The algorithm picks the length based on the
389 // offset of the val within a configured window and the distribution of the
390 // number of keys of various lengths in that window. For example, if x, y, x are
391 // the weights assigned to each possible key length, the keys generated would be
392 // - {0}...{x-1}
393 // {(x-1),0}..{(x-1),(y-1)},{(x-1),(y-1),0}..{(x-1),(y-1),(z-1)} and so on.
394 // Additionally, a trailer of 0-7 bytes could be appended.
Key(int64_t val)395 extern inline std::string Key(int64_t val) {
396 uint64_t window = key_gen_ctx.window;
397 size_t levels = key_gen_ctx.weights.size();
398 std::string key;
399
400 for (size_t level = 0; level < levels; ++level) {
401 uint64_t weight = key_gen_ctx.weights[level];
402 uint64_t offset = static_cast<uint64_t>(val) % window;
403 uint64_t mult = static_cast<uint64_t>(val) / window;
404 uint64_t pfx = mult * weight + (offset >= weight ? weight - 1 : offset);
405 key.append(GetStringFromInt(pfx));
406 if (offset < weight) {
407 // Use the bottom 3 bits of offset as the number of trailing 'x's in the
408 // key. If the next key is going to be of the next level, then skip the
409 // trailer as it would break ordering. If the key length is already at max,
410 // skip the trailer.
411 if (offset < weight - 1 && level < levels - 1) {
412 size_t trailer_len = offset & 0x7;
413 key.append(trailer_len, 'x');
414 }
415 break;
416 }
417 val = offset - weight;
418 window -= weight;
419 }
420
421 return key;
422 }
423
424 // Given a string key, map it to an index into the expected values buffer
GetIntVal(std::string big_endian_key,uint64_t * key_p)425 extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) {
426 size_t size_key = big_endian_key.size();
427 std::vector<uint64_t> prefixes;
428
429 assert(size_key <= key_gen_ctx.weights.size() * sizeof(uint64_t));
430
431 // Pad with zeros to make it a multiple of 8. This function may be called
432 // with a prefix, in which case we return the first index that falls
433 // inside or outside that prefix, dependeing on whether the prefix is
434 // the start of upper bound of a scan
435 unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t));
436 if (pad < sizeof(uint64_t)) {
437 big_endian_key.append(pad, '\0');
438 size_key += pad;
439 }
440
441 std::string little_endian_key;
442 little_endian_key.resize(size_key);
443 for (size_t start = 0; start < size_key; start += sizeof(uint64_t)) {
444 size_t end = start + sizeof(uint64_t);
445 for (size_t i = 0; i < sizeof(uint64_t); ++i) {
446 little_endian_key[start + i] = big_endian_key[end - 1 - i];
447 }
448 Slice little_endian_slice =
449 Slice(&little_endian_key[start], sizeof(uint64_t));
450 uint64_t pfx;
451 if (!GetFixed64(&little_endian_slice, &pfx)) {
452 return false;
453 }
454 prefixes.emplace_back(pfx);
455 }
456
457 uint64_t key = 0;
458 for (size_t i = 0; i < prefixes.size(); ++i) {
459 uint64_t pfx = prefixes[i];
460 key += (pfx / key_gen_ctx.weights[i]) * key_gen_ctx.window +
461 pfx % key_gen_ctx.weights[i];
462 }
463 *key_p = key;
464 return true;
465 }
466
GetPrefixKeyCount(const std::string & prefix,const std::string & ub)467 extern inline uint64_t GetPrefixKeyCount(const std::string& prefix,
468 const std::string& ub) {
469 uint64_t start = 0;
470 uint64_t end = 0;
471
472 if (!GetIntVal(prefix, &start) || !GetIntVal(ub, &end)) {
473 return 0;
474 }
475
476 return end - start;
477 }
478
StringToHex(const std::string & str)479 extern inline std::string StringToHex(const std::string& str) {
480 std::string result = "0x";
481 result.append(Slice(str).ToString(true));
482 return result;
483 }
484
485 // Unified output format for double parameters
FormatDoubleParam(double param)486 extern inline std::string FormatDoubleParam(double param) {
487 return std::to_string(param);
488 }
489
490 // Make sure that double parameter is a value we can reproduce by
491 // re-inputting the value printed.
SanitizeDoubleParam(double * param)492 extern inline void SanitizeDoubleParam(double* param) {
493 *param = std::atof(FormatDoubleParam(*param).c_str());
494 }
495
496 extern void PoolSizeChangeThread(void* v);
497
498 extern void DbVerificationThread(void* v);
499
500 extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz);
501
502 extern int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration);
503
504 extern std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
505 uint64_t iteration);
506
507 extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz);
508
509 extern StressTest* CreateCfConsistencyStressTest();
510 extern StressTest* CreateBatchedOpsStressTest();
511 extern StressTest* CreateNonBatchedOpsStressTest();
512 extern void InitializeHotKeyGenerator(double alpha);
513 extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);
514 } // namespace ROCKSDB_NAMESPACE
515 #endif // GFLAGS
516