// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once

#ifndef ROCKSDB_LITE

#ifndef OS_WIN
#include <unistd.h>
#endif  // ! OS_WIN

#include <atomic>
#include <list>
#include <memory>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/persistent_cache.h"

#include "utilities/persistent_cache/block_cache_tier_file.h"
#include "utilities/persistent_cache/block_cache_tier_metadata.h"
#include "utilities/persistent_cache/persistent_cache_util.h"

#include "memory/arena.h"
#include "memtable/skiplist.h"
#include "monitoring/histogram.h"
#include "port/port.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"

namespace ROCKSDB_NAMESPACE {

//
// Block cache tier implementation
//
class BlockCacheTier : public PersistentCacheTier {
 public:
  explicit BlockCacheTier(const PersistentCacheConfig& opt)
      : opt_(opt),
        insert_ops_(static_cast<size_t>(opt_.max_write_pipeline_backlog_size)),
        buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
        writer_(this, opt_.writer_qdepth,
                static_cast<size_t>(opt_.writer_dispatch_size)) {
    Info(opt_.log, "Initializing allocator. size=%d B count=%" ROCKSDB_PRIszt,
         opt_.write_buffer_size, opt_.write_buffer_count());
  }

  virtual ~BlockCacheTier() {
    // Close is re-entrant so we can call close even if it is already closed
    Close();
    assert(!insert_th_.joinable());
  }

  Status Insert(const Slice& key, const char* data,
                const size_t size) override;
  Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
                size_t* size) override;
  Status Open() override;
  Status Close() override;
  bool Erase(const Slice& key) override;
  bool Reserve(const size_t size) override;

  bool IsCompressed() override { return opt_.is_compressed; }

  std::string GetPrintableOptions() const override { return opt_.ToString(); }

  PersistentCache::StatsType Stats() override;

  void TEST_Flush() override {
    while (insert_ops_.Size()) {
      /* sleep override */
      Env::Default()->SleepForMicroseconds(1000000);
    }
  }

 private:
  // Percentage of cache to be evicted when the cache is full
  static const size_t kEvictPct = 10;
  // Max attempts to insert key, value to cache in pipelined mode
  static const size_t kMaxRetry = 3;

  // Pipelined operation
  struct InsertOp {
    explicit InsertOp(const bool signal) : signal_(signal) {}
    explicit InsertOp(std::string&& key, const std::string& data)
        : key_(std::move(key)), data_(data) {}
    ~InsertOp() {}

    InsertOp() = delete;
    InsertOp(InsertOp&& /*rhs*/) = default;
    InsertOp& operator=(InsertOp&& rhs) = default;

    // used for estimating size by bounded queue
    size_t Size() { return data_.size() + key_.size(); }

    std::string key_;
    std::string data_;
    bool signal_ = false;  // signal to request processing thread to exit
  };

  // entry point for insert thread
  void InsertMain();
  // insert implementation
  Status InsertImpl(const Slice& key, const Slice& data);
  // Create a new cache file
  Status NewCacheFile();
  // Get cache directory path
  std::string GetCachePath() const { return opt_.path + "/cache"; }
  // Cleanup folder
  Status CleanupCacheFolder(const std::string& folder);

  // Statistics
  struct Statistics {
    HistogramImpl bytes_pipelined_;
    HistogramImpl bytes_written_;
    HistogramImpl bytes_read_;
    HistogramImpl read_hit_latency_;
    HistogramImpl read_miss_latency_;
    HistogramImpl write_latency_;
    std::atomic<uint64_t> cache_hits_{0};
    std::atomic<uint64_t> cache_misses_{0};
    std::atomic<uint64_t> cache_errors_{0};
    std::atomic<uint64_t> insert_dropped_{0};

    double CacheHitPct() const {
      const auto lookups = cache_hits_ + cache_misses_;
      return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
    }

    double CacheMissPct() const {
      const auto lookups = cache_hits_ + cache_misses_;
      return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
    }
  };

  port::RWMutex lock_;                          // Synchronization
  const PersistentCacheConfig opt_;             // BlockCache options
  BoundedQueue<InsertOp> insert_ops_;           // Ops waiting for insert
  ROCKSDB_NAMESPACE::port::Thread insert_th_;   // Insert thread
  uint32_t writer_cache_id_ = 0;                // Current cache file identifier
  WriteableCacheFile* cache_file_ = nullptr;    // Current cache file reference
  CacheWriteBufferAllocator buffer_allocator_;  // Buffer provider
  ThreadedWriter writer_;                       // Writer threads
  BlockCacheTierMetadata metadata_;             // Cache meta data manager
  std::atomic<uint64_t> size_{0};               // Size of the cache
  Statistics stats_;                            // Statistics
};

}  // namespace ROCKSDB_NAMESPACE

#endif
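
// Example usage (illustrative sketch only, not part of this header). It shows
// the Open/Insert/Lookup/Close lifecycle of the public interface above, and
// assumes the PersistentCacheConfig(env, path, cache_size, logger) constructor
// from persistent_cache_tier.h; the path, sizes, and key are placeholders, and
// error handling is abbreviated.
//
//   std::shared_ptr<Logger> log;  // e.g. via Env::Default()->NewLogger(...)
//   PersistentCacheConfig config(Env::Default(), "/tmp/block_cache",
//                                /*_cache_size=*/1024 * 1024 * 1024, log);
//   BlockCacheTier tier(config);
//   Status s = tier.Open();
//   if (s.ok()) {
//     const std::string value = "value";
//     s = tier.Insert(Slice("key"), value.data(), value.size());
//     std::unique_ptr<char[]> data;
//     size_t size = 0;
//     s = tier.Lookup(Slice("key"), &data, &size);  // OK on cache hit
//     s = tier.Close();
//   }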