1 //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "monitoring/histogram_windowing.h"
11 #include "monitoring/histogram.h"
12 #include "util/cast_util.h"
13 
14 #include <algorithm>
15 
16 namespace ROCKSDB_NAMESPACE {
17 
HistogramWindowingImpl()18 HistogramWindowingImpl::HistogramWindowingImpl() {
19   env_ = Env::Default();
20   window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
21   Clear();
22 }
23 
HistogramWindowingImpl(uint64_t num_windows,uint64_t micros_per_window,uint64_t min_num_per_window)24 HistogramWindowingImpl::HistogramWindowingImpl(
25     uint64_t num_windows,
26     uint64_t micros_per_window,
27     uint64_t min_num_per_window) :
28       num_windows_(num_windows),
29       micros_per_window_(micros_per_window),
30       min_num_per_window_(min_num_per_window) {
31   env_ = Env::Default();
32   window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
33   Clear();
34 }
35 
~HistogramWindowingImpl()36 HistogramWindowingImpl::~HistogramWindowingImpl() {
37 }
38 
Clear()39 void HistogramWindowingImpl::Clear() {
40   std::lock_guard<std::mutex> lock(mutex_);
41 
42   stats_.Clear();
43   for (size_t i = 0; i < num_windows_; i++) {
44     window_stats_[i].Clear();
45   }
46   current_window_.store(0, std::memory_order_relaxed);
47   last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
48 }
49 
Empty() const50 bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
51 
52 // This function is designed to be lock free, as it's in the critical path
53 // of any operation.
54 // Each individual value is atomic, it is just that some samples can go
55 // in the older bucket which is tolerable.
Add(uint64_t value)56 void HistogramWindowingImpl::Add(uint64_t value){
57   TimerTick();
58 
59   // Parent (global) member update
60   stats_.Add(value);
61 
62   // Current window update
63   window_stats_[static_cast<size_t>(current_window())].Add(value);
64 }
65 
Merge(const Histogram & other)66 void HistogramWindowingImpl::Merge(const Histogram& other) {
67   if (strcmp(Name(), other.Name()) == 0) {
68     Merge(
69         *static_cast_with_check<const HistogramWindowingImpl, const Histogram>(
70             &other));
71   }
72 }
73 
Merge(const HistogramWindowingImpl & other)74 void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
75   std::lock_guard<std::mutex> lock(mutex_);
76   stats_.Merge(other.stats_);
77 
78   if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
79       micros_per_window_ != other.micros_per_window_) {
80     return;
81   }
82 
83   uint64_t cur_window = current_window();
84   uint64_t other_cur_window = other.current_window();
85   // going backwards for alignment
86   for (unsigned int i = 0;
87                     i < std::min(num_windows_, other.num_windows_); i++) {
88     uint64_t window_index =
89         (cur_window + num_windows_ - i) % num_windows_;
90     uint64_t other_window_index =
91         (other_cur_window + other.num_windows_ - i) % other.num_windows_;
92     size_t windex = static_cast<size_t>(window_index);
93     size_t other_windex = static_cast<size_t>(other_window_index);
94 
95     window_stats_[windex].Merge(
96       other.window_stats_[other_windex]);
97   }
98 }
99 
ToString() const100 std::string HistogramWindowingImpl::ToString() const {
101   return stats_.ToString();
102 }
103 
Median() const104 double HistogramWindowingImpl::Median() const {
105   return Percentile(50.0);
106 }
107 
Percentile(double p) const108 double HistogramWindowingImpl::Percentile(double p) const {
109   // Retry 3 times in total
110   for (int retry = 0; retry < 3; retry++) {
111     uint64_t start_num = stats_.num();
112     double result = stats_.Percentile(p);
113     // Detect if swap buckets or Clear() was called during calculation
114     if (stats_.num() >= start_num) {
115       return result;
116     }
117   }
118   return 0.0;
119 }
120 
Average() const121 double HistogramWindowingImpl::Average() const {
122   return stats_.Average();
123 }
124 
StandardDeviation() const125 double HistogramWindowingImpl::StandardDeviation() const {
126   return stats_.StandardDeviation();
127 }
128 
Data(HistogramData * const data) const129 void HistogramWindowingImpl::Data(HistogramData * const data) const {
130   stats_.Data(data);
131 }
132 
TimerTick()133 void HistogramWindowingImpl::TimerTick() {
134   uint64_t curr_time = env_->NowMicros();
135   size_t curr_window_ = static_cast<size_t>(current_window());
136   if (curr_time - last_swap_time() > micros_per_window_ &&
137       window_stats_[curr_window_].num() >= min_num_per_window_) {
138     SwapHistoryBucket();
139   }
140 }
141 
SwapHistoryBucket()142 void HistogramWindowingImpl::SwapHistoryBucket() {
143   // Threads executing Add() would be competing for this mutex, the first one
144   // who got the metex would take care of the bucket swap, other threads
145   // can skip this.
146   // If mutex is held by Merge() or Clear(), next Add() will take care of the
147   // swap, if needed.
148   if (mutex_.try_lock()) {
149     last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
150 
151     uint64_t curr_window = current_window();
152     uint64_t next_window = (curr_window == num_windows_ - 1) ?
153                                                     0 : curr_window + 1;
154 
155     // subtract next buckets from totals and swap to next buckets
156     HistogramStat& stats_to_drop =
157       window_stats_[static_cast<size_t>(next_window)];
158 
159     if (!stats_to_drop.Empty()) {
160       for (size_t b = 0; b < stats_.num_buckets_; b++){
161         stats_.buckets_[b].fetch_sub(
162             stats_to_drop.bucket_at(b), std::memory_order_relaxed);
163       }
164 
165       if (stats_.min() == stats_to_drop.min()) {
166         uint64_t new_min = std::numeric_limits<uint64_t>::max();
167         for (unsigned int i = 0; i < num_windows_; i++) {
168           if (i != next_window) {
169             uint64_t m = window_stats_[i].min();
170             if (m < new_min) new_min = m;
171           }
172         }
173         stats_.min_.store(new_min, std::memory_order_relaxed);
174       }
175 
176       if (stats_.max() == stats_to_drop.max()) {
177         uint64_t new_max = 0;
178         for (unsigned int i = 0; i < num_windows_; i++) {
179           if (i != next_window) {
180             uint64_t m = window_stats_[i].max();
181             if (m > new_max) new_max = m;
182           }
183         }
184         stats_.max_.store(new_max, std::memory_order_relaxed);
185       }
186 
187       stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
188       stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
189       stats_.sum_squares_.fetch_sub(
190                   stats_to_drop.sum_squares(), std::memory_order_relaxed);
191 
192       stats_to_drop.Clear();
193     }
194 
195     // advance to next window bucket
196     current_window_.store(next_window, std::memory_order_relaxed);
197 
198     mutex_.unlock();
199   }
200 }
201 
202 }  // namespace ROCKSDB_NAMESPACE
203