diff --git a/CMakeLists.txt b/CMakeLists.txt index eda0d703b..4eea5cfff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -207,6 +207,7 @@ set(SOURCES util/filter_policy.cc util/hash.cc util/histogram.cc + util/histogram_windowing.cc util/instrumented_mutex.cc util/iostats_context.cc tools/ldb_cmd.cc diff --git a/src.mk b/src.mk index f98075028..aaca3bcdb 100644 --- a/src.mk +++ b/src.mk @@ -107,6 +107,7 @@ LIB_SOURCES = \ util/filter_policy.cc \ util/hash.cc \ util/histogram.cc \ + util/histogram_windowing.cc \ util/instrumented_mutex.cc \ util/iostats_context.cc \ utilities/backupable/backupable_db.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 65ce703f1..2bb63a1c4 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1205,7 +1205,7 @@ class Stats { uint64_t bytes_; uint64_t last_op_finish_; uint64_t last_report_finish_; - std::unordered_map, std::hash> hist_; std::string message_; bool exclude_from_merge_; @@ -1242,7 +1242,7 @@ class Stats { for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) { auto this_it = hist_.find(it->first); if (this_it != hist_.end()) { - this_it->second.Merge(other.hist_.at(it->first)); + this_it->second->Merge(*(other.hist_.at(it->first))); } else { hist_.insert({ it->first, it->second }); } @@ -1316,10 +1316,10 @@ class Stats { if (hist_.find(op_type) == hist_.end()) { - HistogramImpl hist_temp; - hist_.insert({op_type, hist_temp}); + auto hist_temp = std::make_shared(); + hist_.insert({op_type, std::move(hist_temp)}); } - hist_[op_type].Add(micros); + hist_[op_type]->Add(micros); if (micros > 20000 && !FLAGS_stats_interval) { fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, ""); @@ -1452,7 +1452,7 @@ class Stats { for (auto it = hist_.begin(); it != hist_.end(); ++it) { fprintf(stdout, "Microseconds per %s:\n%s\n", OperationTypeString[it->first].c_str(), - it->second.ToString().c_str()); + it->second->ToString().c_str()); } } if (FLAGS_report_file_operations) { diff --git a/util/histogram.cc b/util/histogram.cc index 4b5013a55..d052abb33 100644 --- a/util/histogram.cc +++ b/util/histogram.cc @@ -7,11 +7,15 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "util/histogram.h" +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif +#include #include #include #include +#include "util/histogram.h" #include "port/port.h" namespace rocksdb { @@ -73,90 +77,126 @@ namespace { const HistogramBucketMapper bucketMapper; } -void HistogramImpl::Clear() { - min_ = static_cast(bucketMapper.LastValue()); - max_ = 0; - num_ = 0; - sum_ = 0; - sum_squares_ = 0; - memset(buckets_, 0, sizeof buckets_); +HistogramStat::HistogramStat() + : num_buckets_(bucketMapper.BucketCount()) { + assert(num_buckets_ == sizeof(buckets_) / sizeof(*buckets_)); + Clear(); } -bool HistogramImpl::Empty() { return num_ == 0; } +void HistogramStat::Clear() { + min_.store(bucketMapper.LastValue(), std::memory_order_relaxed); + max_.store(0, std::memory_order_relaxed); + num_.store(0, std::memory_order_relaxed); + sum_.store(0, std::memory_order_relaxed); + sum_squares_.store(0, std::memory_order_relaxed); + for (unsigned int b = 0; b < num_buckets_; b++) { + buckets_[b].store(0, std::memory_order_relaxed); + } +}; -void HistogramImpl::Add(uint64_t value) { +bool HistogramStat::Empty() const { return num() == 0; } + +void HistogramStat::Add(uint64_t value) { + // This function is designed to be lock free, as it's in the critical path + // of any operation. Each individual value is atomic and the order of updates + // by concurrent threads is tolerable. const size_t index = bucketMapper.IndexForValue(value); - buckets_[index] += 1; - if (min_ > value) min_ = static_cast(value); - if (max_ < value) max_ = static_cast(value); - num_++; - sum_ += value; - sum_squares_ += (value * value); + assert(index < num_buckets_ && index >= 0); + buckets_[index].fetch_add(1, std::memory_order_relaxed); + + uint64_t old_min = min(); + while (value < old_min && !min_.compare_exchange_weak(old_min, value)) {} + + uint64_t old_max = max(); + while (value > old_max && !max_.compare_exchange_weak(old_max, value)) {} + + num_.fetch_add(1, std::memory_order_relaxed); + sum_.fetch_add(value, std::memory_order_relaxed); + sum_squares_.fetch_add(value * value, std::memory_order_relaxed); } -void HistogramImpl::Merge(const HistogramImpl& other) { - if (other.min_ < min_) min_ = other.min_; - if (other.max_ > max_) max_ = other.max_; - num_ += other.num_; - sum_ += other.sum_; - sum_squares_ += other.sum_squares_; - for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) { - buckets_[b] += other.buckets_[b]; +void HistogramStat::Merge(const HistogramStat& other) { + // This function needs to be performned with the outer lock acquired + // However, atomic operation on every member is still need, since Add() + // requires no lock and value update can still happen concurrently + uint64_t old_min = min(); + uint64_t other_min = other.min(); + while (other_min < old_min && + !min_.compare_exchange_weak(old_min, other_min)) {} + + uint64_t old_max = max(); + uint64_t other_max = other.max(); + while (other_max > old_max && + !max_.compare_exchange_weak(old_max, other_max)) {} + + num_.fetch_add(other.num(), std::memory_order_relaxed); + sum_.fetch_add(other.sum(), std::memory_order_relaxed); + sum_squares_.fetch_add(other.sum_squares(), std::memory_order_relaxed); + for (unsigned int b = 0; b < num_buckets_; b++) { + buckets_[b].fetch_add(other.bucket_at(b), std::memory_order_relaxed); } } -double HistogramImpl::Median() const { +double HistogramStat::Median() const { return Percentile(50.0); } -double HistogramImpl::Percentile(double p) const { - double threshold = num_ * (p / 100.0); - double sum = 0; - for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) { - sum += buckets_[b]; - if (sum >= threshold) { +double HistogramStat::Percentile(double p) const { + double threshold = num() * (p / 100.0); + uint64_t cumulative_sum = 0; + for (unsigned int b = 0; b < num_buckets_; b++) { + uint64_t bucket_value = bucket_at(b); + cumulative_sum += bucket_value; + if (cumulative_sum >= threshold) { // Scale linearly within this bucket - double left_point = - static_cast((b == 0) ? 0 : bucketMapper.BucketLimit(b-1)); - double right_point = - static_cast(bucketMapper.BucketLimit(b)); - double left_sum = sum - buckets_[b]; - double right_sum = sum; + uint64_t left_point = (b == 0) ? 0 : bucketMapper.BucketLimit(b-1); + uint64_t right_point = bucketMapper.BucketLimit(b); + uint64_t left_sum = cumulative_sum - bucket_value; + uint64_t right_sum = cumulative_sum; double pos = 0; - double right_left_diff = right_sum - left_sum; + uint64_t right_left_diff = right_sum - left_sum; if (right_left_diff != 0) { - pos = (threshold - left_sum) / (right_sum - left_sum); + pos = (threshold - left_sum) / right_left_diff; } double r = left_point + (right_point - left_point) * pos; - if (r < min_) r = min_; - if (r > max_) r = max_; + uint64_t cur_min = min(); + uint64_t cur_max = max(); + if (r < cur_min) r = static_cast(cur_min); + if (r > cur_max) r = static_cast(cur_max); return r; } } - return max_; + return static_cast(max()); } -double HistogramImpl::Average() const { - if (num_ == 0.0) return 0; - return sum_ / num_; +double HistogramStat::Average() const { + uint64_t cur_num = num(); + uint64_t cur_sum = sum(); + if (cur_num == 0) return 0; + return static_cast(cur_sum) / static_cast(cur_num); } -double HistogramImpl::StandardDeviation() const { - if (num_ == 0.0) return 0; - double variance = (sum_squares_ * num_ - sum_ * sum_) / (num_ * num_); +double HistogramStat::StandardDeviation() const { + uint64_t cur_num = num(); + uint64_t cur_sum = sum(); + uint64_t cur_sum_squares = sum_squares(); + if (cur_num == 0) return 0; + double variance = + static_cast(cur_sum_squares * cur_num - cur_sum * cur_sum) / + static_cast(cur_num * cur_num); return sqrt(variance); } - -std::string HistogramImpl::ToString() const { +std::string HistogramStat::ToString() const { + uint64_t cur_num = num(); std::string r; char buf[200]; snprintf(buf, sizeof(buf), - "Count: %.0f Average: %.4f StdDev: %.2f\n", - num_, Average(), StandardDeviation()); + "Count: %" PRIu64 " Average: %.4f StdDev: %.2f\n", + cur_num, Average(), StandardDeviation()); r.append(buf); snprintf(buf, sizeof(buf), - "Min: %.4f Median: %.4f Max: %.4f\n", - (num_ == 0.0 ? 0.0 : min_), Median(), max_); + "Min: %" PRIu64 " Median: %.4f Max: %" PRIu64 "\n", + (cur_num == 0 ? 0 : min()), Median(), (cur_num == 0 ? 0 : max())); r.append(buf); snprintf(buf, sizeof(buf), "Percentiles: " @@ -165,30 +205,30 @@ std::string HistogramImpl::ToString() const { Percentile(99.99)); r.append(buf); r.append("------------------------------------------------------\n"); - const double mult = 100.0 / num_; - double sum = 0; - for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) { - if (buckets_[b] <= 0.0) continue; - sum += buckets_[b]; + const double mult = 100.0 / cur_num; + uint64_t cumulative_sum = 0; + for (unsigned int b = 0; b < num_buckets_; b++) { + uint64_t bucket_value = bucket_at(b); + if (bucket_value <= 0.0) continue; + cumulative_sum += bucket_value; snprintf(buf, sizeof(buf), - "[ %7lu, %7lu ) %8lu %7.3f%% %7.3f%% ", - // left - (unsigned long)((b == 0) ? 0 : bucketMapper.BucketLimit(b-1)), - (unsigned long)bucketMapper.BucketLimit(b), // right - (unsigned long)buckets_[b], // count - (mult * buckets_[b]), // percentage - (mult * sum)); // cumulative percentage + "[ %7" PRIu64 ", %7" PRIu64 " ) %8" PRIu64 " %7.3f%% %7.3f%% ", + (b == 0) ? 0 : bucketMapper.BucketLimit(b-1), // left + bucketMapper.BucketLimit(b), // right + bucket_value, // count + (mult * bucket_value), // percentage + (mult * cumulative_sum)); // cumulative percentage r.append(buf); // Add hash marks based on percentage; 20 marks for 100%. - int marks = static_cast(20*(buckets_[b] / num_) + 0.5); + size_t marks = static_cast(mult * bucket_value / 5 + 0.5); r.append(marks, '#'); r.push_back('\n'); } return r; } -void HistogramImpl::Data(HistogramData * const data) const { +void HistogramStat::Data(HistogramData * const data) const { assert(data); data->median = Median(); data->percentile95 = Percentile(95); @@ -197,4 +237,52 @@ void HistogramImpl::Data(HistogramData * const data) const { data->standard_deviation = StandardDeviation(); } +void HistogramImpl::Clear() { + std::lock_guard lock(mutex_); + stats_.Clear(); +} + +bool HistogramImpl::Empty() const { + return stats_.Empty(); +} + +void HistogramImpl::Add(uint64_t value) { + stats_.Add(value); +} + +void HistogramImpl::Merge(const Histogram& other) { + if (strcmp(Name(), other.Name()) == 0) { + Merge(dynamic_cast(other)); + } +} + +void HistogramImpl::Merge(const HistogramImpl& other) { + std::lock_guard lock(mutex_); + stats_.Merge(other.stats_); +} + +double HistogramImpl::Median() const { + return stats_.Median(); +} + +double HistogramImpl::Percentile(double p) const { + return stats_.Percentile(p); +} + +double HistogramImpl::Average() const { + return stats_.Average(); +} + +double HistogramImpl::StandardDeviation() const { + return stats_.StandardDeviation(); +} + +std::string HistogramImpl::ToString() const { + return stats_.ToString(); +} + +void HistogramImpl::Data(HistogramData * const data) const { + stats_.Data(data); +} + } // namespace levedb diff --git a/util/histogram.h b/util/histogram.h index 2b6cd8bab..84c3e94fe 100644 --- a/util/histogram.h +++ b/util/histogram.h @@ -14,8 +14,7 @@ #include #include #include - -#include +#include namespace rocksdb { @@ -25,7 +24,7 @@ class HistogramBucketMapper { HistogramBucketMapper(); // converts a value to the bucket index. - size_t IndexForValue(const uint64_t value) const; + size_t IndexForValue(uint64_t value) const; // number of buckets required. size_t BucketCount() const { @@ -52,33 +51,99 @@ class HistogramBucketMapper { std::map valueIndexMap_; }; -class HistogramImpl { +struct HistogramStat { + HistogramStat(); + ~HistogramStat() {} + + HistogramStat(const HistogramStat&) = delete; + HistogramStat& operator=(const HistogramStat&) = delete; + + void Clear(); + bool Empty() const; + void Add(uint64_t value); + void Merge(const HistogramStat& other); + + inline uint64_t min() const { return min_.load(std::memory_order_relaxed); } + inline uint64_t max() const { return max_.load(std::memory_order_relaxed); } + inline uint64_t num() const { return num_.load(std::memory_order_relaxed); } + inline uint64_t sum() const { return sum_.load(std::memory_order_relaxed); } + inline uint64_t sum_squares() const { + return sum_squares_.load(std::memory_order_relaxed); + } + inline uint64_t bucket_at(size_t b) const { + return buckets_[b].load(std::memory_order_relaxed); + } + + double Median() const; + double Percentile(double p) const; + double Average() const; + double StandardDeviation() const; + void Data(HistogramData* const data) const; + std::string ToString() const; + + // To be able to use HistogramStat as thread local variable, it + // cannot have dynamic allocated member. That's why we're + // using manually values from BucketMapper + std::atomic_uint_fast64_t min_; + std::atomic_uint_fast64_t max_; + std::atomic_uint_fast64_t num_; + std::atomic_uint_fast64_t sum_; + std::atomic_uint_fast64_t sum_squares_; + std::atomic_uint_fast64_t buckets_[138]; // 138==BucketMapper::BucketCount() + const uint64_t num_buckets_; +}; + +class Histogram { +public: + Histogram() {} + virtual ~Histogram() {}; + + virtual void Clear() = 0; + virtual bool Empty() const = 0; + virtual void Add(uint64_t value) = 0; + virtual void Merge(const Histogram&) = 0; + + virtual std::string ToString() const = 0; + virtual const char* Name() const = 0; + virtual uint64_t min() const = 0; + virtual uint64_t max() const = 0; + virtual uint64_t num() const = 0; + virtual double Median() const = 0; + virtual double Percentile(double p) const = 0; + virtual double Average() const = 0; + virtual double StandardDeviation() const = 0; + virtual void Data(HistogramData* const data) const = 0; +}; + +class HistogramImpl : public Histogram { public: - HistogramImpl() { memset(buckets_, 0, sizeof(buckets_)); } - virtual void Clear(); - virtual bool Empty(); - virtual void Add(uint64_t value); + HistogramImpl() { Clear(); } + + HistogramImpl(const HistogramImpl&) = delete; + HistogramImpl& operator=(const HistogramImpl&) = delete; + + virtual void Clear() override; + virtual bool Empty() const override; + virtual void Add(uint64_t value) override; + virtual void Merge(const Histogram& other) override; void Merge(const HistogramImpl& other); - virtual std::string ToString() const; - - virtual double Median() const; - virtual double Percentile(double p) const; - virtual double Average() const; - virtual double StandardDeviation() const; - virtual void Data(HistogramData * const data) const; + virtual std::string ToString() const override; + virtual const char* Name() const override { return "HistogramImpl"; } + virtual uint64_t min() const override { return stats_.min(); } + virtual uint64_t max() const override { return stats_.max(); } + virtual uint64_t num() const override { return stats_.num(); } + virtual double Median() const override; + virtual double Percentile(double p) const override; + virtual double Average() const override; + virtual double StandardDeviation() const override; + virtual void Data(HistogramData* const data) const override; virtual ~HistogramImpl() {} private: - // To be able to use HistogramImpl as thread local variable, its constructor - // has to be static. That's why we're using manually values from BucketMapper - double min_ = 1000000000; // this is BucketMapper:LastValue() - double max_ = 0; - double num_ = 0; - double sum_ = 0; - double sum_squares_ = 0; - uint64_t buckets_[138]; // this is BucketMapper::BucketCount() + HistogramStat stats_; + std::mutex mutex_; }; -} // namespace rocksdb +} // namespace rocksdb \ No newline at end of file diff --git a/util/histogram_test.cc b/util/histogram_test.cc index b9657db06..ce363ff6b 100644 --- a/util/histogram_test.cc +++ b/util/histogram_test.cc @@ -4,56 +4,202 @@ // of patent rights can be found in the PATENTS file in the same directory. // #include "util/histogram.h" - +#include "util/histogram_windowing.h" #include "util/testharness.h" namespace rocksdb { class HistogramTest : public testing::Test {}; -TEST_F(HistogramTest, BasicOperation) { - HistogramImpl histogram; - for (uint64_t i = 1; i <= 100; i++) { - histogram.Add(i); - } - - { - double median = histogram.Median(); - // ASSERT_LE(median, 50); - ASSERT_GT(median, 0); - } - - { - double percentile100 = histogram.Percentile(100.0); - ASSERT_LE(percentile100, 100.0); - ASSERT_GT(percentile100, 0.0); - double percentile99 = histogram.Percentile(99.0); - double percentile85 = histogram.Percentile(85.0); - ASSERT_LE(percentile99, 99.0); - ASSERT_TRUE(percentile99 >= percentile85); - } - - ASSERT_EQ(histogram.Average(), 50.5); // avg is acurately calculated. +namespace { + const double kIota = 0.1; + const HistogramBucketMapper bucketMapper; + Env* env = Env::Default(); } -TEST_F(HistogramTest, EmptyHistogram) { - HistogramImpl histogram; +void PopulateHistogram(Histogram& histogram, + uint64_t low, uint64_t high, uint64_t loop = 1) { + for (; loop > 0; loop--) { + for (uint64_t i = low; i <= high; i++) { + histogram.Add(i); + } + } +} + +void BasicOperation(Histogram& histogram) { + PopulateHistogram(histogram, 1, 100, 10); + + HistogramData data; + histogram.Data(&data); + + ASSERT_LE(std::fabs(histogram.Percentile(100.0) - 100.0), kIota); + ASSERT_LE(std::fabs(data.percentile99 - 99.0), kIota); + ASSERT_LE(std::fabs(data.percentile95 - 95.0), kIota); + ASSERT_LE(std::fabs(data.median - 50.0), kIota); + ASSERT_EQ(data.average, 50.5); // avg is acurately calculated. + ASSERT_LT(std::fabs(data.standard_deviation- 28.86), kIota); //sd is ~= 28.86 +} + +void MergeHistogram(Histogram& histogram, Histogram& other) { + PopulateHistogram(histogram, 1, 100); + PopulateHistogram(other, 101, 200); + histogram.Merge(other); + + HistogramData data; + histogram.Data(&data); + + ASSERT_LE(std::fabs(histogram.Percentile(100.0) - 200.0), kIota); + ASSERT_LE(std::fabs(data.percentile99 - 198.0), kIota); + ASSERT_LE(std::fabs(data.percentile95 - 190.0), kIota); + ASSERT_LE(std::fabs(data.median - 100.0), kIota); + ASSERT_EQ(data.average, 100.5); // avg is acurately calculated. + ASSERT_LT(std::fabs(data.standard_deviation - 57.73), kIota); //sd is ~= 57.73 +} + +void EmptyHistogram(Histogram& histogram) { + ASSERT_EQ(histogram.min(), bucketMapper.LastValue()); + ASSERT_EQ(histogram.max(), 0); + ASSERT_EQ(histogram.num(), 0); ASSERT_EQ(histogram.Median(), 0.0); ASSERT_EQ(histogram.Percentile(85.0), 0.0); ASSERT_EQ(histogram.Average(), 0.0); + ASSERT_EQ(histogram.StandardDeviation(), 0.0); } -TEST_F(HistogramTest, ClearHistogram) { - HistogramImpl histogram; +void ClearHistogram(Histogram& histogram) { for (uint64_t i = 1; i <= 100; i++) { histogram.Add(i); } histogram.Clear(); + ASSERT_TRUE(histogram.Empty()); ASSERT_EQ(histogram.Median(), 0); ASSERT_EQ(histogram.Percentile(85.0), 0); ASSERT_EQ(histogram.Average(), 0); } +TEST_F(HistogramTest, BasicOperation) { + HistogramImpl histogram; + BasicOperation(histogram); + + HistogramWindowingImpl histogramWindowing; + BasicOperation(histogramWindowing); +} + +TEST_F(HistogramTest, MergeHistogram) { + HistogramImpl histogram; + HistogramImpl other; + MergeHistogram(histogram, other); + + HistogramWindowingImpl histogramWindowing; + HistogramWindowingImpl otherWindowing; + MergeHistogram(histogramWindowing, otherWindowing); +} + +TEST_F(HistogramTest, EmptyHistogram) { + HistogramImpl histogram; + EmptyHistogram(histogram); + + HistogramWindowingImpl histogramWindowing; + EmptyHistogram(histogramWindowing); +} + +TEST_F(HistogramTest, ClearHistogram) { + HistogramImpl histogram; + ClearHistogram(histogram); + + HistogramWindowingImpl histogramWindowing; + ClearHistogram(histogramWindowing); +} + +TEST_F(HistogramTest, HistogramWindowingExpire) { + uint64_t num_windows = 3; + int micros_per_window = 1000000; + uint64_t min_num_per_window = 0; + + HistogramWindowingImpl + histogramWindowing(num_windows, micros_per_window, min_num_per_window); + + PopulateHistogram(histogramWindowing, 1, 1, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 100); + ASSERT_EQ(histogramWindowing.min(), 1); + ASSERT_EQ(histogramWindowing.max(), 1); + ASSERT_EQ(histogramWindowing.Average(), 1); + + PopulateHistogram(histogramWindowing, 2, 2, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 200); + ASSERT_EQ(histogramWindowing.min(), 1); + ASSERT_EQ(histogramWindowing.max(), 2); + ASSERT_EQ(histogramWindowing.Average(), 1.5); + + PopulateHistogram(histogramWindowing, 3, 3, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 300); + ASSERT_EQ(histogramWindowing.min(), 1); + ASSERT_EQ(histogramWindowing.max(), 3); + ASSERT_EQ(histogramWindowing.Average(), 2.0); + + // dropping oldest window with value 1, remaining 2 ~ 4 + PopulateHistogram(histogramWindowing, 4, 4, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 300); + ASSERT_EQ(histogramWindowing.min(), 2); + ASSERT_EQ(histogramWindowing.max(), 4); + ASSERT_EQ(histogramWindowing.Average(), 3.0); + + // dropping oldest window with value 2, remaining 3 ~ 5 + PopulateHistogram(histogramWindowing, 5, 5, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 300); + ASSERT_EQ(histogramWindowing.min(), 3); + ASSERT_EQ(histogramWindowing.max(), 5); + ASSERT_EQ(histogramWindowing.Average(), 4.0); +} + +TEST_F(HistogramTest, HistogramWindowingMerge) { + uint64_t num_windows = 3; + int micros_per_window = 1000000; + uint64_t min_num_per_window = 0; + + HistogramWindowingImpl + histogramWindowing(num_windows, micros_per_window, min_num_per_window); + HistogramWindowingImpl + otherWindowing(num_windows, micros_per_window, min_num_per_window); + + PopulateHistogram(histogramWindowing, 1, 1, 100); + PopulateHistogram(otherWindowing, 1, 1, 100); + env->SleepForMicroseconds(micros_per_window); + + PopulateHistogram(histogramWindowing, 2, 2, 100); + PopulateHistogram(otherWindowing, 2, 2, 100); + env->SleepForMicroseconds(micros_per_window); + + PopulateHistogram(histogramWindowing, 3, 3, 100); + PopulateHistogram(otherWindowing, 3, 3, 100); + env->SleepForMicroseconds(micros_per_window); + + histogramWindowing.Merge(otherWindowing); + ASSERT_EQ(histogramWindowing.num(), 600); + ASSERT_EQ(histogramWindowing.min(), 1); + ASSERT_EQ(histogramWindowing.max(), 3); + ASSERT_EQ(histogramWindowing.Average(), 2.0); + + // dropping oldest window with value 1, remaining 2 ~ 4 + PopulateHistogram(histogramWindowing, 4, 4, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 500); + ASSERT_EQ(histogramWindowing.min(), 2); + ASSERT_EQ(histogramWindowing.max(), 4); + + // dropping oldest window with value 2, remaining 3 ~ 5 + PopulateHistogram(histogramWindowing, 5, 5, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 400); + ASSERT_EQ(histogramWindowing.min(), 3); + ASSERT_EQ(histogramWindowing.max(), 5); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/util/histogram_windowing.cc b/util/histogram_windowing.cc new file mode 100644 index 000000000..091338558 --- /dev/null +++ b/util/histogram_windowing.cc @@ -0,0 +1,193 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "util/histogram.h" +#include "util/histogram_windowing.h" + +#include + +namespace rocksdb { + +namespace { + const HistogramBucketMapper bucketMapper; +} + +HistogramWindowingImpl::HistogramWindowingImpl() { + env_ = Env::Default(); + window_stats_.reset(new HistogramStat[num_windows_]); + Clear(); +} + +HistogramWindowingImpl::HistogramWindowingImpl( + uint64_t num_windows, + uint64_t micros_per_window, + uint64_t min_num_per_window) : + num_windows_(num_windows), + micros_per_window_(micros_per_window), + min_num_per_window_(min_num_per_window) { + env_ = Env::Default(); + window_stats_.reset(new HistogramStat[num_windows_]); + Clear(); +} + +HistogramWindowingImpl::~HistogramWindowingImpl(){ + window_stats_.release(); +} + +void HistogramWindowingImpl::Clear() { + std::lock_guard lock(mutex_); + + stats_.Clear(); + for (size_t i = 0; i < num_windows_; i++) { + window_stats_[i].Clear(); + } + current_window_.store(0, std::memory_order_relaxed); + last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed); +} + +bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); } + +// This function is designed to be lock free, as it's in the critical path +// of any operation. +// Each individual value is atomic, it is just that some samples can go +// in the older bucket which is tolerable. +void HistogramWindowingImpl::Add(uint64_t value){ + TimerTick(); + + // Parent (global) member update + stats_.Add(value); + + // Current window update + window_stats_[current_window()].Add(value); +} + +void HistogramWindowingImpl::Merge(const Histogram& other) { + if (strcmp(Name(), other.Name()) == 0) { + Merge(dynamic_cast(other)); + } +} + +void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) { + std::lock_guard lock(mutex_); + stats_.Merge(other.stats_); + + if (stats_.num_buckets_ != other.stats_.num_buckets_ || + micros_per_window_ != other.micros_per_window_) { + return; + } + + uint64_t cur_window = current_window(); + uint64_t other_cur_window = other.current_window(); + // going backwards for alignment + for (unsigned int i = 0; + i < std::min(num_windows_, other.num_windows_); i++) { + uint64_t window_index = + (cur_window + num_windows_ - i) % num_windows_; + uint64_t other_window_index = + (other_cur_window + other.num_windows_ - i) % other.num_windows_; + + window_stats_[window_index].Merge(other.window_stats_[other_window_index]); + } +} + +std::string HistogramWindowingImpl::ToString() const { + return stats_.ToString(); +} + +double HistogramWindowingImpl::Median() const { + return Percentile(50.0); +} + +double HistogramWindowingImpl::Percentile(double p) const { + // Retry 3 times in total + for (int retry = 0; retry < 3; retry++) { + uint64_t start_num = stats_.num(); + double result = stats_.Percentile(p); + // Detect if swap buckets or Clear() was called during calculation + if (stats_.num() >= start_num) { + return result; + } + } + return 0.0; +} + +double HistogramWindowingImpl::Average() const { + return stats_.Average(); +} + +double HistogramWindowingImpl::StandardDeviation() const { + return stats_.StandardDeviation(); +} + +void HistogramWindowingImpl::Data(HistogramData * const data) const { + stats_.Data(data); +} + +void HistogramWindowingImpl::TimerTick() { + uint64_t curr_time = env_->NowMicros(); + if (curr_time - last_swap_time() > micros_per_window_ && + window_stats_[current_window()].num() >= min_num_per_window_) { + SwapHistoryBucket(); + } +} + +void HistogramWindowingImpl::SwapHistoryBucket() { + // Threads executing Add() would be competing for this mutex, the first one + // who got the metex would take care of the bucket swap, other threads + // can skip this. + // If mutex is held by Merge() or Clear(), next Add() will take care of the + // swap, if needed. + if (mutex_.try_lock()) { + last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed); + + uint64_t next_window = (current_window() + 1) % num_windows_; + + // subtract next buckets from totals and swap to next buckets + HistogramStat& stats_to_drop = window_stats_[next_window]; + + if (!stats_to_drop.Empty()) { + for (size_t b = 0; b < stats_.num_buckets_; b++){ + stats_.buckets_[b].fetch_sub( + stats_to_drop.bucket_at(b), std::memory_order_relaxed); + } + + if (stats_.min() == stats_to_drop.min()) { + uint64_t new_min = bucketMapper.LastValue(); + for (unsigned int i = 1; i < num_windows_; i++) { + uint64_t m = window_stats_[(next_window + i) % num_windows_].min(); + if (m < new_min) new_min = m; + } + stats_.min_.store(new_min, std::memory_order_relaxed); + } + + if (stats_.max() == stats_to_drop.max()) { + uint64_t new_max = 0; + for (unsigned int i = 1; i < num_windows_; i++) { + uint64_t m = window_stats_[(next_window + i) % num_windows_].max(); + if (m > new_max) new_max = m; + } + stats_.max_.store(new_max, std::memory_order_relaxed); + } + + stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed); + stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed); + stats_.sum_squares_.fetch_sub( + stats_to_drop.sum_squares(), std::memory_order_relaxed); + + stats_to_drop.Clear(); + } + + // advance to next window bucket + current_window_.store(next_window, std::memory_order_relaxed); + + mutex_.unlock(); + } +} + +} // namespace rocksdb diff --git a/util/histogram_windowing.h b/util/histogram_windowing.h new file mode 100644 index 000000000..cdcf1ba8a --- /dev/null +++ b/util/histogram_windowing.h @@ -0,0 +1,80 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "util/histogram.h" +#include "rocksdb/env.h" + +namespace rocksdb { + +class HistogramWindowingImpl : public Histogram +{ +public: + HistogramWindowingImpl(); + HistogramWindowingImpl(uint64_t num_windows, + uint64_t micros_per_window, + uint64_t min_num_per_window); + + HistogramWindowingImpl(const HistogramImpl&) = delete; + HistogramWindowingImpl& operator=(const HistogramImpl&) = delete; + + ~HistogramWindowingImpl(); + + virtual void Clear() override; + virtual bool Empty() const override; + virtual void Add(uint64_t value) override; + virtual void Merge(const Histogram& other) override; + void Merge(const HistogramWindowingImpl& other); + + virtual std::string ToString() const override; + virtual const char* Name() const override { return "HistogramWindowingImpl"; } + virtual uint64_t min() const override { return stats_.min(); } + virtual uint64_t max() const override { return stats_.max(); } + virtual uint64_t num() const override { return stats_.num(); } + virtual double Median() const override; + virtual double Percentile(double p) const override; + virtual double Average() const override; + virtual double StandardDeviation() const override; + virtual void Data(HistogramData* const data) const override; + +private: + void TimerTick(); + void SwapHistoryBucket(); + inline uint64_t current_window() const { + return current_window_.load(std::memory_order_relaxed); + } + inline uint64_t last_swap_time() const{ + return last_swap_time_.load(std::memory_order_relaxed); + } + + Env* env_; + std::mutex mutex_; + + // Aggregated stats over windows_stats_, all the computation is done + // upon aggregated values + HistogramStat stats_; + + // This is a circular array representing the latest N time-windows. + // Each entry stores a time-window of data. Expiration is done + // on window-based. + std::unique_ptr window_stats_; + + std::atomic_uint_fast64_t current_window_; + std::atomic_uint_fast64_t last_swap_time_; + + // Following parameters are configuable + uint64_t num_windows_ = 5; + uint64_t micros_per_window_ = 60000000; + // By default, don't care about the number of values in current window + // when decide whether to swap windows or not. + uint64_t min_num_per_window_ = 0; +}; + +} // namespace rocksdb \ No newline at end of file