1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 
11 #ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"

#include <cmath>
#include <cstring>
14 
// Environment wrapper shared by the stress-test tool; assigned elsewhere
// during initialization (starts null).
ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env = nullptr;
// Compression types selected via flags; both default to Snappy.
enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
    ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
    ROCKSDB_NAMESPACE::kSnappyCompression;
// Block checksum type; defaults to CRC32c.
enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e =
    ROCKSDB_NAMESPACE::kCRC32c;
// Memtable representation factory selection; defaults to skip list.
enum RepFactory FLAGS_rep_factory = kSkipList;
// Precomputed Zipfian inverse-CDF table, filled by InitializeHotKeyGenerator.
// Sized zipf_sum_size + 1 so indices 0..zipf_sum_size are all valid.
std::vector<double> sum_probs(100001);
int64_t zipf_sum_size = 100000;
25 
26 namespace ROCKSDB_NAMESPACE {
27 
// Zipfian distribution is generated based on a pre-calculated array.
// It should be called before starting the stress test.
// First, the probability density function (PDF) of this Zipfian follows a
// power law: P(x) = 1/(x^alpha).
// So we calculate the PDF when x is from 0 to zipf_sum_size in the first loop
// and add the PDF values together as c, giving the total probability in c.
// Next, we calculate the inverse CDF of the Zipfian and store the value for
// each rank in an array (sum_probs). The rank is from 0 to zipf_sum_size. For
// example, for integer k, its Zipfian CDF value is sum_probs[k].
// Third, when we need an integer whose probability follows the Zipfian
// distribution, we use a rand_seed in [0,1] which follows a uniform
// distribution as a seed and search for it in sum_probs via binary search.
// When we find the closest sum_probs[i] to rand_seed, i is the integer in
// [0, zipf_sum_size] following the Zipfian distribution with parameter alpha.
// Finally, we can scale i to the [0, max_key] range.
// In order to avoid hot keys being close to each other and skewed towards 0,
// we use Random64 to shuffle the result.
InitializeHotKeyGenerator(double alpha)45 void InitializeHotKeyGenerator(double alpha) {
46   double c = 0;
47   for (int64_t i = 1; i <= zipf_sum_size; i++) {
48     c = c + (1.0 / std::pow(static_cast<double>(i), alpha));
49   }
50   c = 1.0 / c;
51 
52   sum_probs[0] = 0;
53   for (int64_t i = 1; i <= zipf_sum_size; i++) {
54     sum_probs[i] =
55         sum_probs[i - 1] + c / std::pow(static_cast<double>(i), alpha);
56   }
57 }
58 
// Generate one key that follows the Zipfian distribution. The skewness
// is decided by the parameter alpha. Input is the rand_seed in [0,1] and
// the max of the key to be generated. If we directly returned tmp_zipf_seed,
// keys closer to 0 would have a higher probability. To randomly distribute
// the hot keys in [0, max_key], we use Random64 to shuffle it.
GetOneHotKeyID(double rand_seed,int64_t max_key)64 int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) {
65   int64_t low = 1, mid, high = zipf_sum_size, zipf = 0;
66   while (low <= high) {
67     mid = (low + high) / 2;
68     if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) {
69       zipf = mid;
70       break;
71     } else if (sum_probs[mid] >= rand_seed) {
72       high = mid - 1;
73     } else {
74       low = mid + 1;
75     }
76   }
77   int64_t tmp_zipf_seed = zipf * max_key / zipf_sum_size;
78   Random64 rand_local(tmp_zipf_seed);
79   return rand_local.Next() % max_key;
80 }
81 
// Background thread that periodically perturbs the size of the LOW-priority
// (compaction) thread pool to stress dynamic pool resizing. `v` is a
// ThreadState*. Exits when SharedState signals background threads to stop.
void PoolSizeChangeThread(void* v) {
  assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
  ThreadState* thread = reinterpret_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;

  while (true) {
    {
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          // Last background thread to finish wakes up the waiting thread(s).
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }

    auto thread_pool_size_base = FLAGS_max_background_compactions;
    auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
    // Pick a pool size uniformly in [base - var, base + var], clamped to >= 1.
    int new_thread_pool_size =
        thread_pool_size_base - thread_pool_size_var +
        thread->rand.Next() % (thread_pool_size_var * 2 + 1);
    if (new_thread_pool_size < 1) {
      new_thread_pool_size = 1;
    }
    db_stress_env->SetBackgroundThreads(new_thread_pool_size,
                                        ROCKSDB_NAMESPACE::Env::Priority::LOW);
    // Sleep up to FLAGS_compaction_thread_pool_adjust_interval milliseconds
    // (the SleepForMicroseconds argument is in microseconds).
    db_stress_env->SleepForMicroseconds(
        thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
            1000 +
        1);
  }
}
116 
// Background thread that repeatedly re-verifies the DB while the stress test
// runs. `v` is a ThreadState*. Exits when SharedState signals background
// threads to stop.
void DbVerificationThread(void* v) {
  assert(FLAGS_continuous_verification_interval > 0);
  auto* thread = reinterpret_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;
  StressTest* stress_test = shared->GetStressTest();
  assert(stress_test != nullptr);
  while (true) {
    {
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          // Last background thread to finish wakes up the waiting thread(s).
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }
    // Stop verifying after the first recorded failure, but keep looping so
    // the shutdown handshake above still runs.
    if (!shared->HasVerificationFailedYet()) {
      stress_test->ContinuouslyVerifyDb(thread);
    }
    // Sleep up to FLAGS_continuous_verification_interval milliseconds
    // (the SleepForMicroseconds argument is in microseconds).
    db_stress_env->SleepForMicroseconds(
        thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 +
        1);
  }
}
142 
PrintKeyValue(int cf,uint64_t key,const char * value,size_t sz)143 void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
144   if (!FLAGS_verbose) {
145     return;
146   }
147   std::string tmp;
148   tmp.reserve(sz * 2 + 16);
149   char buf[4];
150   for (size_t i = 0; i < sz; i++) {
151     snprintf(buf, 4, "%X", value[i]);
152     tmp.append(buf);
153   }
154   fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", cf,
155           key, sz, tmp.c_str());
156 }
157 
// Note that if hot_key_alpha != 0, it generates the key based on the Zipfian
// distribution. Keys are randomly scattered over [0, FLAGS_max_key]. It does
// not ensure any ordering among the generated keys, and the keys do not have
// the active range which is related to FLAGS_active_width.
GenerateOneKey(ThreadState * thread,uint64_t iteration)162 int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
163   const double completed_ratio =
164       static_cast<double>(iteration) / FLAGS_ops_per_thread;
165   const int64_t base_key = static_cast<int64_t>(
166       completed_ratio * (FLAGS_max_key - FLAGS_active_width));
167   int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width;
168   int64_t cur_key = rand_seed;
169   if (FLAGS_hot_key_alpha != 0) {
170     // If set the Zipfian distribution Alpha to non 0, use Zipfian
171     double float_rand =
172         (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
173         FLAGS_max_key;
174     cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
175   }
176   return cur_key;
177 }
178 
// Note that if hot_key_alpha != 0, it generates the keys based on the Zipfian
// distribution. Keys being generated are in random order.
// If the user wants keys based on a uniform distribution, hot_key_alpha must
// be set to 0. In that case the random keys are generated in non-decreasing
// order in the key array (ensure key[i + 1] >= key[i]) and constrained to a
// range related to FLAGS_active_width.
GenerateNKeys(ThreadState * thread,int num_keys,uint64_t iteration)185 std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
186                                    uint64_t iteration) {
187   const double completed_ratio =
188       static_cast<double>(iteration) / FLAGS_ops_per_thread;
189   const int64_t base_key = static_cast<int64_t>(
190       completed_ratio * (FLAGS_max_key - FLAGS_active_width));
191   std::vector<int64_t> keys;
192   keys.reserve(num_keys);
193   int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
194   keys.push_back(next_key);
195   for (int i = 1; i < num_keys; ++i) {
196     // Generate the key follows zipfian distribution
197     if (FLAGS_hot_key_alpha != 0) {
198       double float_rand =
199           (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
200           FLAGS_max_key;
201       next_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
202     } else {
203       // This may result in some duplicate keys
204       next_key = next_key + thread->rand.Next() %
205                                 (FLAGS_active_width - (next_key - base_key));
206     }
207     keys.push_back(next_key);
208   }
209   return keys;
210 }
211 
GenerateValue(uint32_t rand,char * v,size_t max_sz)212 size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) {
213   size_t value_sz =
214       ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
215   assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
216   (void)max_sz;
217   *((uint32_t*)v) = rand;
218   for (size_t i = sizeof(uint32_t); i < value_sz; i++) {
219     v[i] = (char)(rand ^ i);
220   }
221   v[value_sz] = '\0';
222   return value_sz;  // the size of the value set.
223 }
224 }  // namespace ROCKSDB_NAMESPACE
225 #endif  // GFLAGS
226