1 //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 
7 #ifndef ROCKSDB_LITE
8 
9 #ifndef  OS_WIN
10 #include <unistd.h>
11 #endif // ! OS_WIN
12 
13 #include <atomic>
14 #include <list>
15 #include <memory>
16 #include <set>
17 #include <sstream>
18 #include <stdexcept>
19 #include <string>
20 #include <thread>
21 
22 #include "rocksdb/cache.h"
23 #include "rocksdb/comparator.h"
24 #include "rocksdb/persistent_cache.h"
25 
26 #include "utilities/persistent_cache/block_cache_tier_file.h"
27 #include "utilities/persistent_cache/block_cache_tier_metadata.h"
28 #include "utilities/persistent_cache/persistent_cache_util.h"
29 
30 #include "memory/arena.h"
31 #include "memtable/skiplist.h"
32 #include "monitoring/histogram.h"
33 #include "port/port.h"
34 #include "util/coding.h"
35 #include "util/crc32c.h"
36 #include "util/mutexlock.h"
37 
38 namespace ROCKSDB_NAMESPACE {
39 
40 //
41 // Block cache tier implementation
42 //
43 class BlockCacheTier : public PersistentCacheTier {
44  public:
BlockCacheTier(const PersistentCacheConfig & opt)45   explicit BlockCacheTier(const PersistentCacheConfig& opt)
46       : opt_(opt),
47         insert_ops_(static_cast<size_t>(opt_.max_write_pipeline_backlog_size)),
48         buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
49         writer_(this, opt_.writer_qdepth, static_cast<size_t>(opt_.writer_dispatch_size)) {
50     Info(opt_.log, "Initializing allocator. size=%d B count=%" ROCKSDB_PRIszt,
51          opt_.write_buffer_size, opt_.write_buffer_count());
52   }
53 
~BlockCacheTier()54   virtual ~BlockCacheTier() {
55     // Close is re-entrant so we can call close even if it is already closed
56     Close();
57     assert(!insert_th_.joinable());
58   }
59 
60   Status Insert(const Slice& key, const char* data, const size_t size) override;
61   Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
62                 size_t* size) override;
63   Status Open() override;
64   Status Close() override;
65   bool Erase(const Slice& key) override;
66   bool Reserve(const size_t size) override;
67 
IsCompressed()68   bool IsCompressed() override { return opt_.is_compressed; }
69 
GetPrintableOptions()70   std::string GetPrintableOptions() const override { return opt_.ToString(); }
71 
72   PersistentCache::StatsType Stats() override;
73 
TEST_Flush()74   void TEST_Flush() override {
75     while (insert_ops_.Size()) {
76       /* sleep override */
77       Env::Default()->SleepForMicroseconds(1000000);
78     }
79   }
80 
81  private:
82   // Percentage of cache to be evicted when the cache is full
83   static const size_t kEvictPct = 10;
84   // Max attempts to insert key, value to cache in pipelined mode
85   static const size_t kMaxRetry = 3;
86 
87   // Pipelined operation
88   struct InsertOp {
InsertOpInsertOp89     explicit InsertOp(const bool signal) : signal_(signal) {}
InsertOpInsertOp90     explicit InsertOp(std::string&& key, const std::string& data)
91         : key_(std::move(key)), data_(data) {}
~InsertOpInsertOp92     ~InsertOp() {}
93 
94     InsertOp() = delete;
95     InsertOp(InsertOp&& /*rhs*/) = default;
96     InsertOp& operator=(InsertOp&& rhs) = default;
97 
98     // used for estimating size by bounded queue
SizeInsertOp99     size_t Size() { return data_.size() + key_.size(); }
100 
101     std::string key_;
102     std::string data_;
103     bool signal_ = false;  // signal to request processing thread to exit
104   };
105 
106   // entry point for insert thread
107   void InsertMain();
108   // insert implementation
109   Status InsertImpl(const Slice& key, const Slice& data);
110   // Create a new cache file
111   Status NewCacheFile();
112   // Get cache directory path
GetCachePath()113   std::string GetCachePath() const { return opt_.path + "/cache"; }
114   // Cleanup folder
115   Status CleanupCacheFolder(const std::string& folder);
116 
117   // Statistics
118   struct Statistics {
119     HistogramImpl bytes_pipelined_;
120     HistogramImpl bytes_written_;
121     HistogramImpl bytes_read_;
122     HistogramImpl read_hit_latency_;
123     HistogramImpl read_miss_latency_;
124     HistogramImpl write_latency_;
125     std::atomic<uint64_t> cache_hits_{0};
126     std::atomic<uint64_t> cache_misses_{0};
127     std::atomic<uint64_t> cache_errors_{0};
128     std::atomic<uint64_t> insert_dropped_{0};
129 
CacheHitPctStatistics130     double CacheHitPct() const {
131       const auto lookups = cache_hits_ + cache_misses_;
132       return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
133     }
134 
CacheMissPctStatistics135     double CacheMissPct() const {
136       const auto lookups = cache_hits_ + cache_misses_;
137       return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
138     }
139   };
140 
141   port::RWMutex lock_;                          // Synchronization
142   const PersistentCacheConfig opt_;             // BlockCache options
143   BoundedQueue<InsertOp> insert_ops_;           // Ops waiting for insert
144   ROCKSDB_NAMESPACE::port::Thread insert_th_;   // Insert thread
145   uint32_t writer_cache_id_ = 0;                // Current cache file identifier
146   WriteableCacheFile* cache_file_ = nullptr;    // Current cache file reference
147   CacheWriteBufferAllocator buffer_allocator_;  // Buffer provider
148   ThreadedWriter writer_;                       // Writer threads
149   BlockCacheTierMetadata metadata_;             // Cache meta data manager
150   std::atomic<uint64_t> size_{0};               // Size of the cache
151   Statistics stats_;                                 // Statistics
152 };
153 
154 }  // namespace ROCKSDB_NAMESPACE
155 
156 #endif
157