// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #pragma once #ifndef ROCKSDB_LITE #include #include #include #include #include #include #include "rocksdb/env.h" #include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_record.h" #include "rocksdb/write_batch.h" #include "trace_replay/trace_replay.h" namespace ROCKSDB_NAMESPACE { class DBImpl; class WriteBatch; enum TraceOperationType : int { kGet = 0, kPut = 1, kDelete = 2, kSingleDelete = 3, kRangeDelete = 4, kMerge = 5, kIteratorSeek = 6, kIteratorSeekForPrev = 7, kMultiGet = 8, kTaTypeNum = 9 }; struct TraceUnit { uint64_t ts; uint32_t type; uint32_t cf_id; size_t value_size; std::string key; }; struct TypeCorrelation { uint64_t count; uint64_t total_ts; }; struct StatsUnit { uint64_t key_id; uint64_t access_count; uint64_t latest_ts; uint64_t succ_count; // current only used to count Get if key found uint32_t cf_id; size_t value_size; std::vector v_correlation; }; class AnalyzerOptions { public: std::vector> correlation_map; std::vector> correlation_list; AnalyzerOptions(); ~AnalyzerOptions(); void SparseCorrelationInput(const std::string& in_str); }; // Note that, for the variable names in the trace_analyzer, // Starting with 'a_' means the variable is used for 'accessed_keys'. // Starting with 'w_' means it is used for 'the whole key space'. // Ending with '_f' means a file write or reader pointer. // For example, 'a_count' means 'accessed_keys_count', // 'w_key_f' means 'whole_key_space_file'. struct TraceStats { uint32_t cf_id; std::string cf_name; uint64_t a_count; uint64_t a_succ_count; uint64_t a_key_id; uint64_t a_key_size_sqsum; uint64_t a_key_size_sum; uint64_t a_key_mid; uint64_t a_value_size_sqsum; uint64_t a_value_size_sum; uint64_t a_value_mid; uint32_t a_peak_qps; double a_ave_qps; std::map a_key_stats; std::map a_count_stats; std::map a_key_size_stats; std::map a_value_size_stats; std::map a_qps_stats; std::map> a_qps_prefix_stats; std::priority_queue, std::vector>, std::greater>> top_k_queue; std::priority_queue, std::vector>, std::greater>> top_k_prefix_access; std::priority_queue, std::vector>, std::greater>> top_k_prefix_ave; std::priority_queue, std::vector>, std::greater>> top_k_qps_sec; std::list time_series; std::vector> correlation_output; std::map uni_key_num; std::unique_ptr time_series_f; std::unique_ptr a_key_f; std::unique_ptr a_count_dist_f; std::unique_ptr a_prefix_cut_f; std::unique_ptr a_value_size_f; std::unique_ptr a_key_size_f; std::unique_ptr a_key_num_f; std::unique_ptr a_qps_f; std::unique_ptr a_top_qps_prefix_f; std::unique_ptr w_key_f; std::unique_ptr w_prefix_cut_f; TraceStats(); ~TraceStats(); TraceStats(const TraceStats&) = delete; TraceStats& operator=(const TraceStats&) = delete; TraceStats(TraceStats&&) = default; TraceStats& operator=(TraceStats&&) = default; }; struct TypeUnit { std::string type_name; bool enabled; uint64_t total_keys; uint64_t total_access; uint64_t total_succ_access; uint32_t sample_count; std::map stats; TypeUnit() = default; ~TypeUnit() = default; TypeUnit(const TypeUnit&) = delete; TypeUnit& operator=(const TypeUnit&) = delete; TypeUnit(TypeUnit&&) = default; TypeUnit& operator=(TypeUnit&&) = default; }; struct CfUnit { uint32_t cf_id; uint64_t w_count; // total keys in this cf if we use the whole key space uint64_t a_count; // the total keys in this cf that are accessed std::map w_key_size_stats; // whole key space key size // statistic this cf std::map cf_qps; }; class TraceAnalyzer : private TraceRecord::Handler, private WriteBatch::Handler { public: TraceAnalyzer(std::string& trace_path, std::string& output_path, AnalyzerOptions _analyzer_opts); ~TraceAnalyzer(); Status PrepareProcessing(); Status StartProcessing(); Status MakeStatistics(); Status ReProcessing(); Status EndProcessing(); Status WriteTraceUnit(TraceUnit& unit); std::vector& GetTaVector() { return ta_; } private: using TraceRecord::Handler::Handle; Status Handle(const WriteQueryTraceRecord& record, std::unique_ptr* result) override; Status Handle(const GetQueryTraceRecord& record, std::unique_ptr* result) override; Status Handle(const IteratorSeekQueryTraceRecord& record, std::unique_ptr* result) override; Status Handle(const MultiGetQueryTraceRecord& record, std::unique_ptr* result) override; using WriteBatch::Handler::PutCF; Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override; using WriteBatch::Handler::DeleteCF; Status DeleteCF(uint32_t column_family_id, const Slice& key) override; using WriteBatch::Handler::SingleDeleteCF; Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override; using WriteBatch::Handler::DeleteRangeCF; Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key, const Slice& end_key) override; using WriteBatch::Handler::MergeCF; Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override; // The following hanlders are not implemented, return Status::OK() to avoid // the running time assertion and other irrelevant falures. using WriteBatch::Handler::PutBlobIndexCF; Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/, const Slice& /*value*/) override { return Status::OK(); } // The default implementation of LogData does nothing. using WriteBatch::Handler::LogData; void LogData(const Slice& /*blob*/) override {} using WriteBatch::Handler::MarkBeginPrepare; Status MarkBeginPrepare(bool = false) override { return Status::OK(); } using WriteBatch::Handler::MarkEndPrepare; Status MarkEndPrepare(const Slice& /*xid*/) override { return Status::OK(); } using WriteBatch::Handler::MarkNoop; Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } using WriteBatch::Handler::MarkRollback; Status MarkRollback(const Slice& /*xid*/) override { return Status::OK(); } using WriteBatch::Handler::MarkCommit; Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); } ROCKSDB_NAMESPACE::Env* env_; EnvOptions env_options_; std::unique_ptr trace_reader_; size_t offset_; char buffer_[1024]; uint64_t c_time_; std::string trace_name_; std::string output_path_; AnalyzerOptions analyzer_opts_; uint64_t total_requests_; uint64_t total_access_keys_; uint64_t total_gets_; uint64_t total_writes_; uint64_t total_seeks_; uint64_t total_seek_prevs_; uint64_t total_multigets_; uint64_t trace_create_time_; uint64_t begin_time_; uint64_t end_time_; uint64_t time_series_start_; uint32_t sample_max_; uint32_t cur_time_sec_; std::unique_ptr trace_sequence_f_; // readable trace std::unique_ptr qps_f_; // overall qps std::unique_ptr cf_qps_f_; // The qps of each CF> std::vector ta_; // The main statistic collecting data structure std::map cfs_; // All the cf_id appears in this trace; std::vector qps_peak_; std::vector qps_ave_; Status ReadTraceHeader(Trace* header); Status ReadTraceFooter(Trace* footer); Status ReadTraceRecord(Trace* trace); Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id, const std::string& key, const size_t value_size, const uint64_t ts); Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type, const uint64_t& ts, const std::string& key); Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats); Status CreateOutputFile( const std::string& type, const std::string& cf_name, const std::string& ending, std::unique_ptr* f_ptr); Status CloseOutputFiles(); void PrintStatistics(); Status TraceUnitWriter( std::unique_ptr& f_ptr, TraceUnit& unit); Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id, const Slice& key, const size_t value_size, const uint64_t ts); Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats); Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit); Status MakeStatisticQPS(); int db_version_; }; int trace_analyzer_tool(int argc, char** argv); } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE