1 // Copyright (c) 2013, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 #pragma once 10 11 #ifndef ROCKSDB_LITE 12 13 #include <functional> 14 #include <limits> 15 #include <list> 16 #include <memory> 17 #include <string> 18 #include <thread> 19 #include <vector> 20 21 #include "db/db_test_util.h" 22 #include "memory/arena.h" 23 #include "port/port.h" 24 #include "rocksdb/cache.h" 25 #include "table/block_based/block_builder.h" 26 #include "test_util/testharness.h" 27 #include "utilities/persistent_cache/volatile_tier_impl.h" 28 29 namespace ROCKSDB_NAMESPACE { 30 31 // 32 // Unit tests for testing PersistentCacheTier 33 // 34 class PersistentCacheTierTest : public testing::Test { 35 public: 36 PersistentCacheTierTest(); ~PersistentCacheTierTest()37 virtual ~PersistentCacheTierTest() { 38 if (cache_) { 39 Status s = cache_->Close(); 40 assert(s.ok()); 41 } 42 } 43 44 protected: 45 // Flush cache Flush()46 void Flush() { 47 if (cache_) { 48 cache_->TEST_Flush(); 49 } 50 } 51 52 // create threaded workload 53 template <class T> SpawnThreads(const size_t n,const T & fn)54 std::list<port::Thread> SpawnThreads(const size_t n, const T& fn) { 55 std::list<port::Thread> threads; 56 for (size_t i = 0; i < n; i++) { 57 port::Thread th(fn); 58 threads.push_back(std::move(th)); 59 } 60 return threads; 61 } 62 63 // Wait for threads to join Join(std::list<port::Thread> && threads)64 void Join(std::list<port::Thread>&& threads) { 65 for (auto& th : threads) { 66 th.join(); 67 } 68 threads.clear(); 69 } 70 71 // Run insert workload in threads Insert(const size_t nthreads,const size_t max_keys)72 void Insert(const size_t nthreads, const size_t max_keys) { 73 key_ = 0; 74 max_keys_ = max_keys; 75 // spawn threads 76 auto fn = std::bind(&PersistentCacheTierTest::InsertImpl, this); 77 auto threads = SpawnThreads(nthreads, fn); 78 // join with threads 79 Join(std::move(threads)); 80 // Flush cache 81 Flush(); 82 } 83 84 // Run verification on the cache 85 void Verify(const size_t nthreads = 1, const bool eviction_enabled = false) { 86 stats_verify_hits_ = 0; 87 stats_verify_missed_ = 0; 88 key_ = 0; 89 // spawn threads 90 auto fn = 91 std::bind(&PersistentCacheTierTest::VerifyImpl, this, eviction_enabled); 92 auto threads = SpawnThreads(nthreads, fn); 93 // join with threads 94 Join(std::move(threads)); 95 } 96 97 // pad 0 to numbers PaddedNumber(const size_t data,const size_t pad_size)98 std::string PaddedNumber(const size_t data, const size_t pad_size) { 99 assert(pad_size); 100 char* ret = new char[pad_size]; 101 int pos = static_cast<int>(pad_size) - 1; 102 size_t count = 0; 103 size_t t = data; 104 // copy numbers 105 while (t) { 106 count++; 107 ret[pos--] = '0' + t % 10; 108 t = t / 10; 109 } 110 // copy 0s 111 while (pos >= 0) { 112 ret[pos--] = '0'; 113 } 114 // post condition 115 assert(count <= pad_size); 116 assert(pos == -1); 117 std::string result(ret, pad_size); 118 delete[] ret; 119 return result; 120 } 121 122 // Insert workload implementation InsertImpl()123 void InsertImpl() { 124 const std::string prefix = "key_prefix_"; 125 126 while (true) { 127 size_t i = key_++; 128 if (i >= max_keys_) { 129 break; 130 } 131 132 char data[4 * 1024]; 133 memset(data, '0' + (i % 10), sizeof(data)); 134 auto k = prefix + PaddedNumber(i, /*count=*/8); 135 Slice key(k); 136 while (true) { 137 Status status = cache_->Insert(key, data, sizeof(data)); 138 if (status.ok()) { 139 break; 140 } 141 ASSERT_TRUE(status.IsTryAgain()); 142 Env::Default()->SleepForMicroseconds(1 * 1000 * 1000); 143 } 144 } 145 } 146 147 // Verification implementation 148 void VerifyImpl(const bool eviction_enabled = false) { 149 const std::string prefix = "key_prefix_"; 150 while (true) { 151 size_t i = key_++; 152 if (i >= max_keys_) { 153 break; 154 } 155 156 char edata[4 * 1024]; 157 memset(edata, '0' + (i % 10), sizeof(edata)); 158 auto k = prefix + PaddedNumber(i, /*count=*/8); 159 Slice key(k); 160 std::unique_ptr<char[]> block; 161 size_t block_size; 162 163 if (eviction_enabled) { 164 if (!cache_->Lookup(key, &block, &block_size).ok()) { 165 // assume that the key is evicted 166 stats_verify_missed_++; 167 continue; 168 } 169 } 170 171 ASSERT_OK(cache_->Lookup(key, &block, &block_size)); 172 ASSERT_EQ(block_size, sizeof(edata)); 173 ASSERT_EQ(memcmp(edata, block.get(), sizeof(edata)), 0); 174 stats_verify_hits_++; 175 } 176 } 177 178 // template for insert test RunInsertTest(const size_t nthreads,const size_t max_keys)179 void RunInsertTest(const size_t nthreads, const size_t max_keys) { 180 Insert(nthreads, max_keys); 181 Verify(nthreads); 182 ASSERT_EQ(stats_verify_hits_, max_keys); 183 ASSERT_EQ(stats_verify_missed_, 0); 184 185 cache_->Close(); 186 cache_.reset(); 187 } 188 189 // template for negative insert test RunNegativeInsertTest(const size_t nthreads,const size_t max_keys)190 void RunNegativeInsertTest(const size_t nthreads, const size_t max_keys) { 191 Insert(nthreads, max_keys); 192 Verify(nthreads, /*eviction_enabled=*/true); 193 ASSERT_LT(stats_verify_hits_, max_keys); 194 ASSERT_GT(stats_verify_missed_, 0); 195 196 cache_->Close(); 197 cache_.reset(); 198 } 199 200 // template for insert with eviction test RunInsertTestWithEviction(const size_t nthreads,const size_t max_keys)201 void RunInsertTestWithEviction(const size_t nthreads, const size_t max_keys) { 202 Insert(nthreads, max_keys); 203 Verify(nthreads, /*eviction_enabled=*/true); 204 ASSERT_EQ(stats_verify_hits_ + stats_verify_missed_, max_keys); 205 ASSERT_GT(stats_verify_hits_, 0); 206 ASSERT_GT(stats_verify_missed_, 0); 207 208 cache_->Close(); 209 cache_.reset(); 210 } 211 212 const std::string path_; 213 std::shared_ptr<Logger> log_; 214 std::shared_ptr<PersistentCacheTier> cache_; 215 std::atomic<size_t> key_{0}; 216 size_t max_keys_ = 0; 217 std::atomic<size_t> stats_verify_hits_{0}; 218 std::atomic<size_t> stats_verify_missed_{0}; 219 }; 220 221 // 222 // RocksDB tests 223 // 224 class PersistentCacheDBTest : public DBTestBase { 225 public: 226 PersistentCacheDBTest(); 227 TestGetTickerCount(const Options & options,Tickers ticker_type)228 static uint64_t TestGetTickerCount(const Options& options, 229 Tickers ticker_type) { 230 return static_cast<uint32_t>( 231 options.statistics->getTickerCount(ticker_type)); 232 } 233 234 // insert data to table Insert(const Options & options,const BlockBasedTableOptions &,const int num_iter,std::vector<std::string> * values)235 void Insert(const Options& options, 236 const BlockBasedTableOptions& /*table_options*/, 237 const int num_iter, std::vector<std::string>* values) { 238 CreateAndReopenWithCF({"pikachu"}, options); 239 // default column family doesn't have block cache 240 Options no_block_cache_opts; 241 no_block_cache_opts.statistics = options.statistics; 242 no_block_cache_opts = CurrentOptions(no_block_cache_opts); 243 BlockBasedTableOptions table_options_no_bc; 244 table_options_no_bc.no_block_cache = true; 245 no_block_cache_opts.table_factory.reset( 246 NewBlockBasedTableFactory(table_options_no_bc)); 247 ReopenWithColumnFamilies( 248 {"default", "pikachu"}, 249 std::vector<Options>({no_block_cache_opts, options})); 250 251 Random rnd(301); 252 253 // Write 8MB (80 values, each 100K) 254 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); 255 std::string str; 256 for (int i = 0; i < num_iter; i++) { 257 if (i % 4 == 0) { // high compression ratio 258 str = RandomString(&rnd, 1000); 259 } 260 values->push_back(str); 261 ASSERT_OK(Put(1, Key(i), (*values)[i])); 262 } 263 264 // flush all data from memtable so that reads are from block cache 265 ASSERT_OK(Flush(1)); 266 } 267 268 // verify data Verify(const int num_iter,const std::vector<std::string> & values)269 void Verify(const int num_iter, const std::vector<std::string>& values) { 270 for (int j = 0; j < 2; ++j) { 271 for (int i = 0; i < num_iter; i++) { 272 ASSERT_EQ(Get(1, Key(i)), values[i]); 273 } 274 } 275 } 276 277 // test template 278 void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>& 279 new_pcache, 280 const size_t max_keys, const size_t max_usecase); 281 }; 282 283 } // namespace ROCKSDB_NAMESPACE 284 285 #endif 286