1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "rocksdb/utilities/sim_cache.h"
7 #include <atomic>
8 #include "env/composite_env_wrapper.h"
9 #include "file/writable_file_writer.h"
10 #include "monitoring/statistics.h"
11 #include "port/port.h"
12 #include "rocksdb/env.h"
13 #include "util/mutexlock.h"
14 #include "util/string_util.h"
15 
16 namespace ROCKSDB_NAMESPACE {
17 
18 namespace {
19 
20 class CacheActivityLogger {
21  public:
CacheActivityLogger()22   CacheActivityLogger()
23       : activity_logging_enabled_(false), max_logging_size_(0) {}
24 
~CacheActivityLogger()25   ~CacheActivityLogger() {
26     MutexLock l(&mutex_);
27 
28     StopLoggingInternal();
29   }
30 
StartLogging(const std::string & activity_log_file,Env * env,uint64_t max_logging_size=0)31   Status StartLogging(const std::string& activity_log_file, Env* env,
32                       uint64_t max_logging_size = 0) {
33     assert(activity_log_file != "");
34     assert(env != nullptr);
35 
36     Status status;
37     EnvOptions env_opts;
38     std::unique_ptr<WritableFile> log_file;
39 
40     MutexLock l(&mutex_);
41 
42     // Stop existing logging if any
43     StopLoggingInternal();
44 
45     // Open log file
46     status = env->NewWritableFile(activity_log_file, &log_file, env_opts);
47     if (!status.ok()) {
48       return status;
49     }
50     file_writer_.reset(new WritableFileWriter(
51         NewLegacyWritableFileWrapper(std::move(log_file)), activity_log_file,
52         env_opts));
53 
54     max_logging_size_ = max_logging_size;
55     activity_logging_enabled_.store(true);
56 
57     return status;
58   }
59 
StopLogging()60   void StopLogging() {
61     MutexLock l(&mutex_);
62 
63     StopLoggingInternal();
64   }
65 
ReportLookup(const Slice & key)66   void ReportLookup(const Slice& key) {
67     if (activity_logging_enabled_.load() == false) {
68       return;
69     }
70 
71     std::string log_line = "LOOKUP - " + key.ToString(true) + "\n";
72 
73     // line format: "LOOKUP - <KEY>"
74     MutexLock l(&mutex_);
75     Status s = file_writer_->Append(log_line);
76     if (!s.ok() && bg_status_.ok()) {
77       bg_status_ = s;
78     }
79     if (MaxLoggingSizeReached() || !bg_status_.ok()) {
80       // Stop logging if we have reached the max file size or
81       // encountered an error
82       StopLoggingInternal();
83     }
84   }
85 
ReportAdd(const Slice & key,size_t size)86   void ReportAdd(const Slice& key, size_t size) {
87     if (activity_logging_enabled_.load() == false) {
88       return;
89     }
90 
91     std::string log_line = "ADD - ";
92     log_line += key.ToString(true);
93     log_line += " - ";
94     AppendNumberTo(&log_line, size);
95   // @lint-ignore TXT2 T25377293 Grandfathered in
96 		log_line += "\n";
97 
98     // line format: "ADD - <KEY> - <KEY-SIZE>"
99     MutexLock l(&mutex_);
100     Status s = file_writer_->Append(log_line);
101     if (!s.ok() && bg_status_.ok()) {
102       bg_status_ = s;
103     }
104 
105     if (MaxLoggingSizeReached() || !bg_status_.ok()) {
106       // Stop logging if we have reached the max file size or
107       // encountered an error
108       StopLoggingInternal();
109     }
110   }
111 
bg_status()112   Status& bg_status() {
113     MutexLock l(&mutex_);
114     return bg_status_;
115   }
116 
117  private:
MaxLoggingSizeReached()118   bool MaxLoggingSizeReached() {
119     mutex_.AssertHeld();
120 
121     return (max_logging_size_ > 0 &&
122             file_writer_->GetFileSize() >= max_logging_size_);
123   }
124 
StopLoggingInternal()125   void StopLoggingInternal() {
126     mutex_.AssertHeld();
127 
128     if (!activity_logging_enabled_) {
129       return;
130     }
131 
132     activity_logging_enabled_.store(false);
133     Status s = file_writer_->Close();
134     if (!s.ok() && bg_status_.ok()) {
135       bg_status_ = s;
136     }
137   }
138 
139   // Mutex to sync writes to file_writer, and all following
140   // class data members
141   port::Mutex mutex_;
142   // Indicates if logging is currently enabled
143   // atomic to allow reads without mutex
144   std::atomic<bool> activity_logging_enabled_;
145   // When reached, we will stop logging and close the file
146   // Value of 0 means unlimited
147   uint64_t max_logging_size_;
148   std::unique_ptr<WritableFileWriter> file_writer_;
149   Status bg_status_;
150 };
151 
152 // SimCacheImpl definition
153 class SimCacheImpl : public SimCache {
154  public:
155   // capacity for real cache (ShardedLRUCache)
156   // test_capacity for key only cache
SimCacheImpl(std::shared_ptr<Cache> sim_cache,std::shared_ptr<Cache> cache)157   SimCacheImpl(std::shared_ptr<Cache> sim_cache, std::shared_ptr<Cache> cache)
158       : cache_(cache),
159         key_only_cache_(sim_cache),
160         miss_times_(0),
161         hit_times_(0),
162         stats_(nullptr) {}
163 
~SimCacheImpl()164   ~SimCacheImpl() override {}
SetCapacity(size_t capacity)165   void SetCapacity(size_t capacity) override { cache_->SetCapacity(capacity); }
166 
SetStrictCapacityLimit(bool strict_capacity_limit)167   void SetStrictCapacityLimit(bool strict_capacity_limit) override {
168     cache_->SetStrictCapacityLimit(strict_capacity_limit);
169   }
170 
Insert(const Slice & key,void * value,size_t charge,Deleter * deleter,Handle ** handle,Priority priority)171   Status Insert(const Slice& key, void* value, size_t charge, Deleter* deleter,
172                 Handle** handle, Priority priority) override {
173     // The handle and value passed in are for real cache, so we pass nullptr
174     // to key_only_cache_ for both instead. Also, the deleter should be invoked
175     // only once (on the actual value), so we pass nullptr to key_only_cache for
176     // that one as well.
177     Handle* h = key_only_cache_->Lookup(key);
178     if (h == nullptr) {
179       key_only_cache_->Insert(key, nullptr, charge, nullptr, nullptr, priority);
180     } else {
181       key_only_cache_->Release(h);
182     }
183 
184     cache_activity_logger_.ReportAdd(key, charge);
185     if (!cache_) {
186       return Status::OK();
187     }
188     return cache_->Insert(key, value, charge, deleter, handle, priority);
189   }
190 
Lookup(const Slice & key,Statistics * stats)191   Handle* Lookup(const Slice& key, Statistics* stats) override {
192     Handle* h = key_only_cache_->Lookup(key);
193     if (h != nullptr) {
194       key_only_cache_->Release(h);
195       inc_hit_counter();
196       RecordTick(stats, SIM_BLOCK_CACHE_HIT);
197     } else {
198       inc_miss_counter();
199       RecordTick(stats, SIM_BLOCK_CACHE_MISS);
200     }
201 
202     cache_activity_logger_.ReportLookup(key);
203     if (!cache_) {
204       return nullptr;
205     }
206     return cache_->Lookup(key, stats);
207   }
208 
Ref(Handle * handle)209   bool Ref(Handle* handle) override { return cache_->Ref(handle); }
210 
Release(Handle * handle,bool force_erase=false)211   bool Release(Handle* handle, bool force_erase = false) override {
212     return cache_->Release(handle, force_erase);
213   }
214 
Erase(const Slice & key)215   void Erase(const Slice& key) override {
216     cache_->Erase(key);
217     key_only_cache_->Erase(key);
218   }
219 
Value(Handle * handle)220   void* Value(Handle* handle) override { return cache_->Value(handle); }
221 
NewId()222   uint64_t NewId() override { return cache_->NewId(); }
223 
GetCapacity() const224   size_t GetCapacity() const override { return cache_->GetCapacity(); }
225 
HasStrictCapacityLimit() const226   bool HasStrictCapacityLimit() const override {
227     return cache_->HasStrictCapacityLimit();
228   }
229 
GetUsage() const230   size_t GetUsage() const override { return cache_->GetUsage(); }
231 
GetUsage(Handle * handle) const232   size_t GetUsage(Handle* handle) const override {
233     return cache_->GetUsage(handle);
234   }
235 
GetCharge(Handle * handle) const236   size_t GetCharge(Handle* handle) const override {
237     return cache_->GetCharge(handle);
238   }
239 
GetPinnedUsage() const240   size_t GetPinnedUsage() const override { return cache_->GetPinnedUsage(); }
241 
DisownData()242   void DisownData() override {
243     cache_->DisownData();
244     key_only_cache_->DisownData();
245   }
246 
ApplyToAllCacheEntries(void (* callback)(void *,size_t),bool thread_safe)247   void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
248                               bool thread_safe) override {
249     // only apply to _cache since key_only_cache doesn't hold value
250     cache_->ApplyToAllCacheEntries(callback, thread_safe);
251   }
252 
EraseUnRefEntries()253   void EraseUnRefEntries() override {
254     cache_->EraseUnRefEntries();
255     key_only_cache_->EraseUnRefEntries();
256   }
257 
GetSimCapacity() const258   size_t GetSimCapacity() const override {
259     return key_only_cache_->GetCapacity();
260   }
GetSimUsage() const261   size_t GetSimUsage() const override { return key_only_cache_->GetUsage(); }
SetSimCapacity(size_t capacity)262   void SetSimCapacity(size_t capacity) override {
263     key_only_cache_->SetCapacity(capacity);
264   }
265 
get_miss_counter() const266   uint64_t get_miss_counter() const override {
267     return miss_times_.load(std::memory_order_relaxed);
268   }
269 
get_hit_counter() const270   uint64_t get_hit_counter() const override {
271     return hit_times_.load(std::memory_order_relaxed);
272   }
273 
reset_counter()274   void reset_counter() override {
275     miss_times_.store(0, std::memory_order_relaxed);
276     hit_times_.store(0, std::memory_order_relaxed);
277     SetTickerCount(stats_, SIM_BLOCK_CACHE_HIT, 0);
278     SetTickerCount(stats_, SIM_BLOCK_CACHE_MISS, 0);
279   }
280 
ToString() const281   std::string ToString() const override {
282     std::string res;
283     res.append("SimCache MISSes: " + std::to_string(get_miss_counter()) + "\n");
284     res.append("SimCache HITs:    " + std::to_string(get_hit_counter()) + "\n");
285     char buff[350];
286     auto lookups = get_miss_counter() + get_hit_counter();
287     snprintf(buff, sizeof(buff), "SimCache HITRATE: %.2f%%\n",
288              (lookups == 0 ? 0 : get_hit_counter() * 100.0f / lookups));
289     res.append(buff);
290     return res;
291   }
292 
GetPrintableOptions() const293   std::string GetPrintableOptions() const override {
294     std::string ret;
295     ret.reserve(20000);
296     ret.append("    cache_options:\n");
297     ret.append(cache_->GetPrintableOptions());
298     ret.append("    sim_cache_options:\n");
299     ret.append(key_only_cache_->GetPrintableOptions());
300     return ret;
301   }
302 
StartActivityLogging(const std::string & activity_log_file,Env * env,uint64_t max_logging_size=0)303   Status StartActivityLogging(const std::string& activity_log_file, Env* env,
304                               uint64_t max_logging_size = 0) override {
305     return cache_activity_logger_.StartLogging(activity_log_file, env,
306                                                max_logging_size);
307   }
308 
StopActivityLogging()309   void StopActivityLogging() override { cache_activity_logger_.StopLogging(); }
310 
GetActivityLoggingStatus()311   Status GetActivityLoggingStatus() override {
312     return cache_activity_logger_.bg_status();
313   }
314 
315  private:
316   std::shared_ptr<Cache> cache_;
317   std::shared_ptr<Cache> key_only_cache_;
318   std::atomic<uint64_t> miss_times_;
319   std::atomic<uint64_t> hit_times_;
320   Statistics* stats_;
321   CacheActivityLogger cache_activity_logger_;
322 
inc_miss_counter()323   void inc_miss_counter() {
324     miss_times_.fetch_add(1, std::memory_order_relaxed);
325   }
inc_hit_counter()326   void inc_hit_counter() { hit_times_.fetch_add(1, std::memory_order_relaxed); }
327 };
328 
329 }  // end anonymous namespace
330 
331 // For instrumentation purpose, use NewSimCache instead
NewSimCache(std::shared_ptr<Cache> cache,size_t sim_capacity,int num_shard_bits)332 std::shared_ptr<SimCache> NewSimCache(std::shared_ptr<Cache> cache,
333                                       size_t sim_capacity, int num_shard_bits) {
334   LRUCacheOptions co;
335   co.capacity = sim_capacity;
336   co.num_shard_bits = num_shard_bits;
337   co.metadata_charge_policy = kDontChargeCacheMetadata;
338   return NewSimCache(NewLRUCache(co), cache, num_shard_bits);
339 }
340 
NewSimCache(std::shared_ptr<Cache> sim_cache,std::shared_ptr<Cache> cache,int num_shard_bits)341 std::shared_ptr<SimCache> NewSimCache(std::shared_ptr<Cache> sim_cache,
342                                       std::shared_ptr<Cache> cache,
343                                       int num_shard_bits) {
344   if (num_shard_bits >= 20) {
345     return nullptr;  // the cache cannot be sharded into too many fine pieces
346   }
347   return std::make_shared<SimCacheImpl>(sim_cache, cache);
348 }
349 
350 }  // namespace ROCKSDB_NAMESPACE
351