From fe10200ddc91d4eef3cca5dd335282c848c31ff4 Mon Sep 17 00:00:00 2001 From: Abhishek Kona Date: Fri, 15 Feb 2013 11:53:17 -0800 Subject: [PATCH] Introduce histogram in statistics.h Summary: * Introduce is histogram in statistics.h * stop watch to measure time. * introduce two timers as a poc. Replaced NULL with nullptr to fight some lint errors Should be useful for google. Test Plan: ran db_bench and check stats. make all check Reviewers: dhruba, heyongqiang Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D8637 --- db/db_bench.cc | 69 +++++++++------- db/db_impl.cc | 149 ++++++++++++++++++----------------- db/db_statistics.h | 43 ++++++++-- include/leveldb/statistics.h | 54 ++++++++++++- tools/db_stress.cc | 18 ++--- util/histogram.cc | 34 +++++--- util/histogram.h | 27 ++++--- util/histogram_test.cc | 6 +- util/stop_watch.h | 68 ++++++++++++++++ 9 files changed, 322 insertions(+), 146 deletions(-) create mode 100644 util/stop_watch.h diff --git a/db/db_bench.cc b/db/db_bench.cc index 601d3b54e..f2361fb36 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include #include #include @@ -123,7 +124,7 @@ static int FLAGS_bloom_bits = -1; static bool FLAGS_use_existing_db = false; // Use the db with the following name. -static const char* FLAGS_db = NULL; +static const char* FLAGS_db = nullptr; // Number of shards for the block cache is 2 ** FLAGS_cache_numshardbits. // Negative means use default settings. This is applied only @@ -135,7 +136,7 @@ static bool FLAGS_verify_checksum = false; // Database statistics static bool FLAGS_statistics = false; -static class leveldb::DBStatistics* dbstats = NULL; +static class leveldb::DBStatistics* dbstats = nullptr; // Number of write operations to do. If negative, do FLAGS_num reads. 
static long FLAGS_writes = -1; @@ -304,7 +305,7 @@ class Stats { int64_t bytes_; double last_op_finish_; double last_report_finish_; - Histogram hist_; + HistogramImpl hist_; std::string message_; public: @@ -531,7 +532,7 @@ class Benchmark { const int len = FLAGS_block_size; char* text = (char*) malloc(len+1); bool result = true; - const char* name = NULL; + const char* name = nullptr; std::string compressed; memset(text, (int) 'y', len); @@ -573,18 +574,18 @@ class Benchmark { kMajorVersion, kMinorVersion); #if defined(__linux) - time_t now = time(NULL); + time_t now = time(nullptr); fprintf(stderr, "Date: %s", ctime(&now)); // ctime() adds newline FILE* cpuinfo = fopen("/proc/cpuinfo", "r"); - if (cpuinfo != NULL) { + if (cpuinfo != nullptr) { char line[1000]; int num_cpus = 0; std::string cpu_type; std::string cache_size; - while (fgets(line, sizeof(line), cpuinfo) != NULL) { + while (fgets(line, sizeof(line), cpuinfo) != nullptr) { const char* sep = strchr(line, ':'); - if (sep == NULL) { + if (sep == nullptr) { continue; } Slice key = TrimSpace(Slice(line, sep - 1 - line)); @@ -603,13 +604,21 @@ class Benchmark { #endif } + void PrintHistogram(Histograms histogram_type, std::string name) { + HistogramData histogramData; + dbstats->histogramData(histogram_type, &histogramData); + fprintf(stdout, "%s statistics : \n", name.c_str()); + fprintf(stdout, "Median : %f\n",histogramData.median); + fprintf(stdout, "99ile : %f\n", histogramData.percentile99); + } + void PrintStatistics() { if (FLAGS_statistics) { fprintf(stdout, "File opened:%ld closed:%ld errors:%ld\n" "Block Cache Hit Count:%ld Block Cache Miss Count:%ld\n" "Bloom Filter Useful: %ld \n" "Compaction key_drop_newer_entry: %ld key_drop_obsolete: %ld " - "Compaction key_drop_user: %ld", + "Compaction key_drop_user: %ld\n", dbstats->getNumFileOpens(), dbstats->getNumFileCloses(), dbstats->getNumFileErrors(), @@ -619,6 +628,8 @@ class Benchmark { dbstats->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 
dbstats->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE), dbstats->getTickerCount(COMPACTION_KEY_DROP_USER)); + PrintHistogram(DB_GET, "DB_GET"); + PrintHistogram(DB_WRITE, "DB_WRITE"); } } @@ -627,11 +638,11 @@ class Benchmark { : cache_(FLAGS_cache_size >= 0 ? (FLAGS_cache_numshardbits >= 1 ? NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits) : - NewLRUCache(FLAGS_cache_size)) : NULL), + NewLRUCache(FLAGS_cache_size)) : nullptr), filter_policy_(FLAGS_bloom_bits >= 0 ? NewBloomFilterPolicy(FLAGS_bloom_bits) - : NULL), - db_(NULL), + : nullptr), + db_(nullptr), num_(FLAGS_num), value_size_(FLAGS_value_size), entries_per_batch_(1), @@ -663,12 +674,12 @@ class Benchmark { Open(); const char* benchmarks = FLAGS_benchmarks; - while (benchmarks != NULL) { + while (benchmarks != nullptr) { const char* sep = strchr(benchmarks, ','); Slice name; - if (sep == NULL) { + if (sep == nullptr) { name = benchmarks; - benchmarks = NULL; + benchmarks = nullptr; } else { name = Slice(benchmarks, sep - benchmarks); benchmarks = sep + 1; @@ -687,7 +698,7 @@ class Benchmark { write_options_.disableWAL = FLAGS_disable_wal; - void (Benchmark::*method)(ThreadState*) = NULL; + void (Benchmark::*method)(ThreadState*) = nullptr; bool fresh_db = false; int num_threads = FLAGS_threads; @@ -764,16 +775,16 @@ class Benchmark { if (FLAGS_use_existing_db) { fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n", name.ToString().c_str()); - method = NULL; + method = nullptr; } else { delete db_; - db_ = NULL; + db_ = nullptr; DestroyDB(FLAGS_db, Options()); Open(); } } - if (method != NULL) { + if (method != nullptr) { RunBenchmark(num_threads, name, method); } } @@ -866,7 +877,7 @@ class Benchmark { uint32_t crc = 0; while (bytes < 500 * 1048576) { crc = crc32c::Value(data.data(), size); - thread->stats.FinishedSingleOp(NULL); + thread->stats.FinishedSingleOp(nullptr); bytes += size; } // Print so result is not dead @@ -880,16 +891,16 @@ class Benchmark { int dummy; port::AtomicPointer 
ap(&dummy); int count = 0; - void *ptr = NULL; + void *ptr = nullptr; thread->stats.AddMessage("(each op is 1000 loads)"); while (count < 100000) { for (int i = 0; i < 1000; i++) { ptr = ap.Acquire_Load(); } count++; - thread->stats.FinishedSingleOp(NULL); + thread->stats.FinishedSingleOp(nullptr); } - if (ptr == NULL) exit(1); // Disable unused variable warning. + if (ptr == nullptr) exit(1); // Disable unused variable warning. } void SnappyCompress(ThreadState* thread) { @@ -904,7 +915,7 @@ class Benchmark { input.size(), &compressed); produced += compressed.size(); bytes += input.size(); - thread->stats.FinishedSingleOp(NULL); + thread->stats.FinishedSingleOp(nullptr); } if (!ok) { @@ -930,7 +941,7 @@ class Benchmark { ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), uncompressed); bytes += input.size(); - thread->stats.FinishedSingleOp(NULL); + thread->stats.FinishedSingleOp(nullptr); } delete[] uncompressed; @@ -942,11 +953,11 @@ class Benchmark { } void Open() { - assert(db_ == NULL); + assert(db_ == nullptr); Options options; options.create_if_missing = !FLAGS_use_existing_db; options.block_cache = cache_; - if (cache_ == NULL) { + if (cache_ == nullptr) { options.no_block_cache = true; } options.write_buffer_size = FLAGS_write_buffer_size; @@ -1257,7 +1268,7 @@ class Benchmark { } void Compact(ThreadState* thread) { - db_->CompactRange(NULL, NULL); + db_->CompactRange(nullptr, nullptr); } void PrintStats(const char* key) { @@ -1479,7 +1490,7 @@ int main(int argc, char** argv) { FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); // Choose a location for the test database if none given with --db= - if (FLAGS_db == NULL) { + if (FLAGS_db == nullptr) { leveldb::Env::Default()->GetTestDirectory(&default_db_path); default_db_path += "/dbbench"; FLAGS_db = default_db_path.c_str(); diff --git a/db/db_impl.cc b/db/db_impl.cc index 8fb00fd55..4e6853e3e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -34,11 +34,12 @@ #include 
"table/block.h" #include "table/merger.h" #include "table/two_level_iterator.h" +#include "util/auto_roll_logger.h" +#include "util/build_version.h" #include "util/coding.h" #include "util/logging.h" #include "util/mutexlock.h" -#include "util/build_version.h" -#include "util/auto_roll_logger.h" +#include "util/stop_watch.h" namespace leveldb { @@ -116,19 +117,19 @@ Options SanitizeOptions(const std::string& dbname, const Options& src) { Options result = src; result.comparator = icmp; - result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL; + result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr; ClipToRange(&result.max_open_files, 20, 50000); ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); ClipToRange(&result.block_size, 1<<10, 4<<20); - if (result.info_log == NULL) { + if (result.info_log == nullptr) { Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env, result, &result.info_log); if (!s.ok()) { // No place suitable for logging - result.info_log = NULL; + result.info_log = nullptr; } } - if (result.block_cache == NULL && !result.no_block_cache) { + if (result.block_cache == nullptr && !result.no_block_cache) { result.block_cache = NewLRUCache(8 << 20); } result.compression_per_level = src.compression_per_level; @@ -143,16 +144,16 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) dbname, &internal_comparator_, &internal_filter_policy_, options)), internal_filter_policy_(options.filter_policy), owns_info_log_(options_.info_log != options.info_log), - db_lock_(NULL), - shutting_down_(NULL), + db_lock_(nullptr), + shutting_down_(nullptr), bg_cv_(&mutex_), mem_(new MemTable(internal_comparator_, NumberLevels())), logfile_number_(0), tmp_batch_(new WriteBatch), bg_compaction_scheduled_(0), bg_logstats_scheduled_(false), - manual_compaction_(NULL), - logger_(NULL), + manual_compaction_(nullptr), + logger_(nullptr), disable_delete_obsolete_files_(false), delete_obsolete_files_last_run_(0), 
stall_level0_slowdown_(0), @@ -198,17 +199,17 @@ DBImpl::~DBImpl() { FlushMemTable(FlushOptions()); } mutex_.Lock(); - shutting_down_.Release_Store(this); // Any non-NULL value is ok + shutting_down_.Release_Store(this); // Any non-nullptr value is ok while (bg_compaction_scheduled_ || bg_logstats_scheduled_) { bg_cv_.Wait(); } mutex_.Unlock(); - if (db_lock_ != NULL) { + if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); } - if (mem_ != NULL) mem_->Unref(); + if (mem_ != nullptr) mem_->Unref(); imm_.UnrefAll(); delete tmp_batch_; delete[] stats_; @@ -233,7 +234,7 @@ void DBImpl::TEST_Destroy_DBImpl() { mutex_.Unlock(); // force release the lock file. - if (db_lock_ != NULL) { + if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); } @@ -466,7 +467,7 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, bool error_if_log_file_exist) { mutex_.AssertHeld(); - assert(db_lock_ == NULL); + assert(db_lock_ == nullptr); if (!external_table) { // We call CreateDirIfMissing() as the directory may already exist (if we // are reopening a DB), when this happens we don't want creating the @@ -567,12 +568,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, Env* env; Logger* info_log; const char* fname; - Status* status; // NULL if options_.paranoid_checks==false + Status* status; // nullptr if options_.paranoid_checks==false virtual void Corruption(size_t bytes, const Status& s) { Log(info_log, "%s%s: dropping %d bytes; %s", - (this->status == NULL ? "(ignoring error) " : ""), + (this->status == nullptr ? "(ignoring error) " : ""), fname, static_cast(bytes), s.ToString().c_str()); - if (this->status != NULL && this->status->ok()) *this->status = s; + if (this->status != nullptr && this->status->ok()) *this->status = s; } }; @@ -592,7 +593,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, reporter.env = env_; reporter.info_log = options_.info_log.get(); reporter.fname = fname.c_str(); - reporter.status = (options_.paranoid_checks ? 
&status : NULL); + reporter.status = (options_.paranoid_checks ? &status : nullptr); // We intentially make log::Reader do checksumming even if // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly @@ -606,7 +607,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = NULL; + MemTable* mem = nullptr; if (external_table) { mem = external_table; } @@ -619,7 +620,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, } WriteBatchInternal::SetContents(&batch, record); - if (mem == NULL) { + if (mem == nullptr) { mem = new MemTable(internal_comparator_, NumberLevels()); mem->Ref(); } @@ -644,17 +645,17 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, break; } mem->Unref(); - mem = NULL; + mem = nullptr; } } - if (status.ok() && mem != NULL && !external_table) { + if (status.ok() && mem != nullptr && !external_table) { status = WriteLevel0TableForRecovery(mem, edit); // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. } - if (mem != NULL && !external_table) mem->Unref(); + if (mem != nullptr && !external_table) mem->Unref(); return status; } @@ -750,7 +751,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for // that key range. - if (base != NULL && options_.max_background_compactions <= 1) { + if (base != nullptr && options_.max_background_compactions <= 1) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, @@ -778,7 +779,7 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { // This will release and re-acquire the mutex. 
uint64_t file_number; MemTable* m = imm_.PickMemtableToFlush(); - if (m == NULL) { + if (m == nullptr) { Log(options_.info_log, "Nothing in memstore to flush"); Status s = Status::IOError("Nothing in memstore to flush"); return s; @@ -891,8 +892,8 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, Status DBImpl::FindProbableWALFiles(std::vector* const allLogs, std::vector* const result, const SequenceNumber target) { - assert(allLogs != NULL); - assert(result != NULL); + assert(allLogs != nullptr); + assert(result != nullptr); std::sort(allLogs->begin(), allLogs->end()); long start = 0; // signed to avoid overflow when target is < first file. @@ -951,12 +952,12 @@ Status DBImpl::ReadFirstLine(const std::string& fname, Env* env; Logger* info_log; const char* fname; - Status* status; // NULL if options_.paranoid_checks==false + Status* status; // nullptr if options_.paranoid_checks==false virtual void Corruption(size_t bytes, const Status& s) { Log(info_log, "%s%s: dropping %d bytes; %s", - (this->status == NULL ? "(ignoring error) " : ""), + (this->status == nullptr ? "(ignoring error) " : ""), fname, static_cast(bytes), s.ToString().c_str()); - if (this->status != NULL && this->status->ok()) *this->status = s; + if (this->status != nullptr && this->status->ok()) *this->status = s; } }; @@ -972,7 +973,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname, reporter.env = env_; reporter.info_log = options_.info_log.get(); reporter.fname = fname.c_str(); - reporter.status = (options_.paranoid_checks ? &status : NULL); + reporter.status = (options_.paranoid_checks ? 
&status : nullptr); log::Reader reader(std::move(file), &reporter, true/*checksum*/, 0/*initial_offset*/); std::string scratch; @@ -993,7 +994,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname, Status DBImpl::ListAllWALFiles(const std::string& path, std::vector* const logFiles, WalFileType logType) { - assert(logFiles != NULL); + assert(logFiles != nullptr); std::vector allFiles; const Status status = env_->GetChildren(path, &allFiles); if (!status.ok()) { @@ -1020,14 +1021,14 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { manual.level = level; manual.done = false; manual.in_progress = false; - if (begin == NULL) { - manual.begin = NULL; + if (begin == nullptr) { + manual.begin = nullptr; } else { begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); manual.begin = &begin_storage; } - if (end == NULL) { - manual.end = NULL; + if (end == nullptr) { + manual.end = nullptr; } else { end_storage = InternalKey(*end, 0, static_cast(0)); manual.end = &end_storage; @@ -1053,7 +1054,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { Log(options_.info_log, "Manual compaction starting"); while (!manual.done) { - while (manual_compaction_ != NULL) { + while (manual_compaction_ != nullptr) { bg_cv_.Wait(); } manual_compaction_ = &manual; @@ -1077,8 +1078,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { } Status DBImpl::FlushMemTable(const FlushOptions& options) { - // NULL batch means just wait for earlier writes to be done - Status s = Write(WriteOptions(), NULL); + // nullptr batch means just wait for earlier writes to be done + Status s = Write(WriteOptions(), nullptr); if (s.ok() && options.wait) { // Wait until the compaction completes s = WaitForCompactMemTable(); @@ -1123,7 +1124,7 @@ void DBImpl::MaybeScheduleCompaction() { } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else if 
(!imm_.IsFlushPending() && - manual_compaction_ == NULL && + manual_compaction_ == nullptr && !versions_->NeedsCompaction()) { // No work to be done } else { @@ -1195,7 +1196,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } unique_ptr c; - bool is_manual = (manual_compaction_ != NULL) && + bool is_manual = (manual_compaction_ != nullptr) && (manual_compaction_->in_progress == false); InternalKey manual_end; if (is_manual) { @@ -1274,19 +1275,19 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, m->begin = &m->tmp_storage; } m->in_progress = false; // not being processed anymore - manual_compaction_ = NULL; + manual_compaction_ = nullptr; } return status; } void DBImpl::CleanupCompaction(CompactionState* compact) { mutex_.AssertHeld(); - if (compact->builder != NULL) { + if (compact->builder != nullptr) { // May happen if we get a shutdown call in the middle of compaction compact->builder->Abandon(); compact->builder.reset(); } else { - assert(compact->outfile == NULL); + assert(compact->outfile == nullptr); } for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; @@ -1300,8 +1301,8 @@ void DBImpl::CleanupCompaction(CompactionState* compact) { // Insert them into pending_outputs so that they do not get deleted. 
void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) { mutex_.AssertHeld(); - assert(compact != NULL); - assert(compact->builder == NULL); + assert(compact != nullptr); + assert(compact->builder == nullptr); int filesNeeded = compact->compaction->num_input_files(1); for (int i = 0; i < filesNeeded; i++) { uint64_t file_number = versions_->NewFileNumber(); @@ -1323,8 +1324,8 @@ void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) { } Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { - assert(compact != NULL); - assert(compact->builder == NULL); + assert(compact != nullptr); + assert(compact->builder == nullptr); uint64_t file_number; // If we have not yet exhausted the pre-allocated file numbers, // then use the one from the front. Otherwise, we have to acquire @@ -1362,9 +1363,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, Iterator* input) { - assert(compact != NULL); + assert(compact != nullptr); assert(compact->outfile); - assert(compact->builder != NULL); + assert(compact->builder != nullptr); const uint64_t output_number = compact->current_output()->number; assert(output_number != 0); @@ -1491,7 +1492,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { Log(options_.info_log, "Compaction start summary: %s\n", scratch); assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); - assert(compact->builder == NULL); + assert(compact->builder == nullptr); assert(!compact->outfile); SequenceNumber visible_at_tip = 0; @@ -1536,7 +1537,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { SequenceNumber visible_in_snapshot = kMaxSequenceNumber; for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work - if (imm_.imm_flush_needed.NoBarrier_Load() != NULL) { + if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = 
env_->NowMicros(); mutex_.Lock(); if (imm_.IsFlushPending()) { @@ -1549,9 +1550,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { Slice key = input->key(); Slice value = input->value(); - Slice* compaction_filter_value = NULL; + Slice* compaction_filter_value = nullptr; if (compact->compaction->ShouldStopBefore(key) && - compact->builder != NULL) { + compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input.get()); if (!status.ok()) { break; @@ -1604,7 +1605,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Therefore this deletion marker is obsolete and can be dropped. drop = true; RecordTick(options_.statistics, COMPACTION_KEY_DROP_OBSOLETE); - } else if (options_.CompactionFilter != NULL && + } else if (options_.CompactionFilter != nullptr && ikey.type != kTypeDeletion && ikey.sequence < earliest_snapshot) { // If the user has specified a compaction filter, then invoke @@ -1619,7 +1620,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER); } // If the application wants to change the value, then do so here. 
- if (compaction_filter_value != NULL) { + if (compaction_filter_value != nullptr) { value = *compaction_filter_value; delete compaction_filter_value; } @@ -1651,7 +1652,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } // Open output file if necessary - if (compact->builder == NULL) { + if (compact->builder == nullptr) { status = OpenCompactionOutputFile(compact); if (!status.ok()) { break; @@ -1679,7 +1680,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok() && shutting_down_.Acquire_Load()) { status = Status::IOError("Deleting DB during compaction"); } - if (status.ok() && compact->builder != NULL) { + if (status.ok() && compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input.get()); } if (status.ok()) { @@ -1694,7 +1695,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { stats.files_in_levelnp1 = compact->compaction->num_input_files(1); int num_output_files = compact->outputs.size(); - if (compact->builder != NULL) { + if (compact->builder != nullptr) { // An error occured so ignore the last output. 
assert(num_output_files > 0); --num_output_files; @@ -1789,7 +1790,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, cleanup->mu = &mutex_; cleanup->version = versions_->current(); - internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); + internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); mutex_.Unlock(); return internal_iter; @@ -1810,8 +1811,11 @@ Status DBImpl::Get(const ReadOptions& options, std::string* value) { Status s; MutexLock l(&mutex_); + std::unique_ptr sw = stats::StartStopWatch(env_, + options_.statistics, + DB_GET); SequenceNumber snapshot; - if (options.snapshot != NULL) { + if (options.snapshot != nullptr) { snapshot = reinterpret_cast(options.snapshot)->number_; } else { snapshot = versions_->LastSequence(); @@ -1859,7 +1863,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); return NewDBIterator( &dbname_, env_, user_comparator(), internal_iter, - (options.snapshot != NULL + (options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot)); } @@ -1891,6 +1895,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { w.done = false; MutexLock l(&mutex_); + std::unique_ptr sw = stats::StartStopWatch(env_, + options_.statistics, + DB_WRITE); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); @@ -1900,10 +1907,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } // May temporarily unlock and wait. 
- Status status = MakeRoomForWrite(my_batch == NULL); + Status status = MakeRoomForWrite(my_batch == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; - if (status.ok() && my_batch != NULL) { // NULL batch is for compactions + if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions WriteBatch* updates = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(updates, last_sequence + 1); int my_batch_count = WriteBatchInternal::Count(updates); @@ -1960,12 +1967,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } // REQUIRES: Writer list must be non-empty -// REQUIRES: First writer must have a non-NULL batch +// REQUIRES: First writer must have a non-nullptr batch WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { assert(!writers_.empty()); Writer* first = writers_.front(); WriteBatch* result = first->batch; - assert(result != NULL); + assert(result != nullptr); size_t size = WriteBatchInternal::ByteSize(first->batch); @@ -1993,7 +2000,7 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { break; } - if (w->batch != NULL) { + if (w->batch != nullptr) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { // Do not make batch too big @@ -2268,11 +2275,11 @@ DB::~DB() { } Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { - *dbptr = NULL; + *dbptr = nullptr; - if (options.block_cache != NULL && options.no_block_cache) { + if (options.block_cache != nullptr && options.no_block_cache) { return Status::InvalidArgument( - "no_block_cache is true while block_cache is not NULL"); + "no_block_cache is true while block_cache is not nullptr"); } DBImpl* impl = new DBImpl(options, dbname); Status s = impl->CreateArchivalDirectory(); diff --git a/db/db_statistics.h b/db/db_statistics.h index 04e782d5c..76f3435f1 100644 --- a/db/db_statistics.h +++ b/db/db_statistics.h @@ -2,48 +2,75 @@ // Use of this source code is 
governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#ifndef LEVELDB_STORAGE_DB_DB_STATISTICS_H_ +#define LEVELDB_STORAGE_DB_DB_STATISTICS_H_ + #include #include #include #include "leveldb/statistics.h" +#include "util/histogram.h" #include "port/port.h" #include "util/mutexlock.h" + + namespace leveldb { class DBStatistics: public Statistics { public: - DBStatistics() : allTickers_(TICKER_ENUM_MAX) { } + DBStatistics() : allTickers_(TICKER_ENUM_MAX), + allHistograms_(HISTOGRAM_ENUM_MAX) { } - void incNumFileOpens() { + virtual ~DBStatistics() {} + + virtual void incNumFileOpens() { MutexLock l(&mu_); numFileOpens_++; } - void incNumFileCloses() { + virtual void incNumFileCloses() { MutexLock l(&mu_); numFileCloses_++; } - void incNumFileErrors() { + virtual void incNumFileErrors() { MutexLock l(&mu_); numFileErrors_++; } - long getTickerCount(Tickers tickerType) { + virtual long getTickerCount(Tickers tickerType) { assert(tickerType < TICKER_ENUM_MAX); return allTickers_[tickerType].getCount(); } - void recordTick(Tickers tickerType, uint64_t count) { + virtual void recordTick(Tickers tickerType, uint64_t count) { assert(tickerType < TICKER_ENUM_MAX); allTickers_[tickerType].recordTick(count); } - private: + virtual void measureTime(Histograms histogramType, uint64_t value) { + assert(histogramType < HISTOGRAM_ENUM_MAX); + allHistograms_[histogramType].Add(value); + } + + virtual void measureTime(Histograms histogramType, double value) { + assert(histogramType < HISTOGRAM_ENUM_MAX); + allHistograms_[histogramType].Add(value); + } + + virtual void histogramData(Histograms histogramType, + HistogramData * const data) { + assert(histogramType < HISTOGRAM_ENUM_MAX); + allHistograms_[histogramType].Data(data); + } + port::Mutex mu_; std::vector allTickers_; + std::vector allHistograms_; }; -} +} // namespace leveldb + +#endif // LEVELDB_STORAGE_DB_DB_STATISTICS_H_ diff --git 
a/include/leveldb/statistics.h b/include/leveldb/statistics.h index 967b20090..dbdfb7225 100644 --- a/include/leveldb/statistics.h +++ b/include/leveldb/statistics.h @@ -5,6 +5,11 @@ #ifndef STORAGE_LEVELDB_INCLUDE_STATISTICS_H_ #define STORAGE_LEVELDB_INCLUDE_STATISTICS_H_ +#include +#include +#include +#include + namespace leveldb { /** @@ -31,6 +36,46 @@ enum Tickers { TICKER_ENUM_MAX = 8, }; +/** + * Keep adding histogram's here. + * Any histogram whould have value less than HISTOGRAM_ENUM_MAX + * Add a new Histogram by assigning it the current value of HISTOGRAM_ENUM_MAX + * And increment HISTOGRAM_ENUM_MAX + */ +enum Histograms { + DB_GET = 0, + DB_WRITE = 1, + HISTOGRAM_ENUM_MAX = 2, +}; + +struct HistogramData { + double median; + double percentile95; + double percentile99; + double average; + double standard_deviation; +}; + + +class Histogram { + public: + // clear's the histogram + virtual void Clear() = 0; + virtual ~Histogram(); + // Add a value to be recorded in the histogram. + virtual void Add(uint64_t value) = 0; + virtual void Add(double value) = 0; + + virtual std::string ToString() const = 0; + + // Get statistics + virtual double Median() const = 0; + virtual double Percentile(double p) const = 0; + virtual double Average() const = 0; + virtual double StandardDeviation() const = 0; + virtual void Data(HistogramData * const data) const = 0; + +}; /** * A dumb ticker which keeps incrementing through its life time. 
@@ -47,6 +92,7 @@ class Ticker { inline void recordTick(int count) { count_ += count; } + inline uint64_t getCount() { return count_; } @@ -70,10 +116,14 @@ class Statistics { virtual long getNumFileOpens() { return numFileOpens_;} virtual long getNumFileCloses() { return numFileCloses_;} virtual long getNumFileErrors() { return numFileErrors_;} - virtual ~Statistics() {} + ~Statistics() {} virtual long getTickerCount(Tickers tickerType) = 0; virtual void recordTick(Tickers tickerType, uint64_t count = 0) = 0; + virtual void measureTime(Histograms histogramType, uint64_t count) = 0; + virtual void measureTime(Histograms histogramType, double count) = 0; + + virtual void histogramData(Histograms type, HistogramData * const data) = 0; protected: long numFileOpens_; @@ -85,7 +135,7 @@ class Statistics { inline void RecordTick(Statistics* const statistics, Tickers ticker, uint64_t count = 1) { - if (statistics != NULL) { + if (statistics != nullptr) { statistics->recordTick(ticker, count); } } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 5a5e41dda..0eb1e541e 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -73,7 +73,7 @@ static int FLAGS_open_files = 0; static int FLAGS_bloom_bits = 10; // Use the db with the following name. -static const char* FLAGS_db = NULL; +static const char* FLAGS_db = nullptr; // Verify checksum for every block read from storage static bool FLAGS_verify_checksum = false; @@ -158,7 +158,7 @@ class Stats { int next_report_; size_t bytes_; double last_op_finish_; - Histogram hist_; + HistogramImpl hist_; public: Stats() { } @@ -258,7 +258,7 @@ class SharedState { public: static const uint32_t SENTINEL = 0xffffffff; - SharedState(StressTest* stress_test) : + explicit SharedState(StressTest* stress_test) : cv_(&mu_), seed_(FLAGS_seed), max_key_(FLAGS_max_key), @@ -418,8 +418,8 @@ class StressTest { : cache_(NewLRUCache(FLAGS_cache_size)), filter_policy_(FLAGS_bloom_bits >= 0 ? 
NewBloomFilterPolicy(FLAGS_bloom_bits) - : NULL), - db_(NULL), + : nullptr), + db_(nullptr), num_times_reopened_(0) { std::vector files; FLAGS_env->GetChildren(FLAGS_db, &files); @@ -485,7 +485,7 @@ class StressTest { for (unsigned int i = 0; i < n; i++) { delete threads[i]; - threads[i] = NULL; + threads[i] = nullptr; } double now = FLAGS_env->NowMicros(); fprintf(stdout, "%s Verification successful\n", @@ -698,7 +698,7 @@ class StressTest { } void Open() { - assert(db_ == NULL); + assert(db_ == nullptr); Options options; options.block_cache = cache_; options.write_buffer_size = FLAGS_write_buffer_size; @@ -736,7 +736,7 @@ class StressTest { // do not close the db. Just delete the lock file. This // simulates a crash-recovery kind of situation. ((DBImpl*) db_)->TEST_Destroy_DBImpl(); - db_ = NULL; + db_ = nullptr; num_times_reopened_++; double now = FLAGS_env->NowMicros(); @@ -910,7 +910,7 @@ int main(int argc, char** argv) { } // Choose a location for the test database if none given with --db= - if (FLAGS_db == NULL) { + if (FLAGS_db == nullptr) { leveldb::Env::Default()->GetTestDirectory(&default_db_path); default_db_path += "/dbstress"; FLAGS_db = default_db_path.c_str(); diff --git a/util/histogram.cc b/util/histogram.cc index 422bc1fda..0d52878e7 100644 --- a/util/histogram.cc +++ b/util/histogram.cc @@ -2,11 +2,12 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include "util/histogram.h" + #include #include #include #include "port/port.h" -#include "util/histogram.h" namespace leveldb { @@ -57,7 +58,7 @@ namespace { } -Histogram::Histogram() : +HistogramImpl::HistogramImpl() : min_(bucketMapper.LastValue()), max_(0), num_(0), @@ -65,7 +66,7 @@ Histogram::Histogram() : sum_squares_(0), buckets_(std::vector(bucketMapper.BucketCount(), 0)) {} -void Histogram::Clear() { +void HistogramImpl::Clear() { min_ = bucketMapper.LastValue(); max_ = 0; num_ = 0; @@ -74,7 +75,7 @@ void Histogram::Clear() { buckets_.resize(bucketMapper.BucketCount(), 0); } -void Histogram::Add(uint64_t value) { +void HistogramImpl::Add(uint64_t value) { const size_t index = bucketMapper.IndexForValue(value); buckets_[index] += 1; if (min_ > value) min_ = value; @@ -84,11 +85,11 @@ void Histogram::Add(uint64_t value) { sum_squares_ += (value * value); } -void Histogram::Add(double value) { +void HistogramImpl::Add(double value) { Add(static_cast(value)); } -void Histogram::Merge(const Histogram& other) { +void HistogramImpl::Merge(const HistogramImpl& other) { if (other.min_ < min_) min_ = other.min_; if (other.max_ > max_) max_ = other.max_; num_ += other.num_; @@ -99,11 +100,11 @@ void Histogram::Merge(const Histogram& other) { } } -double Histogram::Median() const { +double HistogramImpl::Median() const { return Percentile(50.0); } -double Histogram::Percentile(double p) const { +double HistogramImpl::Percentile(double p) const { double threshold = num_ * (p / 100.0); double sum = 0; for (int b = 0; b < bucketMapper.BucketCount(); b++) { @@ -128,18 +129,18 @@ double Histogram::Percentile(double p) const { return max_; } -double Histogram::Average() const { +double HistogramImpl::Average() const { if (num_ == 0.0) return 0; return sum_ / num_; } -double Histogram::StandardDeviation() const { +double HistogramImpl::StandardDeviation() const { if (num_ == 0.0) return 0; double variance = (sum_squares_ * num_ - sum_ * sum_) / (num_ * num_); return 
sqrt(variance); } -std::string Histogram::ToString() const { +std::string HistogramImpl::ToString() const { std::string r; char buf[200]; snprintf(buf, sizeof(buf), @@ -177,4 +178,13 @@ std::string Histogram::ToString() const { return r; } -} // namespace leveldb +void HistogramImpl::Data(HistogramData * const data) const { + assert(data); + data->median = Median(); + data->percentile95 = Percentile(95); + data->percentile99 = Percentile(99); + data->average = Average(); + data->standard_deviation = StandardDeviation(); +} + +} // namespace leveldb diff --git a/util/histogram.h b/util/histogram.h index 03d7c6a8b..5350b02b2 100644 --- a/util/histogram.h +++ b/util/histogram.h @@ -5,6 +5,8 @@ #ifndef STORAGE_LEVELDB_UTIL_HISTOGRAM_H_ #define STORAGE_LEVELDB_UTIL_HISTOGRAM_H_ +#include "leveldb/statistics.h" + #include #include #include @@ -45,21 +47,22 @@ class HistogramBucketMapper { std::map valueIndexMap_; }; -class Histogram { +class HistogramImpl { public: - Histogram(); + HistogramImpl(); + virtual ~HistogramImpl() {} + virtual void Clear(); + virtual void Add(uint64_t value); + virtual void Add(double value); + void Merge(const HistogramImpl& other); - void Clear(); - void Add(uint64_t value); - void Add(double value); - void Merge(const Histogram& other); + virtual std::string ToString() const; - std::string ToString() const; - - double Median() const; - double Percentile(double p) const; - double Average() const; - double StandardDeviation() const; + virtual double Median() const; + virtual double Percentile(double p) const; + virtual double Average() const; + virtual double StandardDeviation() const; + virtual void Data(HistogramData * const data) const; private: double min_; diff --git a/util/histogram_test.cc b/util/histogram_test.cc index 2a7aae4ca..158682af8 100644 --- a/util/histogram_test.cc +++ b/util/histogram_test.cc @@ -8,7 +8,7 @@ class HistogramTest { }; TEST(HistogramTest, BasicOperation) { - Histogram histogram; + HistogramImpl histogram; for 
(uint64_t i = 1; i <= 100; i++) { histogram.Add(i); } @@ -33,14 +33,14 @@ TEST(HistogramTest, BasicOperation) { } TEST(HistogramTest, EmptyHistogram) { - Histogram histogram; + HistogramImpl histogram; ASSERT_EQ(histogram.Median(), 0.0); ASSERT_EQ(histogram.Percentile(85.0), 0.0); ASSERT_EQ(histogram.Average(), 0.0); } TEST(HistogramTest, ClearHistogram) { - Histogram histogram; + HistogramImpl histogram; for (uint64_t i = 1; i <= 100; i++) { histogram.Add(i); } diff --git a/util/stop_watch.h b/util/stop_watch.h new file mode 100644 index 000000000..a9507bbfe --- /dev/null +++ b/util/stop_watch.h @@ -0,0 +1,68 @@ +#ifndef STORAGE_LEVELDB_UTIL_STOP_WATCH_H_ +#define STORAGE_LEVELDB_UTIL_STOP_WATCH_H_ + +#include "leveldb/env.h" +#include "leveldb/statistics.h" +#include +namespace leveldb { + +class StopWatch { + public: + virtual uint64_t ElapsedMicros() = 0; + virtual ~StopWatch() {} +}; + +class DoNothingStopWatch : public StopWatch { + public: + virtual uint64_t ElapsedMicros() { + return 0; + } +}; + +// Auto-scoped. +// Records the statistic into the corresponding histogram. 
+class ScopedRecordingStopWatch : public StopWatch { + public: + ScopedRecordingStopWatch(Env * const env, + Statistics * const statistics, + const Histograms histogram_name) : + env_(env), + start_time_(env->NowMicros()), + statistics_(statistics), + histogram_name_(histogram_name) {} + + virtual uint64_t ElapsedMicros() { + return env_->NowMicros() - start_time_; + } + + virtual ~ScopedRecordingStopWatch() { + uint64_t elapsed_time = env_->NowMicros() - start_time_; + statistics_->measureTime(histogram_name_, elapsed_time); + } + + private: + Env* const env_; + const uint64_t start_time_; + Statistics* const statistics_; + const Histograms histogram_name_; + +}; + +namespace stats { +// Helper method +std::unique_ptr StartStopWatch(Env * const env, + Statistics * const statistics, + Histograms histogram_name) { + assert(env); + if (statistics != nullptr) { + return std::unique_ptr(new ScopedRecordingStopWatch( + env, + statistics, + histogram_name)); + } else { + return std::unique_ptr(new DoNothingStopWatch()); + } +}; +} // namespace stats +} // namespace leveldb +#endif // STORAGE_LEVELDB_UTIL_STOP_WATCH_H_