1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // The test uses an array to compare against values written to the database.
11 // Keys written to the array are in 1:1 correspondence to the actual values in
12 // the database according to the formula in the function GenerateValue.
13 
14 // Space is reserved in the array from 0 to FLAGS_max_key and values are
15 // randomly written/deleted/read from those positions. During verification we
16 // compare all the positions in the array. To shorten/elongate the running
17 // time, you could change the settings: FLAGS_max_key, FLAGS_ops_per_thread,
18 // (sometimes also FLAGS_threads).
19 //
20 // NOTE that if FLAGS_test_batches_snapshots is set, the test will have
21 // different behavior. See comment of the flag for details.
22 
23 #ifdef GFLAGS
24 #pragma once
25 #include <fcntl.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <sys/types.h>
29 #include <algorithm>
30 #include <array>
31 #include <chrono>
32 #include <cinttypes>
33 #include <exception>
34 #include <queue>
35 #include <thread>
36 
37 #include "db/db_impl/db_impl.h"
38 #include "db/version_set.h"
39 #include "db_stress_tool/db_stress_env_wrapper.h"
40 #include "db_stress_tool/db_stress_listener.h"
41 #include "db_stress_tool/db_stress_shared_state.h"
42 #include "db_stress_tool/db_stress_test_base.h"
43 #include "hdfs/env_hdfs.h"
44 #include "logging/logging.h"
45 #include "monitoring/histogram.h"
46 #include "options/options_helper.h"
47 #include "port/port.h"
48 #include "rocksdb/cache.h"
49 #include "rocksdb/env.h"
50 #include "rocksdb/slice.h"
51 #include "rocksdb/slice_transform.h"
52 #include "rocksdb/statistics.h"
53 #include "rocksdb/utilities/backupable_db.h"
54 #include "rocksdb/utilities/checkpoint.h"
55 #include "rocksdb/utilities/db_ttl.h"
56 #include "rocksdb/utilities/debug.h"
57 #include "rocksdb/utilities/options_util.h"
58 #include "rocksdb/utilities/transaction.h"
59 #include "rocksdb/utilities/transaction_db.h"
60 #include "rocksdb/write_batch.h"
61 #include "util/coding.h"
62 #include "util/compression.h"
63 #include "util/crc32c.h"
64 #include "util/gflags_compat.h"
65 #include "util/mutexlock.h"
66 #include "util/random.h"
67 #include "util/string_util.h"
68 #include "utilities/blob_db/blob_db.h"
// SyncPoint is not supported in Windows release builds.
70 #if !(defined NDEBUG) || !defined(OS_WIN)
71 #include "test_util/sync_point.h"
72 #endif  // !(defined NDEBUG) || !defined(OS_WIN)
73 #include "test_util/testutil.h"
74 
75 #include "utilities/merge_operators.h"
76 
77 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
78 using GFLAGS_NAMESPACE::RegisterFlagValidator;
79 using GFLAGS_NAMESPACE::SetUsageMessage;
80 
81 DECLARE_uint64(seed);
82 DECLARE_bool(read_only);
83 DECLARE_int64(max_key);
84 DECLARE_double(hot_key_alpha);
85 DECLARE_int32(max_key_len);
86 DECLARE_string(key_len_percent_dist);
87 DECLARE_int32(key_window_scale_factor);
88 DECLARE_int32(column_families);
89 DECLARE_string(options_file);
90 DECLARE_int64(active_width);
91 DECLARE_bool(test_batches_snapshots);
92 DECLARE_bool(atomic_flush);
93 DECLARE_bool(test_cf_consistency);
94 DECLARE_int32(threads);
95 DECLARE_int32(ttl);
96 DECLARE_int32(value_size_mult);
97 DECLARE_int32(compaction_readahead_size);
98 DECLARE_bool(enable_pipelined_write);
99 DECLARE_bool(verify_before_write);
100 DECLARE_bool(histogram);
101 DECLARE_bool(destroy_db_initially);
102 DECLARE_bool(verbose);
103 DECLARE_bool(progress_reports);
104 DECLARE_uint64(db_write_buffer_size);
105 DECLARE_int32(write_buffer_size);
106 DECLARE_int32(max_write_buffer_number);
107 DECLARE_int32(min_write_buffer_number_to_merge);
108 DECLARE_int32(max_write_buffer_number_to_maintain);
109 DECLARE_int64(max_write_buffer_size_to_maintain);
110 DECLARE_double(memtable_prefix_bloom_size_ratio);
111 DECLARE_bool(memtable_whole_key_filtering);
112 DECLARE_int32(open_files);
113 DECLARE_int64(compressed_cache_size);
114 DECLARE_int32(compaction_style);
115 DECLARE_int32(level0_file_num_compaction_trigger);
116 DECLARE_int32(level0_slowdown_writes_trigger);
117 DECLARE_int32(level0_stop_writes_trigger);
118 DECLARE_int32(block_size);
119 DECLARE_int32(format_version);
120 DECLARE_int32(index_block_restart_interval);
121 DECLARE_int32(max_background_compactions);
122 DECLARE_int32(num_bottom_pri_threads);
123 DECLARE_int32(compaction_thread_pool_adjust_interval);
124 DECLARE_int32(compaction_thread_pool_variations);
125 DECLARE_int32(max_background_flushes);
126 DECLARE_int32(universal_size_ratio);
127 DECLARE_int32(universal_min_merge_width);
128 DECLARE_int32(universal_max_merge_width);
129 DECLARE_int32(universal_max_size_amplification_percent);
130 DECLARE_int32(clear_column_family_one_in);
131 DECLARE_int32(get_live_files_one_in);
132 DECLARE_int32(get_sorted_wal_files_one_in);
133 DECLARE_int32(get_current_wal_file_one_in);
134 DECLARE_int32(set_options_one_in);
135 DECLARE_int32(set_in_place_one_in);
136 DECLARE_int64(cache_size);
137 DECLARE_bool(cache_index_and_filter_blocks);
138 DECLARE_bool(use_clock_cache);
139 DECLARE_uint64(subcompactions);
140 DECLARE_uint64(periodic_compaction_seconds);
141 DECLARE_uint64(compaction_ttl);
142 DECLARE_bool(allow_concurrent_memtable_write);
143 DECLARE_bool(enable_write_thread_adaptive_yield);
144 DECLARE_int32(reopen);
145 DECLARE_double(bloom_bits);
146 DECLARE_bool(use_block_based_filter);
147 DECLARE_bool(partition_filters);
148 DECLARE_int32(index_type);
149 DECLARE_string(db);
150 DECLARE_string(secondaries_base);
151 DECLARE_bool(test_secondary);
152 DECLARE_string(expected_values_path);
153 DECLARE_bool(verify_checksum);
154 DECLARE_bool(mmap_read);
155 DECLARE_bool(mmap_write);
156 DECLARE_bool(use_direct_reads);
157 DECLARE_bool(use_direct_io_for_flush_and_compaction);
158 DECLARE_bool(statistics);
159 DECLARE_bool(sync);
160 DECLARE_bool(use_fsync);
161 DECLARE_int32(kill_random_test);
162 DECLARE_string(kill_prefix_blacklist);
163 DECLARE_bool(disable_wal);
164 DECLARE_uint64(recycle_log_file_num);
165 DECLARE_int64(target_file_size_base);
166 DECLARE_int32(target_file_size_multiplier);
167 DECLARE_uint64(max_bytes_for_level_base);
168 DECLARE_double(max_bytes_for_level_multiplier);
169 DECLARE_int32(range_deletion_width);
170 DECLARE_uint64(rate_limiter_bytes_per_sec);
171 DECLARE_bool(rate_limit_bg_reads);
172 DECLARE_uint64(sst_file_manager_bytes_per_sec);
173 DECLARE_uint64(sst_file_manager_bytes_per_truncate);
174 DECLARE_bool(use_txn);
175 DECLARE_uint64(txn_write_policy);
176 DECLARE_bool(unordered_write);
177 DECLARE_int32(backup_one_in);
178 DECLARE_int32(checkpoint_one_in);
179 DECLARE_int32(ingest_external_file_one_in);
180 DECLARE_int32(ingest_external_file_width);
181 DECLARE_int32(compact_files_one_in);
182 DECLARE_int32(compact_range_one_in);
183 DECLARE_int32(flush_one_in);
184 DECLARE_int32(pause_background_one_in);
185 DECLARE_int32(compact_range_width);
186 DECLARE_int32(acquire_snapshot_one_in);
187 DECLARE_bool(compare_full_db_state_snapshot);
188 DECLARE_uint64(snapshot_hold_ops);
189 DECLARE_bool(long_running_snapshots);
190 DECLARE_bool(use_multiget);
191 DECLARE_int32(readpercent);
192 DECLARE_int32(prefixpercent);
193 DECLARE_int32(writepercent);
194 DECLARE_int32(delpercent);
195 DECLARE_int32(delrangepercent);
196 DECLARE_int32(nooverwritepercent);
197 DECLARE_int32(iterpercent);
198 DECLARE_uint64(num_iterations);
199 DECLARE_string(compression_type);
200 DECLARE_string(bottommost_compression_type);
201 DECLARE_int32(compression_max_dict_bytes);
202 DECLARE_int32(compression_zstd_max_train_bytes);
203 DECLARE_string(checksum_type);
204 DECLARE_string(hdfs);
205 DECLARE_string(env_uri);
206 DECLARE_uint64(ops_per_thread);
207 DECLARE_uint64(log2_keys_per_lock);
208 DECLARE_uint64(max_manifest_file_size);
209 DECLARE_bool(in_place_update);
210 DECLARE_int32(secondary_catch_up_one_in);
211 DECLARE_string(memtablerep);
212 DECLARE_int32(prefix_size);
213 DECLARE_bool(use_merge);
214 DECLARE_bool(use_full_merge_v1);
215 DECLARE_int32(sync_wal_one_in);
216 DECLARE_bool(avoid_unnecessary_blocking_io);
217 DECLARE_bool(write_dbid_to_manifest);
218 DECLARE_uint64(max_write_batch_group_size_bytes);
219 DECLARE_bool(level_compaction_dynamic_level_bytes);
220 DECLARE_int32(verify_checksum_one_in);
221 DECLARE_int32(verify_db_one_in);
222 DECLARE_int32(continuous_verification_interval);
223 
224 #ifndef ROCKSDB_LITE
225 DECLARE_bool(use_blob_db);
226 DECLARE_uint64(blob_db_min_blob_size);
227 DECLARE_uint64(blob_db_bytes_per_sync);
228 DECLARE_uint64(blob_db_file_size);
229 DECLARE_bool(blob_db_enable_gc);
230 DECLARE_double(blob_db_gc_cutoff);
231 #endif  // !ROCKSDB_LITE
232 DECLARE_int32(approximate_size_one_in);
233 
234 const long KB = 1024;
235 const int kRandomValueMaxFactor = 3;
236 const int kValueMaxLen = 100;
237 
238 // wrapped posix or hdfs environment
239 extern ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env;
240 
241 extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e;
242 extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e;
243 extern enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e;
244 
enum RepFactory { kSkipList, kHashSkipList, kVectorRep };

// Translates a memtable representation name into its RepFactory value.
//
// The comparison ignores case. Recognized names are "skip_list",
// "prefix_hash" and "vector". Any other name is reported on stdout and
// treated as kSkipList.
//
// `ctype` must not be null.
inline enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  struct NamedRep {
    const char* name;
    enum RepFactory value;
  };
  const NamedRep kKnownReps[] = {
      {"skip_list", kSkipList},
      {"prefix_hash", kHashSkipList},
      {"vector", kVectorRep},
  };

  for (const NamedRep& candidate : kKnownReps) {
    if (strcasecmp(ctype, candidate.name) == 0) {
      return candidate.value;
    }
  }

  // Unknown name: warn and fall back to the skip list.
  fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}
260 
261 extern enum RepFactory FLAGS_rep_factory;
262 
263 namespace ROCKSDB_NAMESPACE {
StringToCompressionType(const char * ctype)264 inline enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
265     const char* ctype) {
266   assert(ctype);
267 
268   ROCKSDB_NAMESPACE::CompressionType ret_compression_type;
269 
270   if (!strcasecmp(ctype, "disable")) {
271     ret_compression_type = ROCKSDB_NAMESPACE::kDisableCompressionOption;
272   } else if (!strcasecmp(ctype, "none")) {
273     ret_compression_type = ROCKSDB_NAMESPACE::kNoCompression;
274   } else if (!strcasecmp(ctype, "snappy")) {
275     ret_compression_type = ROCKSDB_NAMESPACE::kSnappyCompression;
276   } else if (!strcasecmp(ctype, "zlib")) {
277     ret_compression_type = ROCKSDB_NAMESPACE::kZlibCompression;
278   } else if (!strcasecmp(ctype, "bzip2")) {
279     ret_compression_type = ROCKSDB_NAMESPACE::kBZip2Compression;
280   } else if (!strcasecmp(ctype, "lz4")) {
281     ret_compression_type = ROCKSDB_NAMESPACE::kLZ4Compression;
282   } else if (!strcasecmp(ctype, "lz4hc")) {
283     ret_compression_type = ROCKSDB_NAMESPACE::kLZ4HCCompression;
284   } else if (!strcasecmp(ctype, "xpress")) {
285     ret_compression_type = ROCKSDB_NAMESPACE::kXpressCompression;
286   } else if (!strcasecmp(ctype, "zstd")) {
287     ret_compression_type = ROCKSDB_NAMESPACE::kZSTD;
288   } else {
289     fprintf(stderr, "Cannot parse compression type '%s'\n", ctype);
290     ret_compression_type =
291         ROCKSDB_NAMESPACE::kSnappyCompression;  // default value
292   }
293   if (ret_compression_type != ROCKSDB_NAMESPACE::kDisableCompressionOption &&
294       !CompressionTypeSupported(ret_compression_type)) {
295     // Use no compression will be more portable but considering this is
296     // only a stress test and snappy is widely available. Use snappy here.
297     ret_compression_type = ROCKSDB_NAMESPACE::kSnappyCompression;
298   }
299   return ret_compression_type;
300 }
301 
StringToChecksumType(const char * ctype)302 inline enum ROCKSDB_NAMESPACE::ChecksumType StringToChecksumType(
303     const char* ctype) {
304   assert(ctype);
305   auto iter = ROCKSDB_NAMESPACE::checksum_type_string_map.find(ctype);
306   if (iter != ROCKSDB_NAMESPACE::checksum_type_string_map.end()) {
307     return iter->second;
308   }
309   fprintf(stderr, "Cannot parse checksum type '%s'\n", ctype);
310   return ROCKSDB_NAMESPACE::kCRC32c;
311 }
312 
ChecksumTypeToString(ROCKSDB_NAMESPACE::ChecksumType ctype)313 inline std::string ChecksumTypeToString(ROCKSDB_NAMESPACE::ChecksumType ctype) {
314   auto iter = std::find_if(
315       ROCKSDB_NAMESPACE::checksum_type_string_map.begin(),
316       ROCKSDB_NAMESPACE::checksum_type_string_map.end(),
317       [&](const std::pair<std::string, ROCKSDB_NAMESPACE::ChecksumType>&
318               name_and_enum_val) { return name_and_enum_val.second == ctype; });
319   assert(iter != ROCKSDB_NAMESPACE::checksum_type_string_map.end());
320   return iter->first;
321 }
322 
// Splits `src` at every ',' and returns the pieces in order.
//
// An empty `src` yields an empty vector. Otherwise the result always has
// one more element than there are commas, so leading, trailing and adjacent
// commas produce empty strings.
inline std::vector<std::string> SplitString(std::string src) {
  std::vector<std::string> pieces;
  if (src.empty()) {
    return pieces;
  }

  size_t piece_begin = 0;
  for (;;) {
    const size_t comma = src.find(',', piece_begin);
    if (comma == std::string::npos) {
      // Whatever follows the last comma (possibly nothing) is the final piece.
      pieces.push_back(src.substr(piece_begin));
      return pieces;
    }
    pieces.push_back(src.substr(piece_begin, comma - piece_begin));
    piece_begin = comma + 1;
  }
}
337 
338 #ifdef _MSC_VER
339 #pragma warning(push)
340 // truncation of constant value on static_cast
341 #pragma warning(disable : 4309)
342 #endif
GetNextPrefix(const ROCKSDB_NAMESPACE::Slice & src,std::string * v)343 inline bool GetNextPrefix(const ROCKSDB_NAMESPACE::Slice& src, std::string* v) {
344   std::string ret = src.ToString();
345   for (int i = static_cast<int>(ret.size()) - 1; i >= 0; i--) {
346     if (ret[i] != static_cast<char>(255)) {
347       ret[i] = ret[i] + 1;
348       break;
349     } else if (i != 0) {
350       ret[i] = 0;
351     } else {
352       // all FF. No next prefix
353       return false;
354     }
355   }
356   *v = ret;
357   return true;
358 }
359 #ifdef _MSC_VER
360 #pragma warning(pop)
361 #endif
362 
363 // convert long to a big-endian slice key
GetStringFromInt(int64_t val)364 extern inline std::string GetStringFromInt(int64_t val) {
365   std::string little_endian_key;
366   std::string big_endian_key;
367   PutFixed64(&little_endian_key, val);
368   assert(little_endian_key.size() == sizeof(val));
369   big_endian_key.resize(sizeof(val));
370   for (size_t i = 0; i < sizeof(val); ++i) {
371     big_endian_key[i] = little_endian_key[sizeof(val) - 1 - i];
372   }
373   return big_endian_key;
374 }
375 
// Parameters that control how Key() generates variable-length keys and how
// GetIntVal() maps such keys back to indexes. Both functions read the shared
// instance `key_gen_ctx`.
struct KeyGenContext {
  // Number of adjacent keys in one cycle of key lengths. Key() splits its
  // argument into a quotient and a remainder with respect to this value.
  uint64_t window;
  // Number of keys of each possible length in a given window. Key() consumes
  // the entries in index order, one per 8-byte key component, and
  // GetIntVal() asserts that a key is no longer than
  // weights.size() * sizeof(uint64_t) bytes.
  std::vector<uint64_t> weights;
};
383 extern KeyGenContext key_gen_ctx;
384 
385 // Generate a variable length key string from the given int64 val. The
386 // order of the keys is preserved. The key could be anywhere from 8 to
387 // max_key_len * 8 bytes.
388 // The algorithm picks the length based on the
389 // offset of the val within a configured window and the distribution of the
390 // number of keys of various lengths in that window. For example, if x, y, x are
391 // the weights assigned to each possible key length, the keys generated would be
392 // - {0}...{x-1}
393 // {(x-1),0}..{(x-1),(y-1)},{(x-1),(y-1),0}..{(x-1),(y-1),(z-1)} and so on.
394 // Additionally, a trailer of 0-7 bytes could be appended.
Key(int64_t val)395 extern inline std::string Key(int64_t val) {
396   uint64_t window = key_gen_ctx.window;
397   size_t levels = key_gen_ctx.weights.size();
398   std::string key;
399 
400   for (size_t level = 0; level < levels; ++level) {
401     uint64_t weight = key_gen_ctx.weights[level];
402     uint64_t offset = static_cast<uint64_t>(val) % window;
403     uint64_t mult = static_cast<uint64_t>(val) / window;
404     uint64_t pfx = mult * weight + (offset >= weight ? weight - 1 : offset);
405     key.append(GetStringFromInt(pfx));
406     if (offset < weight) {
407       // Use the bottom 3 bits of offset as the number of trailing 'x's in the
408       // key. If the next key is going to be of the next level, then skip the
409       // trailer as it would break ordering. If the key length is already at max,
410       // skip the trailer.
411       if (offset < weight - 1 && level < levels - 1) {
412         size_t trailer_len = offset & 0x7;
413         key.append(trailer_len, 'x');
414       }
415       break;
416     }
417     val = offset - weight;
418     window -= weight;
419   }
420 
421   return key;
422 }
423 
424 // Given a string key, map it to an index into the expected values buffer
GetIntVal(std::string big_endian_key,uint64_t * key_p)425 extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) {
426   size_t size_key = big_endian_key.size();
427   std::vector<uint64_t> prefixes;
428 
429   assert(size_key <= key_gen_ctx.weights.size() * sizeof(uint64_t));
430 
431   // Pad with zeros to make it a multiple of 8. This function may be called
432   // with a prefix, in which case we return the first index that falls
433   // inside or outside that prefix, dependeing on whether the prefix is
434   // the start of upper bound of a scan
435   unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t));
436   if (pad < sizeof(uint64_t)) {
437     big_endian_key.append(pad, '\0');
438     size_key += pad;
439   }
440 
441   std::string little_endian_key;
442   little_endian_key.resize(size_key);
443   for (size_t start = 0; start < size_key; start += sizeof(uint64_t)) {
444     size_t end = start + sizeof(uint64_t);
445     for (size_t i = 0; i < sizeof(uint64_t); ++i) {
446       little_endian_key[start + i] = big_endian_key[end - 1 - i];
447     }
448     Slice little_endian_slice =
449         Slice(&little_endian_key[start], sizeof(uint64_t));
450     uint64_t pfx;
451     if (!GetFixed64(&little_endian_slice, &pfx)) {
452       return false;
453     }
454     prefixes.emplace_back(pfx);
455   }
456 
457   uint64_t key = 0;
458   for (size_t i = 0; i < prefixes.size(); ++i) {
459     uint64_t pfx = prefixes[i];
460     key += (pfx / key_gen_ctx.weights[i]) * key_gen_ctx.window +
461            pfx % key_gen_ctx.weights[i];
462   }
463   *key_p = key;
464   return true;
465 }
466 
GetPrefixKeyCount(const std::string & prefix,const std::string & ub)467 extern inline uint64_t GetPrefixKeyCount(const std::string& prefix,
468                                          const std::string& ub) {
469   uint64_t start = 0;
470   uint64_t end = 0;
471 
472   if (!GetIntVal(prefix, &start) || !GetIntVal(ub, &end)) {
473     return 0;
474   }
475 
476   return end - start;
477 }
478 
StringToHex(const std::string & str)479 extern inline std::string StringToHex(const std::string& str) {
480   std::string result = "0x";
481   result.append(Slice(str).ToString(true));
482   return result;
483 }
484 
// Unified output format for double parameters: the text that
// std::to_string produces for `param`.
extern inline std::string FormatDoubleParam(double param) {
  const std::string formatted = std::to_string(param);
  return formatted;
}
489 
490 // Make sure that double parameter is a value we can reproduce by
491 // re-inputting the value printed.
SanitizeDoubleParam(double * param)492 extern inline void SanitizeDoubleParam(double* param) {
493   *param = std::atof(FormatDoubleParam(*param).c_str());
494 }
495 
496 extern void PoolSizeChangeThread(void* v);
497 
498 extern void DbVerificationThread(void* v);
499 
500 extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz);
501 
502 extern int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration);
503 
504 extern std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
505                                           uint64_t iteration);
506 
507 extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz);
508 
509 extern StressTest* CreateCfConsistencyStressTest();
510 extern StressTest* CreateBatchedOpsStressTest();
511 extern StressTest* CreateNonBatchedOpsStressTest();
512 extern void InitializeHotKeyGenerator(double alpha);
513 extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);
514 }  // namespace ROCKSDB_NAMESPACE
515 #endif  // GFLAGS
516