// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//

#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_driver.h"
#include "rocksdb/convenience.h"
#include "rocksdb/sst_file_manager.h"

namespace ROCKSDB_NAMESPACE {
StressTest::StressTest()
    : cache_(NewCache(FLAGS_cache_size)),
      compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
      filter_policy_(FLAGS_bloom_bits >= 0
                         ? FLAGS_use_block_based_filter
                               ? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
                               : NewBloomFilterPolicy(FLAGS_bloom_bits, false)
                         : nullptr),
      db_(nullptr),
#ifndef ROCKSDB_LITE
      txn_db_(nullptr),
#endif
      new_column_family_name_(1),
      num_times_reopened_(0),
      db_preload_finished_(false),
      cmp_db_(nullptr) {
  if (FLAGS_destroy_db_initially) {
    std::vector<std::string> files;
    db_stress_env->GetChildren(FLAGS_db, &files);
    for (unsigned int i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }

    Options options;
    options.env = db_stress_env;
    // Remove files without preserving manifest files
#ifndef ROCKSDB_LITE
    const Status s = !FLAGS_use_blob_db
                         ? DestroyDB(FLAGS_db, options)
                         : blob_db::DestroyBlobDB(FLAGS_db, options,
                                                  blob_db::BlobDBOptions());
#else
    const Status s = DestroyDB(FLAGS_db, options);
#endif  // !ROCKSDB_LITE

    if (!s.ok()) {
      fprintf(stderr, "Cannot destroy original db: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
}

StressTest::~StressTest() {
  for (auto cf : column_families_) {
    delete cf;
  }
  column_families_.clear();
  delete db_;

  assert(secondaries_.size() == secondary_cfh_lists_.size());
  size_t n = secondaries_.size();
  for (size_t i = 0; i != n; ++i) {
    for (auto* cf : secondary_cfh_lists_[i]) {
      delete cf;
    }
    secondary_cfh_lists_[i].clear();
    delete secondaries_[i];
  }
  secondaries_.clear();

  for (auto* cf : cmp_cfhs_) {
    delete cf;
  }
  cmp_cfhs_.clear();
  delete cmp_db_;
}

std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) {
  if (capacity <= 0) {
    return nullptr;
  }
  if (FLAGS_use_clock_cache) {
    auto cache = NewClockCache((size_t)capacity);
    if (!cache) {
      fprintf(stderr, "Clock cache not supported.\n");
      exit(1);
    }
    return cache;
  } else {
    return NewLRUCache((size_t)capacity);
  }
}

bool StressTest::BuildOptionsTable() {
  if (FLAGS_set_options_one_in <= 0) {
    return true;
  }

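  // Each entry maps a dynamically changeable option to a small set of
  // candidate values. SetOptions() below picks an entry and a value at
  // random and applies it through DB::SetOptions().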
  std::unordered_map<std::string, std::vector<std::string>> options_tbl = {
      {"write_buffer_size",
       {ToString(options_.write_buffer_size),
        ToString(options_.write_buffer_size * 2),
        ToString(options_.write_buffer_size * 4)}},
      {"max_write_buffer_number",
       {ToString(options_.max_write_buffer_number),
        ToString(options_.max_write_buffer_number * 2),
        ToString(options_.max_write_buffer_number * 4)}},
      {"arena_block_size",
       {
           ToString(options_.arena_block_size),
           ToString(options_.write_buffer_size / 4),
           ToString(options_.write_buffer_size / 8),
       }},
      {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
      {"max_successive_merges", {"0", "2", "4"}},
      {"inplace_update_num_locks", {"100", "200", "300"}},
      // TODO(ljin): enable test for this option
      // {"disable_auto_compactions", {"100", "200", "300"}},
      {"soft_rate_limit", {"0", "0.5", "0.9"}},
      {"hard_rate_limit", {"0", "1.1", "2.0"}},
      {"level0_file_num_compaction_trigger",
       {
           ToString(options_.level0_file_num_compaction_trigger),
           ToString(options_.level0_file_num_compaction_trigger + 2),
           ToString(options_.level0_file_num_compaction_trigger + 4),
       }},
      {"level0_slowdown_writes_trigger",
       {
           ToString(options_.level0_slowdown_writes_trigger),
           ToString(options_.level0_slowdown_writes_trigger + 2),
           ToString(options_.level0_slowdown_writes_trigger + 4),
       }},
      {"level0_stop_writes_trigger",
       {
           ToString(options_.level0_stop_writes_trigger),
           ToString(options_.level0_stop_writes_trigger + 2),
           ToString(options_.level0_stop_writes_trigger + 4),
       }},
      {"max_compaction_bytes",
       {
           ToString(options_.target_file_size_base * 5),
           ToString(options_.target_file_size_base * 15),
           ToString(options_.target_file_size_base * 100),
       }},
      {"target_file_size_base",
       {
           ToString(options_.target_file_size_base),
           ToString(options_.target_file_size_base * 2),
           ToString(options_.target_file_size_base * 4),
       }},
      {"target_file_size_multiplier",
       {
           ToString(options_.target_file_size_multiplier),
           "1",
           "2",
       }},
      {"max_bytes_for_level_base",
       {
           ToString(options_.max_bytes_for_level_base / 2),
           ToString(options_.max_bytes_for_level_base),
           ToString(options_.max_bytes_for_level_base * 2),
       }},
      {"max_bytes_for_level_multiplier",
       {
           ToString(options_.max_bytes_for_level_multiplier),
           "1",
           "2",
       }},
      {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
  };

  options_table_ = std::move(options_tbl);

  for (const auto& iter : options_table_) {
    options_index_.push_back(iter.first);
  }
  return true;
}

void StressTest::InitDb() {
  uint64_t now = db_stress_env->NowMicros();
  fprintf(stdout, "%s Initializing db_stress\n",
          db_stress_env->TimeToString(now / 1000000).c_str());
  PrintEnv();
  Open();
  BuildOptionsTable();
}

void StressTest::InitReadonlyDb(SharedState* shared) {
  uint64_t now = db_stress_env->NowMicros();
  fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
          db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
  PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
}

bool StressTest::VerifySecondaries() {
#ifndef ROCKSDB_LITE
  if (FLAGS_test_secondary) {
    uint64_t now = db_stress_env->NowMicros();
    fprintf(stdout, "%s Start to verify secondaries against primary\n",
            db_stress_env->TimeToString(static_cast<uint64_t>(now) / 1000000)
                .c_str());
  }
  for (size_t k = 0; k != secondaries_.size(); ++k) {
    Status s = secondaries_[k]->TryCatchUpWithPrimary();
    if (!s.ok()) {
      fprintf(stderr, "Secondary failed to catch up with primary\n");
      return false;
    }
    ReadOptions ropts;
    ropts.total_order_seek = true;
    // Verify only the default column family since the primary may have
    // dropped other column families after the most recent reopen.
    std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
    std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
    for (iter1->SeekToFirst(), iter2->SeekToFirst();
         iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
      if (iter1->key().compare(iter2->key()) != 0 ||
          iter1->value().compare(iter2->value()) != 0) {
        fprintf(stderr,
                "Secondary %d contains different data from "
                "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
                static_cast<int>(k),
                iter1->key().ToString(/*hex=*/true).c_str(),
                iter1->value().ToString(/*hex=*/true).c_str(),
                iter2->key().ToString(/*hex=*/true).c_str(),
                iter2->value().ToString(/*hex=*/true).c_str());
        return false;
      }
    }
    if (iter1->Valid() && !iter2->Valid()) {
      fprintf(stderr,
              "Secondary %d record count is smaller than that of primary\n",
              static_cast<int>(k));
      return false;
    } else if (!iter1->Valid() && iter2->Valid()) {
      fprintf(stderr,
              "Secondary %d record count is larger than that of primary\n",
              static_cast<int>(k));
      return false;
    }
  }
  if (FLAGS_test_secondary) {
    uint64_t now = db_stress_env->NowMicros();
    fprintf(stdout, "%s Verification of secondaries succeeded\n",
            db_stress_env->TimeToString(static_cast<uint64_t>(now) / 1000000)
                .c_str());
  }
#endif  // ROCKSDB_LITE
  return true;
}

Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
                              ThreadState::SnapshotState& snap_state) {
  Status s;
  if (cf->GetName() != snap_state.cf_at_name) {
    return s;
  }
  ReadOptions ropt;
  ropt.snapshot = snap_state.snapshot;
  PinnableSlice exp_v(&snap_state.value);
  exp_v.PinSelf();
  PinnableSlice v;
  s = db->Get(ropt, cf, snap_state.key, &v);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (snap_state.status != s) {
    return Status::Corruption(
        "The snapshot gave inconsistent results for key " +
        ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
        " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
        ") vs. (" + s.ToString() + ")");
  }
  if (s.ok()) {
    if (exp_v != v) {
      return Status::Corruption("The snapshot gave inconsistent values: (" +
                                exp_v.ToString() + ") vs. (" + v.ToString() +
                                ")");
    }
  }
  if (snap_state.key_vec != nullptr) {
    // When `prefix_extractor` is set, seeking to the beginning and scanning
    // across prefixes are only supported with `total_order_seek` set.
    ropt.total_order_seek = true;
    std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
    std::unique_ptr<std::vector<bool>> tmp_bitvec(
        new std::vector<bool>(FLAGS_max_key));
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      uint64_t key_val;
      if (GetIntVal(iterator->key().ToString(), &key_val)) {
        (*tmp_bitvec.get())[key_val] = true;
      }
    }
    if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
                    tmp_bitvec.get()->begin())) {
      return Status::Corruption("Found inconsistent keys at this snapshot");
    }
  }
  return Status::OK();
}

void StressTest::VerificationAbort(SharedState* shared, std::string msg,
                                   Status s) const {
  fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(),
          s.ToString().c_str());
  shared->SetVerificationFailure();
}

void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
                                   int64_t key) const {
  fprintf(stderr,
          "Verification failed for column family %d key %" PRIi64 ": %s\n", cf,
          key, msg.c_str());
  shared->SetVerificationFailure();
}

void StressTest::PrintStatistics() {
  if (dbstats) {
    fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
  }
  if (dbstats_secondaries) {
    fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
            dbstats_secondaries->ToString().c_str());
  }
}

// Currently PreloadDb has to be single-threaded.
void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
                                              SharedState* shared) {
  WriteOptions write_opts;
  write_opts.disableWAL = FLAGS_disable_wal;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  char value[100];
  int cf_idx = 0;
  Status s;
  for (auto cfh : column_families_) {
    for (int64_t k = 0; k != number_of_keys; ++k) {
      std::string key_str = Key(k);
      Slice key = key_str;
      size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
      Slice v(value, sz);
      shared->Put(cf_idx, k, 0, true /* pending */);

      if (FLAGS_use_merge) {
        if (!FLAGS_use_txn) {
          s = db_->Merge(write_opts, cfh, key, v);
        } else {
#ifndef ROCKSDB_LITE
          Transaction* txn;
          s = NewTxn(write_opts, &txn);
          if (s.ok()) {
            s = txn->Merge(cfh, key, v);
            if (s.ok()) {
              s = CommitTxn(txn);
            }
          }
#endif
        }
      } else {
        if (!FLAGS_use_txn) {
          s = db_->Put(write_opts, cfh, key, v);
        } else {
#ifndef ROCKSDB_LITE
          Transaction* txn;
          s = NewTxn(write_opts, &txn);
          if (s.ok()) {
            s = txn->Put(cfh, key, v);
            if (s.ok()) {
              s = CommitTxn(txn);
            }
          }
#endif
        }
      }

      shared->Put(cf_idx, k, 0, false /* pending */);
      if (!s.ok()) {
        break;
      }
    }
    if (!s.ok()) {
      break;
    }
    ++cf_idx;
  }
  if (s.ok()) {
    s = db_->Flush(FlushOptions(), column_families_);
  }
  if (s.ok()) {
    for (auto cf : column_families_) {
      delete cf;
    }
    column_families_.clear();
    delete db_;
    db_ = nullptr;
#ifndef ROCKSDB_LITE
    txn_db_ = nullptr;
#endif

    db_preload_finished_.store(true);
    auto now = db_stress_env->NowMicros();
    fprintf(stdout, "%s Reopening database in read-only\n",
            db_stress_env->TimeToString(now / 1000000).c_str());
    // Reopen as read-only; all options related to updates can be ignored.
    Open();
  } else {
    fprintf(stderr, "Failed to preload db\n");
    exit(1);
  }
}

Status StressTest::SetOptions(ThreadState* thread) {
  assert(FLAGS_set_options_one_in > 0);
  std::unordered_map<std::string, std::string> opts;
  std::string name =
      options_index_[thread->rand.Next() % options_index_.size()];
  int value_idx = thread->rand.Next() % options_table_[name].size();
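  // Interdependent options are changed together, using the same value
  // index, so relative invariants (e.g. level0 compaction trigger <=
  // slowdown trigger <= stop trigger) keep holding after the change.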
  if (name == "soft_rate_limit" || name == "hard_rate_limit") {
    opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx];
    opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx];
  } else if (name == "level0_file_num_compaction_trigger" ||
             name == "level0_slowdown_writes_trigger" ||
             name == "level0_stop_writes_trigger") {
    opts["level0_file_num_compaction_trigger"] =
        options_table_["level0_file_num_compaction_trigger"][value_idx];
    opts["level0_slowdown_writes_trigger"] =
        options_table_["level0_slowdown_writes_trigger"][value_idx];
    opts["level0_stop_writes_trigger"] =
        options_table_["level0_stop_writes_trigger"][value_idx];
  } else {
    opts[name] = options_table_[name][value_idx];
  }

  int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
  auto cfh = column_families_[rand_cf_idx];
  return db_->SetOptions(cfh, opts);
}

#ifndef ROCKSDB_LITE
Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
  }
  static std::atomic<uint64_t> txn_id = {0};
  TransactionOptions txn_options;
  *txn = txn_db_->BeginTransaction(write_opts, txn_options);
  auto istr = std::to_string(txn_id.fetch_add(1));
  Status s = (*txn)->SetName("xid" + istr);
  return s;
}

Status StressTest::CommitTxn(Transaction* txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
  }
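  // Commit in two phases: Prepare() persists the named transaction in the
  // WAL before Commit() makes it visible, so the prepared-transaction code
  // path is exercised as well.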
  Status s = txn->Prepare();
  if (s.ok()) {
    s = txn->Commit();
  }
  delete txn;
  return s;
}

Status StressTest::RollbackTxn(Transaction* txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument(
        "RollbackTxn when FLAGS_use_txn is not"
        " set");
  }
  Status s = txn->Rollback();
  delete txn;
  return s;
}
#endif

void StressTest::OperateDb(ThreadState* thread) {
  ReadOptions read_opts(FLAGS_verify_checksum, true);
  WriteOptions write_opts;
  auto shared = thread->shared;
  char value[100];
  std::string from_db;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  write_opts.disableWAL = FLAGS_disable_wal;
  const int prefixBound = static_cast<int>(FLAGS_readpercent) +
                          static_cast<int>(FLAGS_prefixpercent);
  const int writeBound = prefixBound + static_cast<int>(FLAGS_writepercent);
  const int delBound = writeBound + static_cast<int>(FLAGS_delpercent);
  const int delRangeBound = delBound + static_cast<int>(FLAGS_delrangepercent);
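  // These bounds partition [0, 100) into consecutive operation buckets.
  // For example, with readpercent=45 and prefixpercent=5, a prob_op of
  // 0-44 selects a read and 45-49 a prefix scan; values at or above
  // delRangeBound fall through to the iterate operation.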
  const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);

  thread->stats.Start();
  for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
    if (thread->shared->HasVerificationFailedYet() ||
        thread->shared->ShouldStopTest()) {
      break;
    }
    if (open_cnt != 0) {
      thread->stats.FinishedSingleOp();
      MutexLock l(thread->shared->GetMutex());
      while (!thread->snapshot_queue.empty()) {
        db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
        delete thread->snapshot_queue.front().second.key_vec;
        thread->snapshot_queue.pop();
      }
      thread->shared->IncVotedReopen();
      if (thread->shared->AllVotedReopen()) {
        thread->shared->GetStressTest()->Reopen(thread);
        thread->shared->GetCondVar()->SignalAll();
      } else {
        thread->shared->GetCondVar()->Wait();
      }
      // Commenting this out as we don't want to reset stats on each open.
      // thread->stats.Start();
    }

    for (uint64_t i = 0; i < ops_per_open; i++) {
      if (thread->shared->HasVerificationFailedYet()) {
        break;
      }

      // Change Options
      if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) {
        SetOptions(thread);
      }

      if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) {
        // Toggle in-place update support.
        options_.inplace_update_support = !options_.inplace_update_support;
      }

      if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
          thread->rand.OneIn(FLAGS_verify_db_one_in)) {
        ContinuouslyVerifyDb(thread);
        if (thread->shared->ShouldStopTest()) {
          break;
        }
      }

      MaybeClearOneColumnFamily(thread);

      if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
        Status s = db_->SyncWAL();
        if (!s.ok() && !s.IsNotSupported()) {
          fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
        }
      }

      int rand_column_family = thread->rand.Next() % FLAGS_column_families;
      ColumnFamilyHandle* column_family = column_families_[rand_column_family];

      if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
        TestCompactFiles(thread, column_family);
      }

      int64_t rand_key = GenerateOneKey(thread, i);
      std::string keystr = Key(rand_key);
      Slice key = keystr;
      std::unique_ptr<MutexLock> lock;
      if (ShouldAcquireMutexOnKey()) {
        lock.reset(new MutexLock(
            shared->GetMutexForKey(rand_column_family, rand_key)));
      }

      if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
        TestCompactRange(thread, rand_key, key, column_family);
        if (thread->shared->HasVerificationFailedYet()) {
          break;
        }
      }

      std::vector<int> rand_column_families =
          GenerateColumnFamilies(FLAGS_column_families, rand_column_family);

      if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
        Status status = TestFlush(rand_column_families);
        if (!status.ok()) {
          fprintf(stdout, "Unable to perform Flush(): %s\n",
                  status.ToString().c_str());
        }
      }

#ifndef ROCKSDB_LITE
      // Verify GetLiveFiles with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in)) {
        Status status = VerifyGetLiveFiles();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
        }
      }

      // Verify GetSortedWalFiles with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
        Status status = VerifyGetSortedWalFiles();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK",
                            status);
        }
      }

      // Verify GetCurrentWalFile with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) {
        Status status = VerifyGetCurrentWalFile();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK",
                            status);
        }
      }
#endif  // !ROCKSDB_LITE

      if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
        Status status = TestPauseBackground(thread);
        if (!status.ok()) {
          VerificationAbort(
              shared, "Pause/ContinueBackgroundWork status not OK", status);
        }
      }

#ifndef ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
        Status status = db_->VerifyChecksum();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyChecksum status not OK", status);
        }
      }
#endif

      std::vector<int64_t> rand_keys = GenerateKeys(rand_key);

      if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
        TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
      }

      if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
        Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "Backup/restore gave inconsistent state",
                            s);
        }
      }

      if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
        Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
        }
      }

#ifndef ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) {
        Status s =
            TestApproximateSize(thread, i, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "ApproximateSize Failed", s);
        }
      }
#endif  // !ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
        TestAcquireSnapshot(thread, rand_column_family, keystr, i);
      }

      /*always*/ {
        Status s = MaybeReleaseSnapshots(thread, i);
        if (!s.ok()) {
          VerificationAbort(shared, "Snapshot gave inconsistent state", s);
        }
      }

      int prob_op = thread->rand.Uniform(100);
      // Reset this in case we pick something other than a read op. We don't
      // want to use a stale value when deciding at the beginning of the loop
      // whether to vote to reopen.
      if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
        assert(0 <= prob_op);
        // OPERATION read
        if (FLAGS_use_multiget) {
          // Leave room for one more iteration of the loop with a single key
          // batch. This is to ensure that each thread does exactly the same
          // number of ops.
          int multiget_batch_size = static_cast<int>(
              std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
                       FLAGS_ops_per_thread - i - 1));
          // If it's the last iteration, ensure that multiget_batch_size is 1.
          multiget_batch_size = std::max(multiget_batch_size, 1);
          rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
          TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
          i += multiget_batch_size - 1;
        } else {
          TestGet(thread, read_opts, rand_column_families, rand_keys);
        }
      } else if (prob_op < prefixBound) {
        assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
        // OPERATION prefix scan
        // Keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
        // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
        // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
        // prefix.
        TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
      } else if (prob_op < writeBound) {
        assert(prefixBound <= prob_op);
        // OPERATION write
        TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
                value, lock);
      } else if (prob_op < delBound) {
        assert(writeBound <= prob_op);
        // OPERATION delete
        TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
      } else if (prob_op < delRangeBound) {
        assert(delBound <= prob_op);
        // OPERATION delete range
        TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
                        lock);
      } else {
        assert(delRangeBound <= prob_op);
        // OPERATION iterate
        int num_seeks = static_cast<int>(
            std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
                     FLAGS_ops_per_thread - i - 1));
        rand_keys = GenerateNKeys(thread, num_seeks, i);
        i += num_seeks - 1;
        TestIterate(thread, read_opts, rand_column_families, rand_keys);
      }
      thread->stats.FinishedSingleOp();
#ifndef ROCKSDB_LITE
      uint32_t tid = thread->tid;
      assert(secondaries_.empty() ||
             static_cast<size_t>(tid) < secondaries_.size());
      if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
        Status s = secondaries_[tid]->TryCatchUpWithPrimary();
        if (!s.ok()) {
          VerificationAbort(shared, "Secondary instance failed to catch up", s);
          break;
        }
      }
#endif
    }
  }
  while (!thread->snapshot_queue.empty()) {
    db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
    delete thread->snapshot_queue.front().second.key_vec;
    thread->snapshot_queue.pop();
  }

  thread->stats.Stop();
}

#ifndef ROCKSDB_LITE
// Generate a list of keys close to the boundaries of SST file keys.
// If there isn't any SST file in the DB, return an empty list.
std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread,
                                                     DB* db,
                                                     ColumnFamilyHandle* cfh,
                                                     size_t num_keys) {
  ColumnFamilyMetaData cfmd;
  db->GetColumnFamilyMetaData(cfh, &cfmd);
  std::vector<std::string> boundaries;
  for (const LevelMetaData& lmd : cfmd.levels) {
    for (const SstFileMetaData& sfmd : lmd.files) {
      boundaries.push_back(sfmd.smallestkey);
      boundaries.push_back(sfmd.largestkey);
    }
  }
  if (boundaries.empty()) {
    return {};
  }

  std::vector<std::string> ret;
  for (size_t j = 0; j < num_keys; j++) {
    std::string k =
        boundaries[thread->rand.Uniform(static_cast<int>(boundaries.size()))];
    if (thread->rand.OneIn(3)) {
      // Subtract one from a byte of the string
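      // For example, "ab\x00" becomes "aa\xFF": the borrow propagates
      // through trailing zero bytes, yielding the previous same-length key.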
      for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
        uint8_t cur = k[i];
        if (cur > 0) {
          k[i] = static_cast<char>(cur - 1);
          break;
        } else if (i > 0) {
          k[i] = 0xFFu;
        }
      }
    } else if (thread->rand.OneIn(2)) {
      // Add one to a byte of the string
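      // For example, "ab\xFF" becomes "ac\x00": the carry propagates
      // through trailing 0xFF bytes, yielding the next same-length key.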
      for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
        uint8_t cur = k[i];
        if (cur < 255) {
          k[i] = static_cast<char>(cur + 1);
          break;
        } else if (i > 0) {
          k[i] = 0x00;
        }
      }
    }
    ret.push_back(k);
  }
  return ret;
}
#endif  // !ROCKSDB_LITE

// Given a key K, this creates an iterator which scans to K and then
// does a random sequence of Next/Prev operations.
Status StressTest::TestIterate(ThreadState* thread,
                               const ReadOptions& read_opts,
                               const std::vector<int>& rand_column_families,
                               const std::vector<int64_t>& rand_keys) {
  Status s;
  const Snapshot* snapshot = db_->GetSnapshot();
  ReadOptions readoptionscopy = read_opts;
  readoptionscopy.snapshot = snapshot;

  bool expect_total_order = false;
  if (thread->rand.OneIn(16)) {
    // When a prefix extractor is used, it's useful to also cover total
    // order seek.
    readoptionscopy.total_order_seek = true;
    expect_total_order = true;
  } else if (thread->rand.OneIn(4)) {
    readoptionscopy.total_order_seek = false;
    readoptionscopy.auto_prefix_mode = true;
    expect_total_order = true;
  } else if (options_.prefix_extractor.get() == nullptr) {
    expect_total_order = true;
  }

  std::string upper_bound_str;
  Slice upper_bound;
  if (thread->rand.OneIn(16)) {
    // With a 1 in 16 chance, set an iterator upper bound.
    int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    upper_bound_str = Key(rand_upper_key);
    upper_bound = Slice(upper_bound_str);
    // upper_bound can be smaller than the seek key, but the query itself
    // should not crash either way.
    readoptionscopy.iterate_upper_bound = &upper_bound;
  }
  std::string lower_bound_str;
  Slice lower_bound;
  if (thread->rand.OneIn(16)) {
    // With a 1 in 16 chance, set an iterator lower bound.
    int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    lower_bound_str = Key(rand_lower_key);
    lower_bound = Slice(lower_bound_str);
    // lower_bound can be larger than the seek key, but the query itself
    // should not crash either way.
    readoptionscopy.iterate_lower_bound = &lower_bound;
  }

  auto cfh = column_families_[rand_column_families[0]];
  std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh));

  std::vector<std::string> key_str;
  if (thread->rand.OneIn(16)) {
    // Generate keys close to the lower or upper bounds of SST files.
    key_str = GetWhiteBoxKeys(thread, db_, cfh, rand_keys.size());
  }
  if (key_str.empty()) {
    // If the key strings were not generated from white-box keys, use the
    // randomized keys passed in.
    for (int64_t rkey : rand_keys) {
      key_str.push_back(Key(rkey));
    }
  }

  std::string op_logs;
  const size_t kOpLogsLimit = 10000;

  for (const std::string& skey : key_str) {
    if (op_logs.size() > kOpLogsLimit) {
      // The history log shouldn't take too much memory. Clear it.
      op_logs = "(cleared...)\n";
    }

    Slice key = skey;

    if (readoptionscopy.iterate_upper_bound != nullptr &&
        thread->rand.OneIn(2)) {
      // With a 1 in 2 chance, change the upper bound.
      // It is possible that the bound is changed before it is first used,
      // but that is not a problem.
      int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
      upper_bound_str = Key(rand_upper_key);
      upper_bound = Slice(upper_bound_str);
    } else if (readoptionscopy.iterate_lower_bound != nullptr &&
               thread->rand.OneIn(4)) {
      // With a 1 in 4 chance, change the lower bound.
      // It is possible that the bound is changed before it is first used,
      // but that is not a problem.
      int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
      lower_bound_str = Key(rand_lower_key);
      lower_bound = Slice(lower_bound_str);
    }

    // Record some options to op_logs
    op_logs += "total_order_seek: ";
    op_logs += (readoptionscopy.total_order_seek ? "1 " : "0 ");
    op_logs += "auto_prefix_mode: ";
    op_logs += (readoptionscopy.auto_prefix_mode ? "1 " : "0 ");
    if (readoptionscopy.iterate_upper_bound != nullptr) {
      op_logs += "ub: " + upper_bound.ToString(true) + " ";
    }
    if (readoptionscopy.iterate_lower_bound != nullptr) {
      op_logs += "lb: " + lower_bound.ToString(true) + " ";
    }

    // Set up a control iterator without bounds and with total order seek,
    // and compare the results. This helps identify bugs related to bounds,
    // the prefix extractor, or reseeking. Sometimes we compare iterators
    // with the same set-up, and it doesn't hurt to check that they are
    // equal.
    ReadOptions cmp_ro;
    cmp_ro.snapshot = snapshot;
    cmp_ro.total_order_seek = true;
    ColumnFamilyHandle* cmp_cfh =
        GetControlCfh(thread, rand_column_families[0]);
    std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
    bool diverged = false;

    bool support_seek_first_or_last = expect_total_order;

    LastIterateOp last_op;
    if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToFirst();
      cmp_iter->SeekToFirst();
      last_op = kLastOpSeekToFirst;
      op_logs += "STF ";
    } else if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToLast();
      cmp_iter->SeekToLast();
      last_op = kLastOpSeekToLast;
      op_logs += "STL ";
    } else if (thread->rand.OneIn(8)) {
      iter->SeekForPrev(key);
      cmp_iter->SeekForPrev(key);
      last_op = kLastOpSeekForPrev;
      op_logs += "SFP " + key.ToString(true) + " ";
    } else {
      iter->Seek(key);
      cmp_iter->Seek(key);
      last_op = kLastOpSeek;
      op_logs += "S " + key.ToString(true) + " ";
    }
    VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(), cmp_iter.get(),
                   last_op, key, op_logs, &diverged);

    bool no_reverse =
        (FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
    for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
      if (no_reverse || thread->rand.OneIn(2)) {
        iter->Next();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Next();
        }
        op_logs += "N";
      } else {
        iter->Prev();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Prev();
        }
        op_logs += "P";
      }
      last_op = kLastOpNextOrPrev;
      VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(),
                     cmp_iter.get(), last_op, key, op_logs, &diverged);
    }

    if (s.ok()) {
      thread->stats.AddIterations(1);
    } else {
      fprintf(stderr, "TestIterate error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
      break;
    }

    op_logs += "; ";
  }

  db_->ReleaseSnapshot(snapshot);

  return s;
}

#ifndef ROCKSDB_LITE
// Test the return status of GetLiveFiles.
Status StressTest::VerifyGetLiveFiles() const {
  std::vector<std::string> live_file;
  uint64_t manifest_size = 0;
  return db_->GetLiveFiles(live_file, &manifest_size);
}

// Test the return status of GetSortedWalFiles.
Status StressTest::VerifyGetSortedWalFiles() const {
  VectorLogPtr log_ptr;
  return db_->GetSortedWalFiles(log_ptr);
}

// Test the return status of GetCurrentWalFile.
Status StressTest::VerifyGetCurrentWalFile() const {
  std::unique_ptr<LogFile> cur_wal_file;
  return db_->GetCurrentWalFile(&cur_wal_file);
}
#endif  // !ROCKSDB_LITE

// Compare the two iterators. iter and cmp_iter should be at the same
// position, unless iter may have become invalid or undefined because of
// upper or lower bounds, or the prefix extractor.
// Flags a failure to the shared state if the verification fails.
// *diverged is set to true once the two iterators have diverged and no
// further comparison is meaningful.
void StressTest::VerifyIterator(ThreadState* thread,
                                ColumnFamilyHandle* cmp_cfh,
                                const ReadOptions& ro, Iterator* iter,
                                Iterator* cmp_iter, LastIterateOp op,
                                const Slice& seek_key,
                                const std::string& op_logs, bool* diverged) {
  if (*diverged) {
    return;
  }

  if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
    // SeekToFirst() with a lower bound is not well defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekToLast && ro.iterate_upper_bound != nullptr) {
    // SeekToLast() with an upper bound is not well defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
             (options_.comparator->Compare(*ro.iterate_lower_bound, seek_key) >=
                  0 ||
              (ro.iterate_upper_bound != nullptr &&
               options_.comparator->Compare(*ro.iterate_lower_bound,
                                            *ro.iterate_upper_bound) >= 0))) {
    // Lower bound behavior is not well defined if it is larger than the
    // seek key or the upper bound. Disable the check for now.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
             (options_.comparator->Compare(*ro.iterate_upper_bound, seek_key) <=
                  0 ||
              (ro.iterate_lower_bound != nullptr &&
               options_.comparator->Compare(*ro.iterate_lower_bound,
                                            *ro.iterate_upper_bound) >= 0))) {
    // Upper bound behavior is not well defined if it is smaller than the
    // seek key or the lower bound. Disable the check for now.
    *diverged = true;
    return;
  }

  const SliceTransform* pe = (ro.total_order_seek || ro.auto_prefix_mode)
                                 ? nullptr
                                 : options_.prefix_extractor.get();
  const Comparator* cmp = options_.comparator;

  if (iter->Valid() && !cmp_iter->Valid()) {
    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix-seeking a key that is not in the domain is undefined.
        // Skip checking for this scenario.
        *diverged = true;
        return;
      } else if (!pe->InDomain(iter->key())) {
        // The iterator is out of range once its key is no longer in the
        // prefix domain.
        *diverged = true;
        return;
      } else if (pe->Transform(iter->key()) != pe->Transform(seek_key)) {
        *diverged = true;
        return;
      }
    }
    fprintf(stderr,
            "Control iterator is invalid but iterator has key %s "
            "%s\n",
            iter->key().ToString(true).c_str(), op_logs.c_str());

    *diverged = true;
  } else if (cmp_iter->Valid()) {
    // Iterator is not valid. That can be legitimate if it has already moved
    // out of the upper or lower bound, or been filtered out by the prefix
    // iterator.
    const Slice& total_order_key = cmp_iter->key();

    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix-seeking a key that is not in the domain is undefined.
        // Skip checking for this scenario.
        *diverged = true;
        return;
      }

      if (!pe->InDomain(total_order_key) ||
          pe->Transform(total_order_key) != pe->Transform(seek_key)) {
        // If the prefix is exhausted, the only thing that needs checking
        // is that the iterator does not return a position within the
        // prefix. Either way, checking can stop here.
        *diverged = true;
        if (!iter->Valid() || !pe->InDomain(iter->key()) ||
            pe->Transform(iter->key()) != pe->Transform(seek_key)) {
          return;
        }
        fprintf(stderr,
                "Iterator stays in prefix but control doesn't"
                " iterator key %s control iterator key %s %s\n",
                iter->key().ToString(true).c_str(),
                cmp_iter->key().ToString(true).c_str(), op_logs.c_str());
      }
    }
    // Check upper or lower bounds.
    if (!*diverged) {
      if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
          (!iter->Valid() &&
           (ro.iterate_upper_bound == nullptr ||
            cmp->Compare(total_order_key, *ro.iterate_upper_bound) < 0) &&
           (ro.iterate_lower_bound == nullptr ||
            cmp->Compare(total_order_key, *ro.iterate_lower_bound) > 0))) {
        fprintf(stderr,
                "Iterator diverged from control iterator which"
                " has value %s %s\n",
                total_order_key.ToString(true).c_str(), op_logs.c_str());
        if (iter->Valid()) {
          fprintf(stderr, "iterator has value %s\n",
                  iter->key().ToString(true).c_str());
        } else {
          fprintf(stderr, "iterator is not valid\n");
        }
        *diverged = true;
      }
    }
  }
  if (*diverged) {
    fprintf(stderr, "Control CF %s\n", cmp_cfh->GetName().c_str());
    thread->stats.AddErrors(1);
    // Fail fast to preserve the DB state.
    thread->shared->SetVerificationFailure();
  }
}

#ifdef ROCKSDB_LITE
Status StressTest::TestBackupRestore(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */) {
  assert(false);
  fprintf(stderr,
          "RocksDB lite does not support "
          "TestBackupRestore\n");
  std::terminate();
}

Status StressTest::TestCheckpoint(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */) {
  assert(false);
  fprintf(stderr,
          "RocksDB lite does not support "
          "TestCheckpoint\n");
  std::terminate();
}

void StressTest::TestCompactFiles(ThreadState* /* thread */,
                                  ColumnFamilyHandle* /* column_family */) {
  assert(false);
  fprintf(stderr,
          "RocksDB lite does not support "
          "CompactFiles\n");
  std::terminate();
}
#else  // ROCKSDB_LITE
Status StressTest::TestBackupRestore(
    ThreadState* thread, const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  // Note the column families chosen by `rand_column_families` cannot be
  // dropped while the locks for `rand_keys` are held. So we should not have
  // to worry about accessing those column families throughout this function.
  assert(rand_column_families.size() == rand_keys.size());
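  // Use per-thread backup/restore directories so concurrent invocations
  // from different threads do not collide.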
  std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
  std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
  BackupableDBOptions backup_opts(backup_dir);
  BackupEngine* backup_engine = nullptr;
  Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
  if (s.ok()) {
    s = backup_engine->CreateNewBackup(db_);
  }
  if (s.ok()) {
    delete backup_engine;
    backup_engine = nullptr;
    s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
  }
  if (s.ok()) {
    s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */,
                                                 restore_dir /* wal_dir */);
  }
  if (s.ok()) {
    s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
  }
  DB* restored_db = nullptr;
  std::vector<ColumnFamilyHandle*> restored_cf_handles;
  if (s.ok()) {
    Options restore_options(options_);
    restore_options.listeners.clear();
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    assert(FLAGS_clear_column_family_one_in == 0);
    for (auto name : column_family_names_) {
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
    }
    s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
                 &restored_cf_handles, &restored_db);
  }
  // For simplicity, currently only verify the existence/non-existence of a
  // few keys.
  for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
    std::string key_str = Key(rand_keys[i]);
    Slice key = key_str;
    std::string restored_value;
    Status get_status = restored_db->Get(
        ReadOptions(), restored_cf_handles[rand_column_families[i]], key,
        &restored_value);
    bool exists =
        thread->shared->Exists(rand_column_families[i], rand_keys[i]);
    if (get_status.ok()) {
      if (!exists) {
        s = Status::Corruption("key exists in restore but not in original db");
      }
    } else if (get_status.IsNotFound()) {
      if (exists) {
        s = Status::Corruption("key exists in original db but not in restore");
      }
    } else {
      s = get_status;
    }
  }
  if (backup_engine != nullptr) {
    delete backup_engine;
    backup_engine = nullptr;
  }
  if (restored_db != nullptr) {
    for (auto* cf_handle : restored_cf_handles) {
      restored_db->DestroyColumnFamilyHandle(cf_handle);
    }
    delete restored_db;
    restored_db = nullptr;
  }
  if (!s.ok()) {
    fprintf(stderr, "A backup/restore operation failed with: %s\n",
            s.ToString().c_str());
  }
  return s;
}

#ifndef ROCKSDB_LITE
Status StressTest::TestApproximateSize(
    ThreadState* thread, uint64_t iteration,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  // rand_keys likely only has one key. Just use the first one.
  assert(!rand_keys.empty());
  assert(!rand_column_families.empty());
  int64_t key1 = rand_keys[0];
  int64_t key2;
  if (thread->rand.OneIn(2)) {
    // Two totally random keys. This tends to cover large ranges.
    key2 = GenerateOneKey(thread, iteration);
    if (key2 < key1) {
      std::swap(key1, key2);
    }
  } else {
    // Unless users pass a very large FLAGS_max_key, we should not need to
    // worry about overflow. This is for testing, so we skip the overflow
    // checking for simplicity.
    key2 = key1 + static_cast<int64_t>(thread->rand.Uniform(1000));
  }
  std::string key1_str = Key(key1);
  std::string key2_str = Key(key2);
  Range range{Slice(key1_str), Slice(key2_str)};
  SizeApproximationOptions sao;
  sao.include_memtabtles = thread->rand.OneIn(2);
  if (sao.include_memtabtles) {
    sao.include_files = thread->rand.OneIn(2);
  }
  if (thread->rand.OneIn(2)) {
    if (thread->rand.OneIn(2)) {
      sao.files_size_error_margin = 0.0;
    } else {
      sao.files_size_error_margin =
          static_cast<double>(thread->rand.Uniform(3));
    }
  }
  uint64_t result;
  return db_->GetApproximateSizes(
      sao, column_families_[rand_column_families[0]], &range, 1, &result);
}
#endif  // ROCKSDB_LITE

Status StressTest::TestCheckpoint(ThreadState* thread,
                                  const std::vector<int>& rand_column_families,
                                  const std::vector<int64_t>& rand_keys) {
  // Note the column families chosen by `rand_column_families` cannot be
  // dropped while the locks for `rand_keys` are held. So we should not have
  // to worry about accessing those column families throughout this function.
  assert(rand_column_families.size() == rand_keys.size());
  std::string checkpoint_dir =
      FLAGS_db + "/.checkpoint" + ToString(thread->tid);
  Options tmp_opts(options_);
  tmp_opts.listeners.clear();
  tmp_opts.env = db_stress_env->target();

  DestroyDB(checkpoint_dir, tmp_opts);

  Checkpoint* checkpoint = nullptr;
  Status s = Checkpoint::Create(db_, &checkpoint);
  if (s.ok()) {
    s = checkpoint->CreateCheckpoint(checkpoint_dir);
  }
  std::vector<ColumnFamilyHandle*> cf_handles;
  DB* checkpoint_db = nullptr;
  if (s.ok()) {
    delete checkpoint;
    checkpoint = nullptr;
    Options options(options_);
    options.listeners.clear();
    std::vector<ColumnFamilyDescriptor> cf_descs;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    if (FLAGS_clear_column_family_one_in == 0) {
      for (const auto& name : column_family_names_) {
        cf_descs.emplace_back(name, ColumnFamilyOptions(options));
      }
      s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
                              &cf_handles, &checkpoint_db);
    }
  }
  if (checkpoint_db != nullptr) {
    for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
      std::string key_str = Key(rand_keys[i]);
      Slice key = key_str;
      std::string value;
      Status get_status = checkpoint_db->Get(
          ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
      bool exists =
          thread->shared->Exists(rand_column_families[i], rand_keys[i]);
      if (get_status.ok()) {
        if (!exists) {
          s = Status::Corruption(
              "key exists in checkpoint but not in original db");
        }
      } else if (get_status.IsNotFound()) {
        if (exists) {
          s = Status::Corruption(
              "key exists in original db but not in checkpoint");
        }
      } else {
        s = get_status;
      }
    }
    for (auto cfh : cf_handles) {
      delete cfh;
    }
    cf_handles.clear();
    delete checkpoint_db;
    checkpoint_db = nullptr;
  }

  DestroyDB(checkpoint_dir, tmp_opts);

  if (!s.ok()) {
    fprintf(stderr, "A checkpoint operation failed with: %s\n",
            s.ToString().c_str());
  }
  return s;
}

void StressTest::TestCompactFiles(ThreadState* thread,
                                  ColumnFamilyHandle* column_family) {
  ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
  db_->GetColumnFamilyMetaData(column_family, &cf_meta_data);

  // Randomly compact up to three consecutive files from a level
  const int kMaxRetry = 3;
  for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
    size_t random_level =
        thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size()));

    const auto& files = cf_meta_data.levels[random_level].files;
    if (files.size() > 0) {
      size_t random_file_index =
          thread->rand.Uniform(static_cast<int>(files.size()));
      if (files[random_file_index].being_compacted) {
        // Retry as the selected file is currently being compacted
        continue;
      }

      std::vector<std::string> input_files;
      input_files.push_back(files[random_file_index].name);
      if (random_file_index > 0 &&
          !files[random_file_index - 1].being_compacted) {
        input_files.push_back(files[random_file_index - 1].name);
      }
      if (random_file_index + 1 < files.size() &&
          !files[random_file_index + 1].being_compacted) {
        input_files.push_back(files[random_file_index + 1].name);
      }

      size_t output_level =
          std::min(random_level + 1, cf_meta_data.levels.size() - 1);
      auto s = db_->CompactFiles(CompactionOptions(), column_family,
                                 input_files, static_cast<int>(output_level));
      if (!s.ok()) {
        fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
                s.ToString().c_str());
        thread->stats.AddNumCompactFilesFailed(1);
      } else {
        thread->stats.AddNumCompactFilesSucceed(1);
      }
      break;
    }
  }
}
#endif  // ROCKSDB_LITE

Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
  FlushOptions flush_opts;
  std::vector<ColumnFamilyHandle*> cfhs;
  std::for_each(rand_column_families.begin(), rand_column_families.end(),
                [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
  return db_->Flush(flush_opts, cfhs);
}

Status StressTest::TestPauseBackground(ThreadState* thread) {
  Status status = db_->PauseBackgroundWork();
  if (!status.ok()) {
    return status;
  }
  // To avoid stalling/deadlocking ourselves in this thread, just
  // sleep here during the pause and let other threads do db operations.
  // Sleep up to ~16 seconds (2**24 microseconds), but very skewed
  // toward short pauses. (1 chance in 25 of pausing >= 1s;
  // 1 chance in 625 of pausing the full 16s.)
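  // Taking the min of two uniform draws skews the distribution toward
  // small exponents: pausing >= 1s requires both draws to be >= 20, which
  // happens with probability (5/25)^2 = 1/25.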
  int pwr2_micros =
      std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
  db_stress_env->SleepForMicroseconds(1 << pwr2_micros);
  return db_->ContinueBackgroundWork();
}

void StressTest::TestAcquireSnapshot(ThreadState* thread,
                                     int rand_column_family,
                                     const std::string& keystr, uint64_t i) {
  Slice key = keystr;
  ColumnFamilyHandle* column_family = column_families_[rand_column_family];
#ifndef ROCKSDB_LITE
  auto db_impl = reinterpret_cast<DBImpl*>(db_->GetRootDB());
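  // With a 1 in 10 chance, take a snapshot for write-conflict-boundary
  // checking (the variant used internally by write-prepared transactions)
  // instead of a regular user snapshot.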
  const bool ww_snapshot = thread->rand.OneIn(10);
  const Snapshot* snapshot =
      ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
                  : db_->GetSnapshot();
#else
  const Snapshot* snapshot = db_->GetSnapshot();
#endif  // !ROCKSDB_LITE
  ReadOptions ropt;
  ropt.snapshot = snapshot;
  std::string value_at;
  // When taking a snapshot, we also read a key from that snapshot. We
  // will later read the same key before releasing the snapshot and
  // verify that the results are the same.
  auto status_at = db_->Get(ropt, column_family, key, &value_at);
  std::vector<bool>* key_vec = nullptr;

  if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
    key_vec = new std::vector<bool>(FLAGS_max_key);
    // When `prefix_extractor` is set, seeking to the beginning and scanning
    // across prefixes are only supported with `total_order_seek` set.
    ropt.total_order_seek = true;
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      uint64_t key_val;
      if (GetIntVal(iterator->key().ToString(), &key_val)) {
        (*key_vec)[key_val] = true;
      }
    }
  }

  ThreadState::SnapshotState snap_state = {
      snapshot, rand_column_family, column_family->GetName(),
      keystr,   status_at,          value_at,
      key_vec};
  uint64_t hold_for = FLAGS_snapshot_hold_ops;
  if (FLAGS_long_running_snapshots) {
    // Hold 10% of snapshots for 10x longer
    if (thread->rand.OneIn(10)) {
      assert(hold_for < port::kMaxInt64 / 10);
      hold_for *= 10;
      // Hold 1% of snapshots for 100x longer
      if (thread->rand.OneIn(10)) {
        assert(hold_for < port::kMaxInt64 / 10);
        hold_for *= 10;
      }
    }
  }
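  // Schedule verification and release of this snapshot after `hold_for`
  // more operations, clamped to the thread's final operation.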
1515 uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
1516 thread->snapshot_queue.emplace(release_at, snap_state);
1517 }
1518
MaybeReleaseSnapshots(ThreadState * thread,uint64_t i)1519 Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) {
1520 while (!thread->snapshot_queue.empty() &&
1521 i >= thread->snapshot_queue.front().first) {
1522 auto snap_state = thread->snapshot_queue.front().second;
1523 assert(snap_state.snapshot);
1524 // Note: this is unsafe as the cf might be dropped concurrently. But
1525 // it is ok since unclean cf drop is cunnrently not supported by write
1526 // prepared transactions.
1527 Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
1528 db_->ReleaseSnapshot(snap_state.snapshot);
1529 delete snap_state.key_vec;
1530 thread->snapshot_queue.pop();
1531 if (!s.ok()) {
1532 return s;
1533 }
1534 }
1535 return Status::OK();
1536 }
1537
TestCompactRange(ThreadState * thread,int64_t rand_key,const Slice & start_key,ColumnFamilyHandle * column_family)1538 void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
1539 const Slice& start_key,
1540 ColumnFamilyHandle* column_family) {
1541 int64_t end_key_num;
1542 if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
1543 end_key_num = port::kMaxInt64;
1544 } else {
1545 end_key_num = FLAGS_compact_range_width + rand_key;
1546 }
1547 std::string end_key_buf = Key(end_key_num);
1548 Slice end_key(end_key_buf);
1549
1550 CompactRangeOptions cro;
1551 cro.exclusive_manual_compaction = static_cast<bool>(thread->rand.Next() % 2);
1552 cro.change_level = static_cast<bool>(thread->rand.Next() % 2);
1553 std::vector<BottommostLevelCompaction> bottom_level_styles = {
1554 BottommostLevelCompaction::kSkip,
1555 BottommostLevelCompaction::kIfHaveCompactionFilter,
1556 BottommostLevelCompaction::kForce,
1557 BottommostLevelCompaction::kForceOptimized};
1558 cro.bottommost_level_compaction =
1559 bottom_level_styles[thread->rand.Next() %
1560 static_cast<uint32_t>(bottom_level_styles.size())];
1561 cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
1562 cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
1563
1564 const Snapshot* pre_snapshot = nullptr;
1565 uint32_t pre_hash = 0;
1566 if (thread->rand.OneIn(2)) {
1567 // Do some validation by declaring a snapshot and compare the data before
1568 // and after the compaction
1569 pre_snapshot = db_->GetSnapshot();
1570 pre_hash =
1571 GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
1572 }

  Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);

  if (!status.ok()) {
    fprintf(stdout, "Unable to perform CompactRange(): %s\n",
            status.ToString().c_str());
  }

  if (pre_snapshot != nullptr) {
    uint32_t post_hash =
        GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
    if (pre_hash != post_hash) {
      fprintf(stderr,
              "Data hash different before and after compact range "
              "start_key %s end_key %s\n",
              start_key.ToString(true).c_str(), end_key.ToString(true).c_str());
      thread->stats.AddErrors(1);
      // Fail fast to preserve the DB state.
      thread->shared->SetVerificationFailure();
    }
    db_->ReleaseSnapshot(pre_snapshot);
  }
}

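// Compute a CRC32C over every key and value in [start_key, end_key] as seen
// through `snapshot`, appending a one-byte ";" separator after each key and
// each value so adjacent entries cannot alias. E.g., the range
// {("a", "1"), ("b", "2")} hashes as crc32c("a;1;b;2;").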
uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
                                  ColumnFamilyHandle* column_family,
                                  const Slice& start_key,
                                  const Slice& end_key) {
  const std::string kCrcCalculatorSeparator = ";";
  uint32_t crc = 0;
  ReadOptions ro;
  ro.snapshot = snapshot;
  ro.total_order_seek = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
  for (it->Seek(start_key);
       it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0;
       it->Next()) {
    crc = crc32c::Extend(crc, it->key().data(), it->key().size());
    crc = crc32c::Extend(crc, kCrcCalculatorSeparator.data(), 1);
    crc = crc32c::Extend(crc, it->value().data(), it->value().size());
    crc = crc32c::Extend(crc, kCrcCalculatorSeparator.data(), 1);
  }
  if (!it->status().ok()) {
    fprintf(stderr, "Iterator non-OK when calculating range CRC: %s\n",
            it->status().ToString().c_str());
    thread->stats.AddErrors(1);
    // Fail fast to preserve the DB state.
    thread->shared->SetVerificationFailure();
  }
  return crc;
}

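// Print the effective test configuration to stdout before the run starts.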
void StressTest::PrintEnv() const {
  fprintf(stdout, "RocksDB version           : %d.%d\n", kMajorVersion,
          kMinorVersion);
  fprintf(stdout, "Format version            : %d\n", FLAGS_format_version);
  fprintf(stdout, "TransactionDB             : %s\n",
          FLAGS_use_txn ? "true" : "false");
#ifndef ROCKSDB_LITE
  fprintf(stdout, "BlobDB                    : %s\n",
          FLAGS_use_blob_db ? "true" : "false");
#endif  // !ROCKSDB_LITE
  fprintf(stdout, "Read only mode            : %s\n",
          FLAGS_read_only ? "true" : "false");
  fprintf(stdout, "Atomic flush              : %s\n",
          FLAGS_atomic_flush ? "true" : "false");
  fprintf(stdout, "Column families           : %d\n", FLAGS_column_families);
  if (!FLAGS_test_batches_snapshots) {
    fprintf(stdout, "Clear CFs one in          : %d\n",
            FLAGS_clear_column_family_one_in);
  }
  fprintf(stdout, "Number of threads         : %d\n", FLAGS_threads);
  fprintf(stdout, "Ops per thread            : %lu\n",
          (unsigned long)FLAGS_ops_per_thread);
  std::string ttl_state("unused");
  if (FLAGS_ttl > 0) {
    ttl_state = NumberToString(FLAGS_ttl);
  }
  fprintf(stdout, "Time to live(sec)         : %s\n", ttl_state.c_str());
  fprintf(stdout, "Read percentage           : %d%%\n", FLAGS_readpercent);
  fprintf(stdout, "Prefix percentage         : %d%%\n", FLAGS_prefixpercent);
  fprintf(stdout, "Write percentage          : %d%%\n", FLAGS_writepercent);
  fprintf(stdout, "Delete percentage         : %d%%\n", FLAGS_delpercent);
  fprintf(stdout, "Delete range percentage   : %d%%\n", FLAGS_delrangepercent);
  fprintf(stdout, "No overwrite percentage   : %d%%\n",
          FLAGS_nooverwritepercent);
  fprintf(stdout, "Iterate percentage        : %d%%\n", FLAGS_iterpercent);
  fprintf(stdout, "DB-write-buffer-size      : %" PRIu64 "\n",
          FLAGS_db_write_buffer_size);
  fprintf(stdout, "Write-buffer-size         : %d\n", FLAGS_write_buffer_size);
  fprintf(stdout, "Iterations                : %lu\n",
          (unsigned long)FLAGS_num_iterations);
  fprintf(stdout, "Max key                   : %lu\n",
          (unsigned long)FLAGS_max_key);
  fprintf(stdout, "Ratio #ops/#keys          : %f\n",
          (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
  fprintf(stdout, "Num times DB reopens      : %d\n", FLAGS_reopen);
  fprintf(stdout, "Batches/snapshots         : %d\n",
          FLAGS_test_batches_snapshots);
  fprintf(stdout, "Do update in place        : %d\n", FLAGS_in_place_update);
  fprintf(stdout, "Num keys per lock         : %d\n",
          1 << FLAGS_log2_keys_per_lock);
  std::string compression = CompressionTypeToString(compression_type_e);
  fprintf(stdout, "Compression               : %s\n", compression.c_str());
  std::string bottommost_compression =
      CompressionTypeToString(bottommost_compression_type_e);
  fprintf(stdout, "Bottommost Compression    : %s\n",
          bottommost_compression.c_str());
  std::string checksum = ChecksumTypeToString(checksum_type_e);
  fprintf(stdout, "Checksum type             : %s\n", checksum.c_str());
  fprintf(stdout, "Bloom bits / key          : %s\n",
          FormatDoubleParam(FLAGS_bloom_bits).c_str());
  fprintf(stdout, "Max subcompactions        : %" PRIu64 "\n",
          FLAGS_subcompactions);
  fprintf(stdout, "Use MultiGet              : %s\n",
          FLAGS_use_multiget ? "true" : "false");

  const char* memtablerep = "";
  switch (FLAGS_rep_factory) {
    case kSkipList:
      memtablerep = "skip_list";
      break;
    case kHashSkipList:
      memtablerep = "prefix_hash";
      break;
    case kVectorRep:
      memtablerep = "vector";
      break;
  }

  fprintf(stdout, "Memtablerep               : %s\n", memtablerep);

  fprintf(stdout, "Test kill odd             : %d\n", rocksdb_kill_odds);
  if (!rocksdb_kill_prefix_blacklist.empty()) {
    fprintf(stdout, "Skipping kill points prefixes:\n");
    for (auto& p : rocksdb_kill_prefix_blacklist) {
      fprintf(stdout, "  %s\n", p.c_str());
    }
  }
  fprintf(stdout, "Periodic Compaction Secs  : %" PRIu64 "\n",
          FLAGS_periodic_compaction_seconds);
  fprintf(stdout, "Compaction TTL            : %" PRIu64 "\n",
          FLAGS_compaction_ttl);
  fprintf(stdout, "Background Purge          : %d\n",
          static_cast<int>(FLAGS_avoid_unnecessary_blocking_io));
  fprintf(stdout, "Write DB ID to manifest   : %d\n",
          static_cast<int>(FLAGS_write_dbid_to_manifest));
  fprintf(stdout, "Max Write Batch Group Size: %" PRIu64 "\n",
          FLAGS_max_write_batch_group_size_bytes);
  fprintf(stdout, "Use dynamic level         : %d\n",
          static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));

  fprintf(stdout, "------------------------------------------------\n");
}

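// Open the database according to the flags, either building options from the
// individual FLAGS_* values or loading them from --options_file. Depending on
// the configuration the DB is opened as a plain DB, BlobDB, TransactionDB,
// read-only DB, or DBWithTTL, optionally with secondary instances.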
void StressTest::Open() {
  assert(db_ == nullptr);
#ifndef ROCKSDB_LITE
  assert(txn_db_ == nullptr);
#endif
  if (FLAGS_options_file.empty()) {
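    // No options file was given: build the table and DB options directly from
    // the individual command-line flags.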
    BlockBasedTableOptions block_based_options;
    block_based_options.block_cache = cache_;
    block_based_options.cache_index_and_filter_blocks =
        FLAGS_cache_index_and_filter_blocks;
    block_based_options.block_cache_compressed = compressed_cache_;
    block_based_options.checksum = checksum_type_e;
    block_based_options.block_size = FLAGS_block_size;
    block_based_options.format_version =
        static_cast<uint32_t>(FLAGS_format_version);
    block_based_options.index_block_restart_interval =
        static_cast<int32_t>(FLAGS_index_block_restart_interval);
    block_based_options.filter_policy = filter_policy_;
    block_based_options.partition_filters = FLAGS_partition_filters;
    block_based_options.index_type =
        static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
    options_.table_factory.reset(
        NewBlockBasedTableFactory(block_based_options));
    options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
    options_.write_buffer_size = FLAGS_write_buffer_size;
    options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
    options_.min_write_buffer_number_to_merge =
        FLAGS_min_write_buffer_number_to_merge;
    options_.max_write_buffer_number_to_maintain =
        FLAGS_max_write_buffer_number_to_maintain;
    options_.max_write_buffer_size_to_maintain =
        FLAGS_max_write_buffer_size_to_maintain;
    options_.memtable_prefix_bloom_size_ratio =
        FLAGS_memtable_prefix_bloom_size_ratio;
    options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
    options_.max_background_compactions = FLAGS_max_background_compactions;
    options_.max_background_flushes = FLAGS_max_background_flushes;
    options_.compaction_style =
        static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
    if (FLAGS_prefix_size >= 0) {
      options_.prefix_extractor.reset(
          NewFixedPrefixTransform(FLAGS_prefix_size));
    }
    options_.max_open_files = FLAGS_open_files;
    options_.statistics = dbstats;
    options_.env = db_stress_env;
    options_.use_fsync = FLAGS_use_fsync;
    options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
    options_.allow_mmap_reads = FLAGS_mmap_read;
    options_.allow_mmap_writes = FLAGS_mmap_write;
    options_.use_direct_reads = FLAGS_use_direct_reads;
    options_.use_direct_io_for_flush_and_compaction =
        FLAGS_use_direct_io_for_flush_and_compaction;
    options_.recycle_log_file_num =
        static_cast<size_t>(FLAGS_recycle_log_file_num);
    options_.target_file_size_base = FLAGS_target_file_size_base;
    options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
    options_.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
    options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options_.level0_slowdown_writes_trigger =
        FLAGS_level0_slowdown_writes_trigger;
    options_.level0_file_num_compaction_trigger =
        FLAGS_level0_file_num_compaction_trigger;
    options_.compression = compression_type_e;
    options_.bottommost_compression = bottommost_compression_type_e;
    options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
    options_.compression_opts.zstd_max_train_bytes =
        FLAGS_compression_zstd_max_train_bytes;
    options_.create_if_missing = true;
    options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
    options_.inplace_update_support = FLAGS_in_place_update;
    options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
    options_.allow_concurrent_memtable_write =
        FLAGS_allow_concurrent_memtable_write;
    options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
    options_.ttl = FLAGS_compaction_ttl;
    options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
    options_.enable_write_thread_adaptive_yield =
        FLAGS_enable_write_thread_adaptive_yield;
    options_.compaction_options_universal.size_ratio =
        FLAGS_universal_size_ratio;
    options_.compaction_options_universal.min_merge_width =
        FLAGS_universal_min_merge_width;
    options_.compaction_options_universal.max_merge_width =
        FLAGS_universal_max_merge_width;
    options_.compaction_options_universal.max_size_amplification_percent =
        FLAGS_universal_max_size_amplification_percent;
    options_.atomic_flush = FLAGS_atomic_flush;
    options_.avoid_unnecessary_blocking_io =
        FLAGS_avoid_unnecessary_blocking_io;
    options_.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
    options_.max_write_batch_group_size_bytes =
        FLAGS_max_write_batch_group_size_bytes;
    options_.level_compaction_dynamic_level_bytes =
        FLAGS_level_compaction_dynamic_level_bytes;
  } else {
#ifdef ROCKSDB_LITE
    fprintf(stderr, "--options_file not supported in lite mode\n");
    exit(1);
#else
    DBOptions db_options;
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
                                   &db_options, &cf_descriptors);
    db_options.env = new DbStressEnvWrapper(db_stress_env);
    if (!s.ok()) {
      fprintf(stderr, "Unable to load options file %s --- %s\n",
              FLAGS_options_file.c_str(), s.ToString().c_str());
      exit(1);
    }
    options_ = Options(db_options, cf_descriptors[0].options);
#endif  // ROCKSDB_LITE
  }

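  // Optionally throttle background I/O with a shared rate limiter; when
  // limiting background reads, compaction inputs are read through dedicated
  // table readers so the limiter can account for them.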
  if (FLAGS_rate_limiter_bytes_per_sec > 0) {
    options_.rate_limiter.reset(NewGenericRateLimiter(
        FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
        10 /* fairness */,
        FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                  : RateLimiter::Mode::kWritesOnly));
    if (FLAGS_rate_limit_bg_reads) {
      options_.new_table_reader_for_compaction_inputs = true;
    }
  }
  if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
      FLAGS_sst_file_manager_bytes_per_truncate > 0) {
    Status status;
    options_.sst_file_manager.reset(NewSstFileManager(
        db_stress_env, options_.info_log, "" /* trash_dir */,
        static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
        true /* delete_existing_trash */, &status,
        0.25 /* max_trash_db_ratio */,
        FLAGS_sst_file_manager_bytes_per_truncate));
    if (!status.ok()) {
      fprintf(stderr, "SstFileManager creation failed: %s\n",
              status.ToString().c_str());
      exit(1);
    }
  }

  if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
    fprintf(stderr,
            "prefix_size cannot be zero if memtablerep == prefix_hash\n");
    exit(1);
  }
  if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
    fprintf(stderr,
            "WARNING: prefix_size is non-zero but "
            "memtablerep != prefix_hash\n");
  }
  switch (FLAGS_rep_factory) {
    case kSkipList:
      // no need to do anything
      break;
#ifndef ROCKSDB_LITE
    case kHashSkipList:
      options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
      break;
    case kVectorRep:
      options_.memtable_factory.reset(new VectorRepFactory());
      break;
#else
    default:
      fprintf(stderr,
              "RocksDB Lite only supports the skip-list memtable; ignoring "
              "--rep_factory\n");
#endif  // ROCKSDB_LITE
  }

  if (FLAGS_use_full_merge_v1) {
    options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
  } else {
    options_.merge_operator = MergeOperators::CreatePutOperator();
  }

  fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());

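  // Determine the set of column families. For a reopen, the families found on
  // disk must match the ones recorded from the previous Open(); any families
  // still missing (up to FLAGS_column_families) are created below.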
  Status s;
  if (FLAGS_ttl == -1) {
    std::vector<std::string> existing_column_families;
    s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
                               &existing_column_families);  // ignore errors
    if (!s.ok()) {
      // DB doesn't exist
      assert(existing_column_families.empty());
      assert(column_family_names_.empty());
      column_family_names_.push_back(kDefaultColumnFamilyName);
    } else if (column_family_names_.empty()) {
      // this is the first call to the function Open()
      column_family_names_ = existing_column_families;
    } else {
      // this is a reopen. just assert that existing column_family_names are
      // equivalent to what we remember
      auto sorted_cfn = column_family_names_;
      std::sort(sorted_cfn.begin(), sorted_cfn.end());
      std::sort(existing_column_families.begin(),
                existing_column_families.end());
      if (sorted_cfn != existing_column_families) {
        fprintf(stderr, "Expected column families differ from the existing:\n");
        fprintf(stderr, "Expected: {");
        for (auto cf : sorted_cfn) {
          fprintf(stderr, "%s ", cf.c_str());
        }
        fprintf(stderr, "}\n");
        fprintf(stderr, "Existing: {");
        for (auto cf : existing_column_families) {
          fprintf(stderr, "%s ", cf.c_str());
        }
        fprintf(stderr, "}\n");
      }
      assert(sorted_cfn == existing_column_families);
    }
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    for (auto name : column_family_names_) {
      if (name != kDefaultColumnFamilyName) {
        new_column_family_name_ =
            std::max(new_column_family_name_.load(), std::stoi(name) + 1);
      }
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
    }
    while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
      std::string name = ToString(new_column_family_name_.load());
      new_column_family_name_++;
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
      column_family_names_.push_back(name);
    }
    options_.listeners.clear();
    options_.listeners.emplace_back(
        new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
    options_.create_missing_column_families = true;
    if (!FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
      if (FLAGS_use_blob_db) {
        blob_db::BlobDBOptions blob_db_options;
        blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
        blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
        blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
        blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
        blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;

        blob_db::BlobDB* blob_db = nullptr;
        s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
                                  cf_descriptors, &column_families_, &blob_db);
        if (s.ok()) {
          db_ = blob_db;
        }
      } else
#endif  // !ROCKSDB_LITE
      {
        if (db_preload_finished_.load() && FLAGS_read_only) {
          s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors,
                                  &column_families_, &db_);
        } else {
          s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
                       &column_families_, &db_);
        }
      }
    } else {
#ifndef ROCKSDB_LITE
      TransactionDBOptions txn_db_options;
      assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
      txn_db_options.write_policy =
          static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
      if (FLAGS_unordered_write) {
        assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED);
        options_.unordered_write = true;
        options_.two_write_queues = true;
        txn_db_options.skip_concurrency_control = true;
      }
      s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
                              cf_descriptors, &column_families_, &txn_db_);
      if (!s.ok()) {
        fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
                s.ToString().c_str());
        fflush(stderr);
      }
      assert(s.ok());
      db_ = txn_db_;
      // After a crash, randomly commit or roll back the recovered prepared
      // transactions.
      std::vector<Transaction*> trans;
      txn_db_->GetAllPreparedTransactions(&trans);
      Random rand(static_cast<uint32_t>(FLAGS_seed));
      for (auto txn : trans) {
        if (rand.OneIn(2)) {
          s = txn->Commit();
          assert(s.ok());
        } else {
          s = txn->Rollback();
          assert(s.ok());
        }
        delete txn;
      }
      trans.clear();
      txn_db_->GetAllPreparedTransactions(&trans);
      assert(trans.size() == 0);
#endif
    }
    assert(!s.ok() || column_families_.size() ==
                          static_cast<size_t>(FLAGS_column_families));

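    // When testing the secondary-instance feature, open one read-only
    // secondary per worker thread, each with its own directory under
    // FLAGS_secondaries_base.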
    if (FLAGS_test_secondary) {
#ifndef ROCKSDB_LITE
      secondaries_.resize(FLAGS_threads);
      std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
      secondary_cfh_lists_.clear();
      secondary_cfh_lists_.resize(FLAGS_threads);
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.statistics = dbstats_secondaries;
      tmp_opts.env = db_stress_env;
      for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
        const std::string secondary_path =
            FLAGS_secondaries_base + "/" + std::to_string(i);
        s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                                cf_descriptors, &secondary_cfh_lists_[i],
                                &secondaries_[i]);
        if (!s.ok()) {
          break;
        }
      }
      assert(s.ok());
#else
      fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
      exit(1);
#endif
    }
    if (FLAGS_continuous_verification_interval > 0 && !cmp_db_) {
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.env = db_stress_env;
      std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
      s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                              cf_descriptors, &cmp_cfhs_, &cmp_db_);
      assert(!s.ok() ||
             cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
    }
  } else {
#ifndef ROCKSDB_LITE
    DBWithTTL* db_with_ttl;
    s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
    db_ = db_with_ttl;
    if (FLAGS_test_secondary) {
      secondaries_.resize(FLAGS_threads);
      std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
      Options tmp_opts;
      tmp_opts.env = options_.env;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
        const std::string secondary_path =
            FLAGS_secondaries_base + "/" + std::to_string(i);
        s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                                &secondaries_[i]);
        if (!s.ok()) {
          break;
        }
      }
    }
#else
    fprintf(stderr, "TTL is not supported in RocksDBLite\n");
    exit(1);
#endif
  }
  if (!s.ok()) {
    fprintf(stderr, "open error: %s\n", s.ToString().c_str());
    exit(1);
  }
}

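// Close the current DB (and any secondaries) and reopen it via Open(),
// exercising the reopen/recovery path.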
void StressTest::Reopen(ThreadState* thread) {
#ifndef ROCKSDB_LITE
  // BG jobs in WritePrepared must be canceled first because i) they can access
  // the db via a callback; ii) they hold on to a snapshot and the upcoming
  // ::Close would complain about it.
  const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
  bool bg_canceled = false;
  if (write_prepared || thread->rand.OneIn(2)) {
    const bool wait =
        write_prepared || static_cast<bool>(thread->rand.OneIn(2));
    CancelAllBackgroundWork(db_, wait);
    bg_canceled = wait;
  }
  assert(!write_prepared || bg_canceled);
  (void)bg_canceled;
#else
  (void)thread;
#endif

  for (auto cf : column_families_) {
    delete cf;
  }
  column_families_.clear();

#ifndef ROCKSDB_LITE
  if (thread->rand.OneIn(2)) {
    Status s = db_->Close();
    if (!s.ok()) {
      fprintf(stderr, "Non-ok close status: %s\n", s.ToString().c_str());
      fflush(stderr);
    }
    assert(s.ok());
  }
#endif
  delete db_;
  db_ = nullptr;
#ifndef ROCKSDB_LITE
  txn_db_ = nullptr;
#endif

  assert(secondaries_.size() == secondary_cfh_lists_.size());
  size_t n = secondaries_.size();
  for (size_t i = 0; i != n; ++i) {
    for (auto* cf : secondary_cfh_lists_[i]) {
      delete cf;
    }
    secondary_cfh_lists_[i].clear();
    delete secondaries_[i];
  }
  secondaries_.clear();

  num_times_reopened_++;
  auto now = db_stress_env->NowMicros();
  fprintf(stdout, "%s Reopening database for the %dth time\n",
          db_stress_env->TimeToString(now / 1000000).c_str(),
          num_times_reopened_);
  Open();
}
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS