1#!/usr/bin/env python 2# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. 3from __future__ import absolute_import, division, print_function, unicode_literals 4 5import os 6import sys 7import time 8import random 9import tempfile 10import subprocess 11import shutil 12import argparse 13 14# params overwrite priority: 15# for default: 16# default_params < {blackbox,whitebox}_default_params < args 17# for simple: 18# default_params < {blackbox,whitebox}_default_params < 19# simple_default_params < 20# {blackbox,whitebox}_simple_default_params < args 21# for cf_consistency: 22# default_params < {blackbox,whitebox}_default_params < 23# cf_consistency_params < args 24# for txn: 25# default_params < {blackbox,whitebox}_default_params < txn_params < args 26 27expected_values_file = tempfile.NamedTemporaryFile() 28 29default_params = { 30 "acquire_snapshot_one_in": 10000, 31 "block_size": 16384, 32 "bloom_bits": lambda: random.choice([random.randint(0,19), 33 random.lognormvariate(2.3, 1.3)]), 34 "cache_index_and_filter_blocks": lambda: random.randint(0, 1), 35 "cache_size": 1048576, 36 "checkpoint_one_in": 1000000, 37 "compression_type": lambda: random.choice( 38 ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress", "zstd"]), 39 "bottommost_compression_type": lambda: 40 "disable" if random.randint(0, 1) == 0 else 41 random.choice( 42 ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress", 43 "zstd"]), 44 "checksum_type" : lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64"]), 45 "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1), 46 "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1), 47 "clear_column_family_one_in": 0, 48 "compact_files_one_in": 1000000, 49 "compact_range_one_in": 1000000, 50 "delpercent": 4, 51 "delrangepercent": 1, 52 "destroy_db_initially": 0, 53 "enable_pipelined_write": lambda: random.randint(0, 1), 54 "expected_values_path": expected_values_file.name, 55 "flush_one_in": 
1000000, 56 "get_live_files_one_in": 1000000, 57 # Note: the following two are intentionally disabled as the corresponding 58 # APIs are not guaranteed to succeed. 59 "get_sorted_wal_files_one_in": 0, 60 "get_current_wal_file_one_in": 0, 61 # Temporarily disable hash index 62 "index_type": lambda: random.choice([0,2]), 63 "max_background_compactions": 20, 64 "max_bytes_for_level_base": 10485760, 65 "max_key": 100000000, 66 "max_write_buffer_number": 3, 67 "mmap_read": lambda: random.randint(0, 1), 68 "nooverwritepercent": 1, 69 "open_files": lambda : random.choice([-1, 500000]), 70 "partition_filters": lambda: random.randint(0, 1), 71 "pause_background_one_in": 1000000, 72 "prefixpercent": 5, 73 "progress_reports": 0, 74 "readpercent": 45, 75 "recycle_log_file_num": lambda: random.randint(0, 1), 76 "reopen": 20, 77 "snapshot_hold_ops": 100000, 78 "long_running_snapshots": lambda: random.randint(0, 1), 79 "subcompactions": lambda: random.randint(1, 4), 80 "target_file_size_base": 2097152, 81 "target_file_size_multiplier": 2, 82 "use_direct_reads": lambda: random.randint(0, 1), 83 "use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1), 84 "use_full_merge_v1": lambda: random.randint(0, 1), 85 "use_merge": lambda: random.randint(0, 1), 86 "verify_checksum": 1, 87 "write_buffer_size": 4 * 1024 * 1024, 88 "writepercent": 35, 89 "format_version": lambda: random.choice([2, 3, 4, 5, 5]), 90 "index_block_restart_interval": lambda: random.choice(range(1, 16)), 91 "use_multiget" : lambda: random.randint(0, 1), 92 "periodic_compaction_seconds" : 93 lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]), 94 "compaction_ttl" : lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]), 95 # Test small max_manifest_file_size in a smaller chance, as most of the 96 # time we wnat manifest history to be preserved to help debug 97 "max_manifest_file_size" : lambda : random.choice( 98 [t * 16384 if t < 3 else 1024 * 1024 * 1024 for t in range(1, 30)]), 99 # Sync mode might make 
test runs slower so running it in a smaller chance 100 "sync" : lambda : random.choice( 101 [1 if t == 0 else 0 for t in range(0, 20)]), 102 # Disable compation_readahead_size because the test is not passing. 103 #"compaction_readahead_size" : lambda : random.choice( 104 # [0, 0, 1024 * 1024]), 105 "db_write_buffer_size" : lambda: random.choice( 106 [0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]), 107 "avoid_unnecessary_blocking_io" : random.randint(0, 1), 108 "write_dbid_to_manifest" : random.randint(0, 1), 109 "max_write_batch_group_size_bytes" : lambda: random.choice( 110 [16, 64, 1024 * 1024, 16 * 1024 * 1024]), 111 "level_compaction_dynamic_level_bytes" : True, 112 "verify_checksum_one_in": 1000000, 113 "verify_db_one_in": 100000, 114 "continuous_verification_interval" : 0, 115 "max_key_len": 3, 116 "key_len_percent_dist": "1,30,69" 117} 118 119_TEST_DIR_ENV_VAR = 'TEST_TMPDIR' 120 121 122def get_dbname(test_name): 123 test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR) 124 if test_tmpdir is None or test_tmpdir == "": 125 dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name) 126 else: 127 dbname = test_tmpdir + "/rocksdb_crashtest_" + test_name 128 shutil.rmtree(dbname, True) 129 os.mkdir(dbname) 130 return dbname 131 132 133def is_direct_io_supported(dbname): 134 with tempfile.NamedTemporaryFile(dir=dbname) as f: 135 try: 136 os.open(f.name, os.O_DIRECT) 137 except BaseException: 138 return False 139 return True 140 141 142blackbox_default_params = { 143 # total time for this script to test db_stress 144 "duration": 6000, 145 # time for one db_stress instance to run 146 "interval": 120, 147 # since we will be killing anyway, use large value for ops_per_thread 148 "ops_per_thread": 100000000, 149 "set_options_one_in": 10000, 150 "test_batches_snapshots": 1, 151} 152 153whitebox_default_params = { 154 "duration": 10000, 155 "log2_keys_per_lock": 10, 156 "ops_per_thread": 200000, 157 "random_kill_odd": 888887, 158 "test_batches_snapshots": 
lambda: random.randint(0, 1), 159} 160 161simple_default_params = { 162 "allow_concurrent_memtable_write": lambda: random.randint(0, 1), 163 "column_families": 1, 164 "max_background_compactions": 1, 165 "max_bytes_for_level_base": 67108864, 166 "memtablerep": "skip_list", 167 "prefixpercent": 0, 168 "readpercent": 50, 169 "prefix_size" : -1, 170 "target_file_size_base": 16777216, 171 "target_file_size_multiplier": 1, 172 "test_batches_snapshots": 0, 173 "write_buffer_size": 32 * 1024 * 1024, 174 "level_compaction_dynamic_level_bytes": False, 175} 176 177blackbox_simple_default_params = { 178 "open_files": -1, 179 "set_options_one_in": 0, 180} 181 182whitebox_simple_default_params = {} 183 184cf_consistency_params = { 185 "disable_wal": lambda: random.randint(0, 1), 186 "reopen": 0, 187 "test_cf_consistency": 1, 188 # use small value for write_buffer_size so that RocksDB triggers flush 189 # more frequently 190 "write_buffer_size": 1024 * 1024, 191 "enable_pipelined_write": lambda: random.randint(0, 1), 192} 193 194txn_params = { 195 "use_txn" : 1, 196 # Avoid lambda to set it once for the entire test 197 "txn_write_policy": random.randint(0, 2), 198 "unordered_write": random.randint(0, 1), 199 "disable_wal": 0, 200 # OpenReadOnly after checkpoint is not currnetly compatible with WritePrepared txns 201 "checkpoint_one_in": 0, 202 # pipeline write is not currnetly compatible with WritePrepared txns 203 "enable_pipelined_write": 0, 204} 205 206def finalize_and_sanitize(src_params): 207 dest_params = dict([(k, v() if callable(v) else v) 208 for (k, v) in src_params.items()]) 209 if dest_params.get("compression_type") != "zstd" or \ 210 dest_params.get("compression_max_dict_bytes") == 0: 211 dest_params["compression_zstd_max_train_bytes"] = 0 212 if dest_params.get("allow_concurrent_memtable_write", 1) == 1: 213 dest_params["memtablerep"] = "skip_list" 214 if dest_params["mmap_read"] == 1 or not is_direct_io_supported( 215 dest_params["db"]): 216 
dest_params["use_direct_io_for_flush_and_compaction"] = 0 217 dest_params["use_direct_reads"] = 0 218 # DeleteRange is not currnetly compatible with Txns 219 if dest_params.get("test_batches_snapshots") == 1 or \ 220 dest_params.get("use_txn") == 1: 221 dest_params["delpercent"] += dest_params["delrangepercent"] 222 dest_params["delrangepercent"] = 0 223 # Only under WritePrepared txns, unordered_write would provide the same guarnatees as vanilla rocksdb 224 if dest_params.get("unordered_write", 0) == 1: 225 dest_params["txn_write_policy"] = 1 226 dest_params["allow_concurrent_memtable_write"] = 1 227 if dest_params.get("disable_wal", 0) == 1: 228 dest_params["atomic_flush"] = 1 229 dest_params["sync"] = 0 230 if dest_params.get("open_files", 1) != -1: 231 # Compaction TTL and periodic compactions are only compatible 232 # with open_files = -1 233 dest_params["compaction_ttl"] = 0 234 dest_params["periodic_compaction_seconds"] = 0 235 if dest_params.get("compaction_style", 0) == 2: 236 # Disable compaction TTL in FIFO compaction, because right 237 # now assertion failures are triggered. 238 dest_params["compaction_ttl"] = 0 239 dest_params["periodic_compaction_seconds"] = 0 240 if dest_params["partition_filters"] == 1: 241 if dest_params["index_type"] != 2: 242 dest_params["partition_filters"] = 0 243 else: 244 dest_params["use_block_based_filter"] = 0 245 if dest_params.get("atomic_flush", 0) == 1: 246 # disable pipelined write when atomic flush is used. 
247 dest_params["enable_pipelined_write"] = 0 248 return dest_params 249 250def gen_cmd_params(args): 251 params = {} 252 253 params.update(default_params) 254 if args.test_type == 'blackbox': 255 params.update(blackbox_default_params) 256 if args.test_type == 'whitebox': 257 params.update(whitebox_default_params) 258 if args.simple: 259 params.update(simple_default_params) 260 if args.test_type == 'blackbox': 261 params.update(blackbox_simple_default_params) 262 if args.test_type == 'whitebox': 263 params.update(whitebox_simple_default_params) 264 if args.cf_consistency: 265 params.update(cf_consistency_params) 266 if args.txn: 267 params.update(txn_params) 268 269 for k, v in vars(args).items(): 270 if v is not None: 271 params[k] = v 272 return params 273 274 275def gen_cmd(params, unknown_params): 276 finalzied_params = finalize_and_sanitize(params) 277 cmd = ['./db_stress'] + [ 278 '--{0}={1}'.format(k, v) 279 for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)] 280 if k not in set(['test_type', 'simple', 'duration', 'interval', 281 'random_kill_odd', 'cf_consistency', 'txn']) 282 and v is not None] + unknown_params 283 return cmd 284 285 286# This script runs and kills db_stress multiple times. It checks consistency 287# in case of unsafe crashes in RocksDB. 
def blackbox_crash_main(args, unknown_args):
    """Blackbox crash test: repeatedly run db_stress and kill it mid-run.

    Each iteration launches one db_stress process against the same DB
    directory, lets it run for `interval` seconds, then SIGKILLs it; the
    next run re-opens the DB and (via db_stress itself) verifies state.
    Loops until `duration` seconds have elapsed. Exits with status 2 if a
    run emits non-WARNING output on stderr.
    """
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('blackbox')
    exit_time = time.time() + cmd_params['duration']

    print("Running blackbox-crash-test with \n"
          + "interval_between_crash=" + str(cmd_params['interval']) + "\n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    while time.time() < exit_time:
        run_had_errors = False
        killtime = time.time() + cmd_params['interval']

        # Re-generate the command every iteration so callable params draw
        # fresh random values; 'db' is merged in on top.
        cmd = gen_cmd(dict(
            list(cmd_params.items())
            + list({'db': dbname}.items())), unknown_args)

        child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
        print("Running db_stress with pid=%d: %s\n\n"
              % (child.pid, ' '.join(cmd)))

        # Poll once a second until the kill deadline; note if the child
        # exited on its own before we got to kill it.
        stop_early = False
        while time.time() < killtime:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
                stop_early = True
                break
            time.sleep(1)

        if not stop_early:
            # Re-check: the child may have exited between the last poll and
            # the deadline; only kill a still-running process.
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
            else:
                child.kill()
                print("KILLED %d\n" % child.pid)
                time.sleep(1)  # time to stabilize after a kill

        # Drain stderr; any non-WARNING line counts as a test failure.
        # (Reading stops at the first empty line / EOF.)
        while True:
            line = child.stderr.readline().strip().decode('utf-8')
            if line == '':
                break
            elif not line.startswith('WARNING'):
                run_had_errors = True
                print('stderr has error message:')
                print('***' + line + '***')

        if run_had_errors:
            sys.exit(2)

        time.sleep(1)  # time to stabilize before the next run

    # we need to clean up after ourselves -- only do this on test success
    shutil.rmtree(dbname, True)


# This python script runs db_stress multiple times. Some runs with
# kill_random_test that causes rocksdb to crash at various points in code.
def whitebox_crash_main(args, unknown_args):
    """Whitebox crash test: let db_stress crash itself via kill points.

    First half of `duration`: run with kill_random_test set, cycling
    through three kill modes of decreasing kill-point coverage but
    increasing trigger odds. Second half: cycle through check modes 1-3
    (universal compaction, FIFO compaction, plain run) without kill
    points. Exits non-zero if a run returns an unexpected code or its
    output contains 'error'/'fail'.
    """
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('whitebox')

    cur_time = time.time()
    exit_time = cur_time + cmd_params['duration']
    half_time = cur_time + cmd_params['duration'] // 2

    print("Running whitebox-crash-test with \n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    total_check_mode = 4
    check_mode = 0
    kill_random_test = cmd_params['random_kill_odd']
    kill_mode = 0

    while time.time() < exit_time:
        if check_mode == 0:
            additional_opts = {
                # use large ops per thread since we will kill it anyway
                "ops_per_thread": 100 * cmd_params['ops_per_thread'],
            }
            # run with kill_random_test, with three modes.
            # Mode 0 covers all kill points. Mode 1 covers less kill points
            # but increases chance of triggering them. Mode 2 covers even
            # less frequent kill points and further increases the triggering
            # chance.
            if kill_mode == 0:
                additional_opts.update({
                    "kill_random_test": kill_random_test,
                })
            elif kill_mode == 1:
                # With WAL disabled, kill much more aggressively (lower odds
                # value => higher kill probability).
                if cmd_params.get('disable_wal', 0) == 1:
                    my_kill_odd = kill_random_test // 50 + 1
                else:
                    my_kill_odd = kill_random_test // 10 + 1
                additional_opts.update({
                    "kill_random_test": my_kill_odd,
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    + "WritableFileWriter::WriteBuffered",
                })
            elif kill_mode == 2:
                # TODO: May need to adjust random odds if kill_random_test
                # is too small.
                additional_opts.update({
                    "kill_random_test": (kill_random_test // 5000 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    "WritableFileWriter::WriteBuffered,"
                    "PosixMmapFile::Allocate,WritableFileWriter::Flush",
                })
            # Run kill mode 0, 1 and 2 by turn.
            kill_mode = (kill_mode + 1) % 3
        elif check_mode == 1:
            # normal run with universal compaction mode
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
                "compaction_style": 1,
            }
        elif check_mode == 2:
            # normal run with FIFO compaction mode
            # ops_per_thread is divided by 5 because FIFO compaction
            # style is quite a bit slower on reads with lot of files
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'] // 5,
                "compaction_style": 2,
            }
        else:
            # normal run
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
            }

        cmd = gen_cmd(dict(list(cmd_params.items())
                           + list(additional_opts.items())
                           + list({'db': dbname}.items())), unknown_args)

        print("Running:" + ' '.join(cmd) + "\n")  # noqa: E999 T25377293 Grandfathered in

        # stderr is merged into stdout so a single stream is scanned below.
        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT)
        stdoutdata, stderrdata = popen.communicate()
        if stdoutdata:
            stdoutdata = stdoutdata.decode('utf-8')
        if stderrdata:
            stderrdata = stderrdata.decode('utf-8')
        retncode = popen.returncode
        msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
            check_mode, additional_opts['kill_random_test'], retncode))
        print(msg)
        print(stdoutdata)

        expected = False
        if additional_opts['kill_random_test'] is None and (retncode == 0):
            # we expect zero retncode if no kill option
            expected = True
        elif additional_opts['kill_random_test'] is not None and retncode <= 0:
            # When kill option is given, the test MIGHT kill itself.
            # If it does, negative retncode is expected. Otherwise 0.
            expected = True

        if not expected:
            print("TEST FAILED. See kill option and exit code above!!!\n")
            sys.exit(1)

        # Scan the (lowercased) output for error indicators; occurrences of
        # the benign summary line "got errors 0 times" are discounted.
        stdoutdata = stdoutdata.lower()
        errorcount = (stdoutdata.count('error') -
                      stdoutdata.count('got errors 0 times'))
        print("#times error occurred in output is " + str(errorcount) + "\n")

        if (errorcount > 0):
            print("TEST FAILED. Output has 'error'!!!\n")
            sys.exit(2)
        if (stdoutdata.find('fail') >= 0):
            print("TEST FAILED. Output has 'fail'!!!\n")
            sys.exit(2)

        # First half of the duration, keep doing kill test. For the next half,
        # try different modes.
        if time.time() > half_time:
            # we need to clean up after ourselves -- only do this on test
            # success
            shutil.rmtree(dbname, True)
            os.mkdir(dbname)
            # Start from a clean expected-state as well once the DB is wiped.
            cmd_params.pop('expected_values_path', None)
            check_mode = (check_mode + 1) % total_check_mode

        time.sleep(1)  # time to stabilize after a kill


def main():
    """Parse arguments and dispatch to the blackbox or whitebox driver.

    Every key of the param dicts is also exposed as an optional --flag
    whose argparse type is inferred from the (possibly sampled) default
    value; anything argparse does not recognize is forwarded verbatim to
    db_stress.
    """
    parser = argparse.ArgumentParser(description="This script runs and kills \
db_stress multiple times")
    parser.add_argument("test_type", choices=["blackbox", "whitebox"])
    parser.add_argument("--simple", action="store_true")
    parser.add_argument("--cf_consistency", action='store_true')
    parser.add_argument("--txn", action='store_true')

    all_params = dict(list(default_params.items())
                      + list(blackbox_default_params.items())
                      + list(whitebox_default_params.items())
                      + list(simple_default_params.items())
                      + list(blackbox_simple_default_params.items())
                      + list(whitebox_simple_default_params.items()))

    for k, v in all_params.items():
        parser.add_argument("--" + k, type=type(v() if callable(v) else v))
    # unknown_args are passed directly to db_stress
    args, unknown_args = parser.parse_known_args()

    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is not None and not os.path.isdir(test_tmpdir):
        print('%s env var is set to a non-existent directory: %s' %
              (_TEST_DIR_ENV_VAR, test_tmpdir))
        sys.exit(1)

    if args.test_type == 'blackbox':
        blackbox_crash_main(args, unknown_args)
    if args.test_type == 'whitebox':
        whitebox_crash_main(args, unknown_args)


if __name__ == '__main__':
    main()