1 //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #pragma once
10 
11 #ifndef ROCKSDB_LITE
12 
13 #include <functional>
14 #include <limits>
15 #include <list>
16 #include <memory>
17 #include <string>
18 #include <thread>
19 #include <vector>
20 
21 #include "db/db_test_util.h"
22 #include "memory/arena.h"
23 #include "port/port.h"
24 #include "rocksdb/cache.h"
25 #include "table/block_based/block_builder.h"
26 #include "test_util/testharness.h"
27 #include "utilities/persistent_cache/volatile_tier_impl.h"
28 
29 namespace ROCKSDB_NAMESPACE {
30 
31 //
32 // Unit tests for testing PersistentCacheTier
33 //
34 class PersistentCacheTierTest : public testing::Test {
35  public:
36   PersistentCacheTierTest();
~PersistentCacheTierTest()37   virtual ~PersistentCacheTierTest() {
38     if (cache_) {
39       Status s = cache_->Close();
40       assert(s.ok());
41     }
42   }
43 
44  protected:
45   // Flush cache
Flush()46   void Flush() {
47     if (cache_) {
48       cache_->TEST_Flush();
49     }
50   }
51 
52   // create threaded workload
53   template <class T>
SpawnThreads(const size_t n,const T & fn)54   std::list<port::Thread> SpawnThreads(const size_t n, const T& fn) {
55     std::list<port::Thread> threads;
56     for (size_t i = 0; i < n; i++) {
57       port::Thread th(fn);
58       threads.push_back(std::move(th));
59     }
60     return threads;
61   }
62 
63   // Wait for threads to join
Join(std::list<port::Thread> && threads)64   void Join(std::list<port::Thread>&& threads) {
65     for (auto& th : threads) {
66       th.join();
67     }
68     threads.clear();
69   }
70 
71   // Run insert workload in threads
Insert(const size_t nthreads,const size_t max_keys)72   void Insert(const size_t nthreads, const size_t max_keys) {
73     key_ = 0;
74     max_keys_ = max_keys;
75     // spawn threads
76     auto fn = std::bind(&PersistentCacheTierTest::InsertImpl, this);
77     auto threads = SpawnThreads(nthreads, fn);
78     // join with threads
79     Join(std::move(threads));
80     // Flush cache
81     Flush();
82   }
83 
84   // Run verification on the cache
85   void Verify(const size_t nthreads = 1, const bool eviction_enabled = false) {
86     stats_verify_hits_ = 0;
87     stats_verify_missed_ = 0;
88     key_ = 0;
89     // spawn threads
90     auto fn =
91         std::bind(&PersistentCacheTierTest::VerifyImpl, this, eviction_enabled);
92     auto threads = SpawnThreads(nthreads, fn);
93     // join with threads
94     Join(std::move(threads));
95   }
96 
97   // pad 0 to numbers
PaddedNumber(const size_t data,const size_t pad_size)98   std::string PaddedNumber(const size_t data, const size_t pad_size) {
99     assert(pad_size);
100     char* ret = new char[pad_size];
101     int pos = static_cast<int>(pad_size) - 1;
102     size_t count = 0;
103     size_t t = data;
104     // copy numbers
105     while (t) {
106       count++;
107       ret[pos--] = '0' + t % 10;
108       t = t / 10;
109     }
110     // copy 0s
111     while (pos >= 0) {
112       ret[pos--] = '0';
113     }
114     // post condition
115     assert(count <= pad_size);
116     assert(pos == -1);
117     std::string result(ret, pad_size);
118     delete[] ret;
119     return result;
120   }
121 
122   // Insert workload implementation
InsertImpl()123   void InsertImpl() {
124     const std::string prefix = "key_prefix_";
125 
126     while (true) {
127       size_t i = key_++;
128       if (i >= max_keys_) {
129         break;
130       }
131 
132       char data[4 * 1024];
133       memset(data, '0' + (i % 10), sizeof(data));
134       auto k = prefix + PaddedNumber(i, /*count=*/8);
135       Slice key(k);
136       while (true) {
137         Status status = cache_->Insert(key, data, sizeof(data));
138         if (status.ok()) {
139           break;
140         }
141         ASSERT_TRUE(status.IsTryAgain());
142         Env::Default()->SleepForMicroseconds(1 * 1000 * 1000);
143       }
144     }
145   }
146 
147   // Verification implementation
148   void VerifyImpl(const bool eviction_enabled = false) {
149     const std::string prefix = "key_prefix_";
150     while (true) {
151       size_t i = key_++;
152       if (i >= max_keys_) {
153         break;
154       }
155 
156       char edata[4 * 1024];
157       memset(edata, '0' + (i % 10), sizeof(edata));
158       auto k = prefix + PaddedNumber(i, /*count=*/8);
159       Slice key(k);
160       std::unique_ptr<char[]> block;
161       size_t block_size;
162 
163       if (eviction_enabled) {
164         if (!cache_->Lookup(key, &block, &block_size).ok()) {
165           // assume that the key is evicted
166           stats_verify_missed_++;
167           continue;
168         }
169       }
170 
171       ASSERT_OK(cache_->Lookup(key, &block, &block_size));
172       ASSERT_EQ(block_size, sizeof(edata));
173       ASSERT_EQ(memcmp(edata, block.get(), sizeof(edata)), 0);
174       stats_verify_hits_++;
175     }
176   }
177 
178   // template for insert test
RunInsertTest(const size_t nthreads,const size_t max_keys)179   void RunInsertTest(const size_t nthreads, const size_t max_keys) {
180     Insert(nthreads, max_keys);
181     Verify(nthreads);
182     ASSERT_EQ(stats_verify_hits_, max_keys);
183     ASSERT_EQ(stats_verify_missed_, 0);
184 
185     cache_->Close();
186     cache_.reset();
187   }
188 
189   // template for negative insert test
RunNegativeInsertTest(const size_t nthreads,const size_t max_keys)190   void RunNegativeInsertTest(const size_t nthreads, const size_t max_keys) {
191     Insert(nthreads, max_keys);
192     Verify(nthreads, /*eviction_enabled=*/true);
193     ASSERT_LT(stats_verify_hits_, max_keys);
194     ASSERT_GT(stats_verify_missed_, 0);
195 
196     cache_->Close();
197     cache_.reset();
198   }
199 
200   // template for insert with eviction test
RunInsertTestWithEviction(const size_t nthreads,const size_t max_keys)201   void RunInsertTestWithEviction(const size_t nthreads, const size_t max_keys) {
202     Insert(nthreads, max_keys);
203     Verify(nthreads, /*eviction_enabled=*/true);
204     ASSERT_EQ(stats_verify_hits_ + stats_verify_missed_, max_keys);
205     ASSERT_GT(stats_verify_hits_, 0);
206     ASSERT_GT(stats_verify_missed_, 0);
207 
208     cache_->Close();
209     cache_.reset();
210   }
211 
212   const std::string path_;
213   std::shared_ptr<Logger> log_;
214   std::shared_ptr<PersistentCacheTier> cache_;
215   std::atomic<size_t> key_{0};
216   size_t max_keys_ = 0;
217   std::atomic<size_t> stats_verify_hits_{0};
218   std::atomic<size_t> stats_verify_missed_{0};
219 };
220 
221 //
222 // RocksDB tests
223 //
224 class PersistentCacheDBTest : public DBTestBase {
225  public:
226   PersistentCacheDBTest();
227 
TestGetTickerCount(const Options & options,Tickers ticker_type)228   static uint64_t TestGetTickerCount(const Options& options,
229                                      Tickers ticker_type) {
230     return static_cast<uint32_t>(
231         options.statistics->getTickerCount(ticker_type));
232   }
233 
234   // insert data to table
Insert(const Options & options,const BlockBasedTableOptions &,const int num_iter,std::vector<std::string> * values)235   void Insert(const Options& options,
236               const BlockBasedTableOptions& /*table_options*/,
237               const int num_iter, std::vector<std::string>* values) {
238     CreateAndReopenWithCF({"pikachu"}, options);
239     // default column family doesn't have block cache
240     Options no_block_cache_opts;
241     no_block_cache_opts.statistics = options.statistics;
242     no_block_cache_opts = CurrentOptions(no_block_cache_opts);
243     BlockBasedTableOptions table_options_no_bc;
244     table_options_no_bc.no_block_cache = true;
245     no_block_cache_opts.table_factory.reset(
246         NewBlockBasedTableFactory(table_options_no_bc));
247     ReopenWithColumnFamilies(
248         {"default", "pikachu"},
249         std::vector<Options>({no_block_cache_opts, options}));
250 
251     Random rnd(301);
252 
253     // Write 8MB (80 values, each 100K)
254     ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
255     std::string str;
256     for (int i = 0; i < num_iter; i++) {
257       if (i % 4 == 0) {  // high compression ratio
258         str = RandomString(&rnd, 1000);
259       }
260       values->push_back(str);
261       ASSERT_OK(Put(1, Key(i), (*values)[i]));
262     }
263 
264     // flush all data from memtable so that reads are from block cache
265     ASSERT_OK(Flush(1));
266   }
267 
268   // verify data
Verify(const int num_iter,const std::vector<std::string> & values)269   void Verify(const int num_iter, const std::vector<std::string>& values) {
270     for (int j = 0; j < 2; ++j) {
271       for (int i = 0; i < num_iter; i++) {
272         ASSERT_EQ(Get(1, Key(i)), values[i]);
273       }
274     }
275   }
276 
277   // test template
278   void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>&
279                    new_pcache,
280                const size_t max_keys, const size_t max_usecase);
281 };
282 
283 }  // namespace ROCKSDB_NAMESPACE
284 
285 #endif
286