From 72613657f06a363d122da78f605a0c560a6699f0 Mon Sep 17 00:00:00 2001 From: sdong Date: Thu, 13 Aug 2015 14:35:54 -0700 Subject: [PATCH] Measure file read latency histogram per level Summary: In internal stats, remember read latency histogram, if statistics is enabled. It can be retrieved from DB::GetProperty() with "rocksdb.dbstats" property, if it is enabled. Test Plan: Manually run db_bench and prints out "rocksdb.dbstats" by hand and make sure it prints out as expected Reviewers: igor, IslamAbdelRahman, rven, kradhakrishnan, anthony, yhchiang Reviewed By: yhchiang Subscribers: MarkCallaghan, leveldb, dhruba Differential Revision: https://reviews.facebook.net/D44193 --- db/builder.cc | 11 +++++--- db/builder.h | 2 ++ db/compaction_job.cc | 4 ++- db/db_impl.cc | 4 +-- db/flush_job.cc | 16 ++++++------ db/forward_iterator.cc | 3 ++- db/internal_stats.cc | 10 ++++++++ db/internal_stats.h | 8 ++++++ db/repair.cc | 2 +- db/table_cache.cc | 13 ++++++---- db/table_cache.h | 7 +++-- db/version_set.cc | 52 ++++++++++++++++++++++++-------------- util/file_reader_writer.cc | 17 ++++++++++--- util/file_reader_writer.h | 8 ++++-- 14 files changed, 109 insertions(+), 48 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 2c3d6842d..9f499e650 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -12,6 +12,7 @@ #include #include "db/dbformat.h" #include "db/filename.h" +#include "db/internal_stats.h" #include "db/merge_helper.h" #include "db/table_cache.h" #include "db/version_edit.h" @@ -54,7 +55,8 @@ Status BuildTable( const SequenceNumber earliest_seqno_in_memtable, const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, - const Env::IOPriority io_priority, TableProperties* table_properties) { + InternalStats* internal_stats, const Env::IOPriority io_priority, + TableProperties* table_properties) { // Reports the IOStats for flush for every following bytes. const size_t kReportFlushIOStatsEvery = 1048576; Status s; @@ -248,8 +250,11 @@ Status BuildTable( if (s.ok()) { // Verify that the table is usable - Iterator* it = table_cache->NewIterator(ReadOptions(), env_options, - internal_comparator, meta->fd); + Iterator* it = table_cache->NewIterator( + ReadOptions(), env_options, internal_comparator, meta->fd, nullptr, + (internal_stats == nullptr) ? nullptr + : internal_stats->GetFileReadHist(0), + false); s = it->status(); if (s.ok() && paranoid_file_checks) { for (it->SeekToFirst(); it->Valid(); it->Next()) {} diff --git a/db/builder.h b/db/builder.h index e1f625c8b..9a48227bd 100644 --- a/db/builder.h +++ b/db/builder.h @@ -30,6 +30,7 @@ class TableCache; class VersionEdit; class TableBuilder; class WritableFileWriter; +class InternalStats; TableBuilder* NewTableBuilder( const ImmutableCFOptions& options, @@ -55,6 +56,7 @@ extern Status BuildTable( const SequenceNumber earliest_seqno_in_memtable, const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, + InternalStats* internal_stats, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index e45e6cbda..376e82b51 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -760,7 +760,9 @@ Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) { FileDescriptor fd(output_number, output_path_id, current_bytes); Iterator* iter = cfd->table_cache()->NewIterator( ReadOptions(), env_options_, cfd->internal_comparator(), fd, nullptr, - true); + cfd->internal_stats()->GetFileReadHist( + compact_->compaction->output_level()), + false); s = iter->status(); if (s.ok() && paranoid_file_checks_) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 1f2fb0d75..1c3883eae 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1293,8 +1293,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, iter.get(), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), newest_snapshot, earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()), - cfd->ioptions()->compression_opts, paranoid_file_checks, Env::IO_HIGH, - &info.table_properties); + cfd->ioptions()->compression_opts, paranoid_file_checks, + cfd->internal_stats(), Env::IO_HIGH, &info.table_properties); LogFlush(db_options_.info_log); Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" diff --git a/db/flush_job.cc b/db/flush_job.cc index 2fd65c75d..e2e920378 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -232,14 +232,14 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); - s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, - cfd_->table_cache(), iter.get(), meta, - cfd_->internal_comparator(), - cfd_->int_tbl_prop_collector_factories(), newest_snapshot_, - earliest_seqno_in_memtable, output_compression_, - cfd_->ioptions()->compression_opts, - mutable_cf_options_.paranoid_file_checks, Env::IO_HIGH, - &info.table_properties); + s = BuildTable( + dbname_, db_options_.env, *cfd_->ioptions(), env_options_, + cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(), + cfd_->int_tbl_prop_collector_factories(), newest_snapshot_, + earliest_seqno_in_memtable, output_compression_, + cfd_->ioptions()->compression_opts, + mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), + Env::IO_HIGH, &info.table_properties); LogFlush(db_options_.info_log); } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 5ed125930..32efe8090 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -47,7 +47,8 @@ class LevelIterator : public Iterator { assert(file_index_ < files_.size()); file_iter_.reset(cfd_->table_cache()->NewIterator( read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), - files_[file_index_]->fd, nullptr /* table_reader_ptr */, false)); + files_[file_index_]->fd, nullptr /* table_reader_ptr */, nullptr, + false)); } void SeekToLast() override { status_ = Status::NotSupported("LevelIterator::SeekToLast()"); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 69021b0e7..a3004a810 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -540,6 +540,16 @@ void InternalStats::DumpDBStats(std::string* value) { 10000.0 / std::max(interval_seconds_up, 0.001)); value->append(buf); + for (int level = 0; level < number_levels_; level++) { + if (!file_read_latency_[level].Empty()) { + char buf2[5000]; + snprintf(buf2, sizeof(buf2), + "** Level %d read latency histogram (micros):\n%s\n", level, + file_read_latency_[level].ToString().c_str()); + value->append(buf2); + } + } + db_stats_snapshot_.seconds_up = seconds_up; db_stats_snapshot_.ingest_bytes = user_bytes_written; db_stats_snapshot_.write_other = write_other; diff --git a/db/internal_stats.h b/db/internal_stats.h index c91bb2523..7b2775b04 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -98,6 +98,7 @@ class InternalStats { comp_stats_(num_levels), stall_leveln_slowdown_count_hard_(num_levels), stall_leveln_slowdown_count_soft_(num_levels), + file_read_latency_(num_levels), bg_error_count_(0), number_levels_(num_levels), env_(env), @@ -238,6 +239,10 @@ class InternalStats { db_stats_[type] += value; } + HistogramImpl* GetFileReadHist(int level) { + return &file_read_latency_[level]; + } + uint64_t GetBackgroundErrorCount() const { return bg_error_count_; } uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; } @@ -265,6 +270,7 @@ class InternalStats { // These count the number of microseconds for which MakeRoomForWrite stalls. std::vector stall_leveln_slowdown_count_hard_; std::vector stall_leveln_slowdown_count_soft_; + std::vector file_read_latency_; // Used to compute per-interval statistics struct CFStatsSnapshot { @@ -389,6 +395,8 @@ class InternalStats { void AddDBStats(InternalDBStatsType type, uint64_t value) {} + HistogramImpl* GetFileReadHist(int level) { return nullptr; } + uint64_t GetBackgroundErrorCount() const { return 0; } uint64_t BumpAndGetBackgroundErrorCount() { return 0; } diff --git a/db/repair.cc b/db/repair.cc index a0322b683..ff2b4bcaa 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -293,7 +293,7 @@ class Repairer { status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_, iter.get(), &meta, icmp_, &int_tbl_prop_collector_factories_, 0, 0, - kNoCompression, CompressionOptions(), false); + kNoCompression, CompressionOptions(), false, nullptr); } delete mem->Unref(); delete cf_mems_default; diff --git a/db/table_cache.cc b/db/table_cache.cc index 8b11d0fed..47a4e2d3a 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -79,7 +79,8 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) { Status TableCache::FindTable(const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, Cache::Handle** handle, - const bool no_io, bool record_read_stats) { + const bool no_io, bool record_read_stats, + HistogramImpl* file_read_hist) { PERF_TIMER_GUARD(find_table_nanos); Status s; uint64_t number = fd.GetNumber(); @@ -104,7 +105,7 @@ Status TableCache::FindTable(const EnvOptions& env_options, new RandomAccessFileReader( std::move(file), ioptions_.env, record_read_stats ? ioptions_.statistics : nullptr, - SST_READ_MICROS)); + SST_READ_MICROS, file_read_hist)); s = ioptions_.table_factory->NewTableReader( ioptions_, env_options, internal_comparator, std::move(file_reader), fd.GetFileSize(), &table_reader); @@ -128,6 +129,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, const InternalKeyComparator& icomparator, const FileDescriptor& fd, TableReader** table_reader_ptr, + HistogramImpl* file_read_hist, bool for_compaction, Arena* arena) { PERF_TIMER_GUARD(new_table_iterator_nanos); @@ -139,7 +141,8 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, Status s; if (table_reader == nullptr) { s = FindTable(env_options, icomparator, fd, &handle, - options.read_tier == kBlockCacheTier, !for_compaction); + options.read_tier == kBlockCacheTier, !for_compaction, + file_read_hist); if (!s.ok()) { return NewErrorIterator(s, arena); } @@ -164,7 +167,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, Status TableCache::Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, const Slice& k, - GetContext* get_context) { + GetContext* get_context, HistogramImpl* file_read_hist) { TableReader* t = fd.table_reader; Status s; Cache::Handle* handle = nullptr; @@ -210,7 +213,7 @@ Status TableCache::Get(const ReadOptions& options, if (!t) { s = FindTable(env_options_, internal_comparator, fd, &handle, - options.read_tier == kBlockCacheTier); + options.read_tier == kBlockCacheTier, file_read_hist); if (s.ok()) { t = GetTableReaderFromHandle(handle); } diff --git a/db/table_cache.h b/db/table_cache.h index 95b74b54f..4474fcc6a 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -28,6 +28,7 @@ class Env; class Arena; struct FileDescriptor; class GetContext; +class HistogramImpl; class TableCache { public: @@ -46,6 +47,7 @@ class TableCache { const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr, + HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, Arena* arena = nullptr); // If a seek to internal key "k" in specified file finds an entry, @@ -54,7 +56,7 @@ class TableCache { Status Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, const Slice& k, - GetContext* get_context); + GetContext* get_context, HistogramImpl* file_read_hist = nullptr); // Evict any entry for the specified file number static void Evict(Cache* cache, uint64_t file_number); @@ -63,7 +65,8 @@ class TableCache { Status FindTable(const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, Cache::Handle**, - const bool no_io = false, bool record_read_stats = true); + const bool no_io = false, bool record_read_stats = true, + HistogramImpl* file_read_hist = nullptr); // Get TableReader from a cache handle. TableReader* GetTableReaderFromHandle(Cache::Handle* handle); diff --git a/db/version_set.cc b/db/version_set.cc index c142cb985..c6e0b52d9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -24,6 +24,7 @@ #include #include "db/filename.h" +#include "db/internal_stats.h" #include "db/log_reader.h" #include "db/log_writer.h" #include "db/memtable.h" @@ -474,13 +475,18 @@ class LevelFileNumIterator : public Iterator { class LevelFileIteratorState : public TwoLevelIteratorState { public: LevelFileIteratorState(TableCache* table_cache, - const ReadOptions& read_options, const EnvOptions& env_options, - const InternalKeyComparator& icomparator, bool for_compaction, - bool prefix_enabled) - : TwoLevelIteratorState(prefix_enabled), - table_cache_(table_cache), read_options_(read_options), - env_options_(env_options), icomparator_(icomparator), - for_compaction_(for_compaction) {} + const ReadOptions& read_options, + const EnvOptions& env_options, + const InternalKeyComparator& icomparator, + HistogramImpl* file_read_hist, bool for_compaction, + bool prefix_enabled) + : TwoLevelIteratorState(prefix_enabled), + table_cache_(table_cache), + read_options_(read_options), + env_options_(env_options), + icomparator_(icomparator), + file_read_hist_(file_read_hist), + for_compaction_(for_compaction) {} Iterator* NewSecondaryIterator(const Slice& meta_handle) override { if (meta_handle.size() != sizeof(FileDescriptor)) { @@ -491,7 +497,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { reinterpret_cast(meta_handle.data()); return table_cache_->NewIterator( read_options_, env_options_, icomparator_, *fd, - nullptr /* don't need reference to table*/, for_compaction_); + nullptr /* don't need reference to table*/, file_read_hist_, + for_compaction_); } } @@ -504,6 +511,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { const ReadOptions read_options_; const EnvOptions& env_options_; const InternalKeyComparator& icomparator_; + HistogramImpl* file_read_hist_; bool for_compaction_; }; @@ -705,7 +713,7 @@ void Version::AddIterators(const ReadOptions& read_options, const auto& file = storage_info_.LevelFilesBrief(0).files[i]; merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, - false, arena)); + cfd_->internal_stats()->GetFileReadHist(0), false, arena)); } // For levels > 0, we can use a concatenating iterator that sequentially @@ -714,10 +722,12 @@ void Version::AddIterators(const ReadOptions& read_options, for (int level = 1; level < storage_info_.num_non_empty_levels(); level++) { if (storage_info_.LevelFilesBrief(level).num_files != 0) { auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); - auto* state = new (mem) LevelFileIteratorState( - cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), false /* for_compaction */, - cfd_->ioptions()->prefix_extractor != nullptr); + auto* state = new (mem) + LevelFileIteratorState(cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), + cfd_->internal_stats()->GetFileReadHist(level), + false /* for_compaction */, + cfd_->ioptions()->prefix_extractor != nullptr); mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); auto* first_level_iter = new (mem) LevelFileNumIterator( cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); @@ -810,8 +820,9 @@ void Version::Get(const ReadOptions& read_options, user_comparator(), internal_comparator()); FdWithKeyRange* f = fp.GetNextFile(); while (f != nullptr) { - *status = table_cache_->Get(read_options, *internal_comparator(), f->fd, - ikey, &get_context); + *status = table_cache_->Get( + read_options, *internal_comparator(), f->fd, ikey, &get_context, + cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel())); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -3059,14 +3070,17 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { list[num++] = cfd->table_cache()->NewIterator( read_options, env_options_compactions_, cfd->internal_comparator(), flevel->files[i].fd, nullptr, + nullptr, /* no per level latency histogram*/ true /* for compaction */); } } else { // Create concatenating iterator for the files from this level - list[num++] = NewTwoLevelIterator(new LevelFileIteratorState( - cfd->table_cache(), read_options, env_options_, - cfd->internal_comparator(), true /* for_compaction */, - false /* prefix enabled */), + list[num++] = NewTwoLevelIterator( + new LevelFileIteratorState( + cfd->table_cache(), read_options, env_options_, + cfd->internal_comparator(), + nullptr /* no per level latency histogram */, + true /* for_compaction */, false /* prefix enabled */), new LevelFileNumIterator(cfd->internal_comparator(), c->input_levels(which))); } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 9775556d4..502b55ce2 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -11,6 +11,7 @@ #include #include "port/port.h" +#include "util/histogram.h" #include "util/iostats_context_imp.h" #include "util/random.h" #include "util/rate_limiter.h" @@ -27,10 +28,18 @@ Status SequentialFileReader::Skip(uint64_t n) { return file_->Skip(n); } Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { - StopWatch sw(env_, stats_, hist_type_); - IOSTATS_TIMER_GUARD(read_nanos); - Status s = file_->Read(offset, n, result, scratch); - IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); + Status s; + uint64_t elapsed = 0; + { + StopWatch sw(env_, stats_, hist_type_, + (stats_ != nullptr) ? &elapsed : nullptr); + IOSTATS_TIMER_GUARD(read_nanos); + s = file_->Read(offset, n, result, scratch); + IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); + } + if (stats_ != nullptr && file_read_hist_ != nullptr) { + file_read_hist_->Add(elapsed); + } return s; } diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 42de8a0e4..c0a3b5aa8 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -12,6 +12,7 @@ namespace rocksdb { class Statistics; +class HistogramImpl; class SequentialFileReader { private: @@ -33,16 +34,19 @@ class RandomAccessFileReader : public RandomAccessFile { Env* env_; Statistics* stats_; uint32_t hist_type_; + HistogramImpl* file_read_hist_; public: explicit RandomAccessFileReader(std::unique_ptr&& raf, Env* env = nullptr, Statistics* stats = nullptr, - uint32_t hist_type = 0) + uint32_t hist_type = 0, + HistogramImpl* file_read_hist = nullptr) : file_(std::move(raf)), env_(env), stats_(stats), - hist_type_(hist_type) {} + hist_type_(hist_type), + file_read_hist_(file_read_hist) {} Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const;