#!/usr/bin/env python
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
from __future__ import absolute_import, division, print_function, unicode_literals

import os
import sys
import time
import random
import tempfile
import subprocess
import shutil
import argparse

# params overwrite priority:
#   for default:
#       default_params < {blackbox,whitebox}_default_params < args
#   for simple:
#       default_params < {blackbox,whitebox}_default_params <
#       simple_default_params <
#       {blackbox,whitebox}_simple_default_params < args
#   for cf_consistency:
#       default_params < {blackbox,whitebox}_default_params <
#       cf_consistency_params < args
#   for txn:
#       default_params < {blackbox,whitebox}_default_params < txn_params < args
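#
# For example: default_params sets checkpoint_one_in=1000000 while txn_params
# sets it to 0, so a --txn run uses 0; passing --checkpoint_one_in on the
# command line would override both.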

# Temporary file holding the expected state of the DB; db_stress reads and
# updates it via --expected_values_path so verification can span crashes.
expected_values_file = tempfile.NamedTemporaryFile()

default_params = {
    "acquire_snapshot_one_in": 10000,
    "block_size": 16384,
    "bloom_bits": lambda: random.choice([random.randint(0, 19),
                                         random.lognormvariate(2.3, 1.3)]),
    "cache_index_and_filter_blocks": lambda: random.randint(0, 1),
    "cache_size": 1048576,
    "checkpoint_one_in": 1000000,
    "compression_type": lambda: random.choice(
        ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress", "zstd"]),
    "bottommost_compression_type": lambda:
        "disable" if random.randint(0, 1) == 0 else
        random.choice(
            ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress",
             "zstd"]),
    "checksum_type": lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64"]),
    "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
    "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
    "clear_column_family_one_in": 0,
    "compact_files_one_in": 1000000,
    "compact_range_one_in": 1000000,
    "delpercent": 4,
    "delrangepercent": 1,
    "destroy_db_initially": 0,
    "enable_pipelined_write": lambda: random.randint(0, 1),
    "expected_values_path": expected_values_file.name,
    "flush_one_in": 1000000,
    "get_live_files_one_in": 1000000,
    # Note: the following two are intentionally disabled as the corresponding
    # APIs are not guaranteed to succeed.
    "get_sorted_wal_files_one_in": 0,
    "get_current_wal_file_one_in": 0,
    # Temporarily disable hash index
    "index_type": lambda: random.choice([0, 2]),
    "max_background_compactions": 20,
    "max_bytes_for_level_base": 10485760,
    "max_key": 100000000,
    "max_write_buffer_number": 3,
    "mmap_read": lambda: random.randint(0, 1),
    "nooverwritepercent": 1,
    "open_files": lambda: random.choice([-1, 500000]),
    "partition_filters": lambda: random.randint(0, 1),
    "pause_background_one_in": 1000000,
    "prefixpercent": 5,
    "progress_reports": 0,
    "readpercent": 45,
    "recycle_log_file_num": lambda: random.randint(0, 1),
    "reopen": 20,
    "snapshot_hold_ops": 100000,
    "long_running_snapshots": lambda: random.randint(0, 1),
    "subcompactions": lambda: random.randint(1, 4),
    "target_file_size_base": 2097152,
    "target_file_size_multiplier": 2,
    "use_direct_reads": lambda: random.randint(0, 1),
    "use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
    "use_full_merge_v1": lambda: random.randint(0, 1),
    "use_merge": lambda: random.randint(0, 1),
    "verify_checksum": 1,
    "write_buffer_size": 4 * 1024 * 1024,
    "writepercent": 35,
    # 5 appears twice to weight the choice toward the newest format version
    "format_version": lambda: random.choice([2, 3, 4, 5, 5]),
    "index_block_restart_interval": lambda: random.choice(range(1, 16)),
    "use_multiget": lambda: random.randint(0, 1),
    "periodic_compaction_seconds":
        lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
    "compaction_ttl": lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
    # Test a small max_manifest_file_size only with low probability, as most
    # of the time we want manifest history to be preserved to help debugging.
    "max_manifest_file_size": lambda: random.choice(
        [t * 16384 if t < 3 else 1024 * 1024 * 1024 for t in range(1, 30)]),
    # Sync mode might make test runs slower, so enable it with low probability.
    "sync": lambda: random.choice(
        [1 if t == 0 else 0 for t in range(0, 20)]),
    # compaction_readahead_size is disabled because the test is not passing.
    # "compaction_readahead_size": lambda: random.choice(
    #     [0, 0, 1024 * 1024]),
    "db_write_buffer_size": lambda: random.choice(
        [0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]),
    # The next two are sampled once per script invocation (no lambda), so all
    # db_stress runs within one test share the same value.
    "avoid_unnecessary_blocking_io": random.randint(0, 1),
    "write_dbid_to_manifest": random.randint(0, 1),
    "max_write_batch_group_size_bytes": lambda: random.choice(
        [16, 64, 1024 * 1024, 16 * 1024 * 1024]),
    "level_compaction_dynamic_level_bytes": True,
    "verify_checksum_one_in": 1000000,
    "verify_db_one_in": 100000,
    "continuous_verification_interval": 0,
    "max_key_len": 3,
    "key_len_percent_dist": "1,30,69",
}
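
# Every callable value above is re-sampled for each db_stress run by
# finalize_and_sanitize(); a minimal sketch of that materialization step:
#   sampled = {k: (v() if callable(v) else v) for k, v in default_params.items()}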

_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'


def get_dbname(test_name):
    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is None or test_tmpdir == "":
        dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name)
    else:
        dbname = test_tmpdir + "/rocksdb_crashtest_" + test_name
        shutil.rmtree(dbname, True)
        os.mkdir(dbname)
    return dbname


def is_direct_io_supported(dbname):
    with tempfile.NamedTemporaryFile(dir=dbname) as f:
        try:
            # O_DIRECT may be undefined on this platform (AttributeError) or
            # unsupported by the filesystem (OSError), so catch broadly.
            fd = os.open(f.name, os.O_DIRECT)
        except BaseException:
            return False
        os.close(fd)
        return True


blackbox_default_params = {
    # total time for this script to test db_stress
    "duration": 6000,
    # time for one db_stress instance to run
    "interval": 120,
    # since we will be killing anyway, use a large value for ops_per_thread
    "ops_per_thread": 100000000,
    "set_options_one_in": 10000,
    "test_batches_snapshots": 1,
}

whitebox_default_params = {
    "duration": 10000,
    "log2_keys_per_lock": 10,
    "ops_per_thread": 200000,
    "random_kill_odd": 888887,
    "test_batches_snapshots": lambda: random.randint(0, 1),
}

simple_default_params = {
    "allow_concurrent_memtable_write": lambda: random.randint(0, 1),
    "column_families": 1,
    "max_background_compactions": 1,
    "max_bytes_for_level_base": 67108864,
    "memtablerep": "skip_list",
    "prefixpercent": 0,
    "readpercent": 50,
    "prefix_size": -1,
    "target_file_size_base": 16777216,
    "target_file_size_multiplier": 1,
    "test_batches_snapshots": 0,
    "write_buffer_size": 32 * 1024 * 1024,
    "level_compaction_dynamic_level_bytes": False,
}

blackbox_simple_default_params = {
    "open_files": -1,
    "set_options_one_in": 0,
}

whitebox_simple_default_params = {}

cf_consistency_params = {
    "disable_wal": lambda: random.randint(0, 1),
    "reopen": 0,
    "test_cf_consistency": 1,
    # use a small write_buffer_size so that RocksDB triggers flushes
    # more frequently
    "write_buffer_size": 1024 * 1024,
    "enable_pipelined_write": lambda: random.randint(0, 1),
}

txn_params = {
    "use_txn": 1,
    # Avoid a lambda so the value is set once for the entire test
    "txn_write_policy": random.randint(0, 2),
    "unordered_write": random.randint(0, 1),
    "disable_wal": 0,
    # OpenReadOnly after checkpoint is not currently compatible with WritePrepared txns
    "checkpoint_one_in": 0,
    # pipelined write is not currently compatible with WritePrepared txns
    "enable_pipelined_write": 0,
}

def finalize_and_sanitize(src_params):
    dest_params = dict([(k, v() if callable(v) else v)
                        for (k, v) in src_params.items()])
    if dest_params.get("compression_type") != "zstd" or \
            dest_params.get("compression_max_dict_bytes") == 0:
        dest_params["compression_zstd_max_train_bytes"] = 0
    if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
        dest_params["memtablerep"] = "skip_list"
    if dest_params["mmap_read"] == 1 or not is_direct_io_supported(
            dest_params["db"]):
        dest_params["use_direct_io_for_flush_and_compaction"] = 0
        dest_params["use_direct_reads"] = 0
    # DeleteRange is not currently compatible with Txns or batched snapshots
    if dest_params.get("test_batches_snapshots") == 1 or \
            dest_params.get("use_txn") == 1:
        dest_params["delpercent"] += dest_params["delrangepercent"]
        dest_params["delrangepercent"] = 0
    # Only under WritePrepared txns does unordered_write provide the same
    # guarantees as vanilla RocksDB.
    if dest_params.get("unordered_write", 0) == 1:
        dest_params["txn_write_policy"] = 1
        dest_params["allow_concurrent_memtable_write"] = 1
    if dest_params.get("disable_wal", 0) == 1:
        dest_params["atomic_flush"] = 1
        dest_params["sync"] = 0
    if dest_params.get("open_files", 1) != -1:
        # Compaction TTL and periodic compactions are only compatible
        # with open_files = -1
        dest_params["compaction_ttl"] = 0
        dest_params["periodic_compaction_seconds"] = 0
    if dest_params.get("compaction_style", 0) == 2:
        # Disable compaction TTL in FIFO compaction, because right
        # now assertion failures are triggered.
        dest_params["compaction_ttl"] = 0
        dest_params["periodic_compaction_seconds"] = 0
    if dest_params["partition_filters"] == 1:
        if dest_params["index_type"] != 2:
            # partitioned filters require a partitioned (two-level) index
            dest_params["partition_filters"] = 0
        else:
            dest_params["use_block_based_filter"] = 0
    if dest_params.get("atomic_flush", 0) == 1:
        # disable pipelined write when atomic flush is used.
        dest_params["enable_pipelined_write"] = 0
    return dest_params
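
# For illustration: given {"mmap_read": 1, "use_direct_reads": lambda: 1, ...},
# finalize_and_sanitize() calls the lambda and then forces use_direct_reads
# back to 0, since mmap reads and direct I/O are mutually exclusive.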

def gen_cmd_params(args):
    params = {}

    params.update(default_params)
    if args.test_type == 'blackbox':
        params.update(blackbox_default_params)
    if args.test_type == 'whitebox':
        params.update(whitebox_default_params)
    if args.simple:
        params.update(simple_default_params)
        if args.test_type == 'blackbox':
            params.update(blackbox_simple_default_params)
        if args.test_type == 'whitebox':
            params.update(whitebox_simple_default_params)
    if args.cf_consistency:
        params.update(cf_consistency_params)
    if args.txn:
        params.update(txn_params)

    # command-line arguments take the highest priority
    for k, v in vars(args).items():
        if v is not None:
            params[k] = v
    return params


def gen_cmd(params, unknown_params):
    finalized_params = finalize_and_sanitize(params)
    cmd = ['./db_stress'] + [
        '--{0}={1}'.format(k, v)
        for k, v in [(k, finalized_params[k]) for k in sorted(finalized_params)]
        if k not in set(['test_type', 'simple', 'duration', 'interval',
                         'random_kill_odd', 'cf_consistency', 'txn'])
        and v is not None] + unknown_params
    return cmd
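
# For illustration, gen_cmd() returns an argv list along the lines of
#   ['./db_stress', '--acquire_snapshot_one_in=10000',
#    '--block_size=16384', ..., '--db=<dbname>']
# with the script-only keys (test_type, simple, duration, interval,
# random_kill_odd, cf_consistency, txn) filtered out.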


# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args, unknown_args):
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('blackbox')
    exit_time = time.time() + cmd_params['duration']

    print("Running blackbox-crash-test with \n"
          + "interval_between_crash=" + str(cmd_params['interval']) + "\n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    while time.time() < exit_time:
        run_had_errors = False
        killtime = time.time() + cmd_params['interval']

        cmd = gen_cmd(dict(
            list(cmd_params.items())
            + list({'db': dbname}.items())), unknown_args)

        child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
        print("Running db_stress with pid=%d: %s\n\n"
              % (child.pid, ' '.join(cmd)))

        stop_early = False
        while time.time() < killtime:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
                stop_early = True
                break
            time.sleep(1)

        if not stop_early:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
            else:
                child.kill()
                print("KILLED %d\n" % child.pid)
                time.sleep(1)  # time to stabilize after a kill

        while True:
            line = child.stderr.readline().strip().decode('utf-8')
            if line == '':
                break
            elif not line.startswith('WARNING'):
                run_had_errors = True
                print('stderr has error message:')
                print('***' + line + '***')

        if run_had_errors:
            sys.exit(2)

        time.sleep(1)  # time to stabilize before the next run

    # we need to clean up after ourselves -- only do this on test success
    shutil.rmtree(dbname, True)


# This script also runs db_stress multiple times. Some runs use
# kill_random_test, which causes RocksDB to crash at various points in code.
def whitebox_crash_main(args, unknown_args):
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('whitebox')

    cur_time = time.time()
    exit_time = cur_time + cmd_params['duration']
    half_time = cur_time + cmd_params['duration'] // 2

    print("Running whitebox-crash-test with \n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    total_check_mode = 4
    check_mode = 0
    kill_random_test = cmd_params['random_kill_odd']
    kill_mode = 0

    while time.time() < exit_time:
        if check_mode == 0:
            additional_opts = {
                # use large ops per thread since we will kill it anyway
                "ops_per_thread": 100 * cmd_params['ops_per_thread'],
            }
            # Run with kill_random_test in one of three modes. Mode 0 covers
            # all kill points. Mode 1 covers fewer kill points but increases
            # the chance of triggering them. Mode 2 covers even less frequent
            # kill points and further increases the triggering chance.
            if kill_mode == 0:
                additional_opts.update({
                    "kill_random_test": kill_random_test,
                })
            elif kill_mode == 1:
                if cmd_params.get('disable_wal', 0) == 1:
                    my_kill_odd = kill_random_test // 50 + 1
                else:
                    my_kill_odd = kill_random_test // 10 + 1
                additional_opts.update({
                    "kill_random_test": my_kill_odd,
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    + "WritableFileWriter::WriteBuffered",
                })
            elif kill_mode == 2:
                # TODO: May need to adjust random odds if kill_random_test
                # is too small.
                additional_opts.update({
                    "kill_random_test": (kill_random_test // 5000 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    "WritableFileWriter::WriteBuffered,"
                    "PosixMmapFile::Allocate,WritableFileWriter::Flush",
                })
            # Cycle through kill modes 0, 1, and 2 in turn.
            kill_mode = (kill_mode + 1) % 3
        elif check_mode == 1:
            # normal run with universal compaction mode
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
                "compaction_style": 1,
            }
        elif check_mode == 2:
            # normal run with FIFO compaction mode
            # ops_per_thread is divided by 5 because FIFO compaction
            # style is quite a bit slower on reads with a lot of files
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'] // 5,
                "compaction_style": 2,
            }
        else:
            # normal run
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
            }

        cmd = gen_cmd(dict(list(cmd_params.items())
                           + list(additional_opts.items())
                           + list({'db': dbname}.items())), unknown_args)

        print("Running:" + ' '.join(cmd) + "\n")  # noqa: E999 T25377293 Grandfathered in

        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT)
        stdoutdata, stderrdata = popen.communicate()
        stdoutdata = stdoutdata.decode('utf-8') if stdoutdata else ''
        if stderrdata:
            stderrdata = stderrdata.decode('utf-8')
        retcode = popen.returncode
        msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
               check_mode, additional_opts['kill_random_test'], retcode))
        print(msg)
        print(stdoutdata)

        expected = False
        if additional_opts['kill_random_test'] is None and (retcode == 0):
            # we expect a zero exit code if no kill option was given
            expected = True
        elif additional_opts['kill_random_test'] is not None and retcode <= 0:
            # When a kill option is given, the test MIGHT kill itself.
            # If it does, a negative exit code is expected. Otherwise 0.
            expected = True

        if not expected:
            print("TEST FAILED. See kill option and exit code above!!!\n")
            sys.exit(1)

        stdoutdata = stdoutdata.lower()
        errorcount = (stdoutdata.count('error') -
                      stdoutdata.count('got errors 0 times'))
        print("#times error occurred in output is " + str(errorcount) + "\n")

        if (errorcount > 0):
            print("TEST FAILED. Output has 'error'!!!\n")
            sys.exit(2)
        if (stdoutdata.find('fail') >= 0):
            print("TEST FAILED. Output has 'fail'!!!\n")
            sys.exit(2)

        # For the first half of the duration, keep doing the kill test. For
        # the second half, cycle through the other check modes.
        if time.time() > half_time:
            # we need to clean up after ourselves -- only do this on test
            # success
            shutil.rmtree(dbname, True)
            os.mkdir(dbname)
            cmd_params.pop('expected_values_path', None)
            check_mode = (check_mode + 1) % total_check_mode

        time.sleep(1)  # time to stabilize after a kill


def main():
    parser = argparse.ArgumentParser(description="This script runs and kills "
                                     "db_stress multiple times")
    parser.add_argument("test_type", choices=["blackbox", "whitebox"])
    parser.add_argument("--simple", action="store_true")
    parser.add_argument("--cf_consistency", action='store_true')
    parser.add_argument("--txn", action='store_true')
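
    # Example invocations (illustrative, assuming this file is db_crashtest.py):
    #   python db_crashtest.py blackbox --simple
    #   TEST_TMPDIR=/dev/shm python db_crashtest.py whitebox --txn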

    all_params = dict(list(default_params.items())
                      + list(blackbox_default_params.items())
                      + list(whitebox_default_params.items())
                      + list(simple_default_params.items())
                      + list(blackbox_simple_default_params.items())
                      + list(whitebox_simple_default_params.items()))

    for k, v in all_params.items():
        # call each lambda once here only to derive the flag's value type
        parser.add_argument("--" + k, type=type(v() if callable(v) else v))
    # unknown_args are passed directly to db_stress
    args, unknown_args = parser.parse_known_args()

    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is not None and not os.path.isdir(test_tmpdir):
        print('%s env var is set to a non-existent directory: %s' %
              (_TEST_DIR_ENV_VAR, test_tmpdir))
        sys.exit(1)

    if args.test_type == 'blackbox':
        blackbox_crash_main(args, unknown_args)
    if args.test_type == 'whitebox':
        whitebox_crash_main(args, unknown_args)


if __name__ == '__main__':
    main()