Block cache analyzer: Support reading from human readable trace file. (#5679)

Summary:
This PR adds support in block cache trace analyzer to read from human readable trace file. This is needed when a user does not have access to the binary trace file.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5679

Test Plan: USE_CLANG=1 make check -j32

Differential Revision: D16697239

Pulled By: HaoyuHuang

fbshipit-source-id: f2e29d7995816c389b41458f234ec8e184a924db
This commit is contained in:
haoyuhuang 2019-08-09 13:09:04 -07:00 committed by Facebook Github Bot
parent e0b84538af
commit 3da225716c
6 changed files with 266 additions and 62 deletions

View File

@ -24,6 +24,10 @@
using GFLAGS_NAMESPACE::ParseCommandLineFlags;
DEFINE_string(block_cache_trace_path, "", "The trace file path.");
DEFINE_bool(is_block_cache_human_readable_trace, false,
"Is the trace file provided for analysis generated by running "
"block_cache_trace_analyzer with "
"FLAGS_human_readable_trace_file_path is specified.");
DEFINE_string(
block_cache_sim_config_path, "",
"The config file path. One cache configuration per line. The format of a "
@ -1433,6 +1437,7 @@ BlockCacheTraceAnalyzer::BlockCacheTraceAnalyzer(
const std::string& trace_file_path, const std::string& output_dir,
const std::string& human_readable_trace_file_path,
bool compute_reuse_distance, bool mrc_only,
bool is_human_readable_trace_file,
std::unique_ptr<BlockCacheTraceSimulator>&& cache_simulator)
: env_(rocksdb::Env::Default()),
trace_file_path_(trace_file_path),
@ -1440,6 +1445,7 @@ BlockCacheTraceAnalyzer::BlockCacheTraceAnalyzer(
human_readable_trace_file_path_(human_readable_trace_file_path),
compute_reuse_distance_(compute_reuse_distance),
mrc_only_(mrc_only),
is_human_readable_trace_file_(is_human_readable_trace_file),
cache_simulator_(std::move(cache_simulator)) {}
void BlockCacheTraceAnalyzer::ComputeReuseDistance(
@ -1460,33 +1466,6 @@ void BlockCacheTraceAnalyzer::ComputeReuseDistance(
info->unique_blocks_since_last_access.clear();
}
Status BlockCacheTraceAnalyzer::WriteHumanReadableTraceRecord(
const BlockCacheTraceRecord& access, uint64_t block_id,
uint64_t get_key_id) {
if (!human_readable_trace_file_writer_) {
return Status::OK();
}
int ret = snprintf(
trace_record_buffer_, sizeof(trace_record_buffer_),
"%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
access.access_timestamp, block_id, access.block_type, access.block_size,
access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
access.caller, access.no_insert, access.get_id, get_key_id,
access.referenced_data_size, access.is_cache_hit,
access.referenced_key_exist_in_block, access.num_keys_in_block,
BlockCacheTraceHelper::GetTableId(access),
BlockCacheTraceHelper::GetSequenceNumber(access), access.block_key.size(),
access.referenced_key.size(),
BlockCacheTraceHelper::GetBlockOffsetInFile(access));
if (ret < 0) {
return Status::IOError("failed to format the output");
}
std::string printout(trace_record_buffer_);
return human_readable_trace_file_writer_->Append(printout);
}
Status BlockCacheTraceAnalyzer::RecordAccess(
const BlockCacheTraceRecord& access) {
ColumnFamilyAccessInfoAggregate& cf_aggr = cf_aggregates_map_[access.cf_name];
@ -1535,25 +1514,30 @@ Status BlockCacheTraceAnalyzer::RecordAccess(
}
}
}
return WriteHumanReadableTraceRecord(access, block_access_info.block_id,
get_key_id);
return human_readable_trace_writer_.WriteHumanReadableTraceRecord(
access, block_access_info.block_id, get_key_id);
}
Status BlockCacheTraceAnalyzer::Analyze() {
std::unique_ptr<TraceReader> trace_reader;
Status s =
NewFileTraceReader(env_, EnvOptions(), trace_file_path_, &trace_reader);
if (!s.ok()) {
return s;
}
BlockCacheTraceReader reader(std::move(trace_reader));
s = reader.ReadHeader(&header_);
if (!s.ok()) {
return s;
std::unique_ptr<BlockCacheTraceReader> reader;
Status s = Status::OK();
if (is_human_readable_trace_file_) {
reader.reset(new BlockCacheHumanReadableTraceReader(trace_file_path_));
} else {
std::unique_ptr<TraceReader> trace_reader;
s = NewFileTraceReader(env_, EnvOptions(), trace_file_path_, &trace_reader);
if (!s.ok()) {
return s;
}
reader.reset(new BlockCacheTraceReader(std::move(trace_reader)));
s = reader->ReadHeader(&header_);
if (!s.ok()) {
return s;
}
}
if (!human_readable_trace_file_path_.empty()) {
s = env_->NewWritableFile(human_readable_trace_file_path_,
&human_readable_trace_file_writer_, EnvOptions());
s = human_readable_trace_writer_.NewWritableFile(
human_readable_trace_file_path_, env_);
if (!s.ok()) {
return s;
}
@ -1562,7 +1546,7 @@ Status BlockCacheTraceAnalyzer::Analyze() {
uint64_t time_interval = 0;
while (s.ok()) {
BlockCacheTraceRecord access;
s = reader.ReadAccess(&access);
s = reader->ReadAccess(&access);
if (!s.ok()) {
break;
}
@ -1598,10 +1582,6 @@ Status BlockCacheTraceAnalyzer::Analyze() {
time_interval++;
}
}
if (human_readable_trace_file_writer_) {
human_readable_trace_file_writer_->Flush();
human_readable_trace_file_writer_->Close();
}
uint64_t now = env_->NowMicros();
uint64_t duration = (now - start) / kMicrosInSecond;
uint64_t trace_duration =
@ -2152,11 +2132,11 @@ int block_cache_trace_analyzer_tool(int argc, char** argv) {
exit(1);
}
}
BlockCacheTraceAnalyzer analyzer(FLAGS_block_cache_trace_path,
FLAGS_block_cache_analysis_result_dir,
FLAGS_human_readable_trace_file_path,
!FLAGS_reuse_distance_labels.empty(),
FLAGS_mrc_only, std::move(cache_simulator));
BlockCacheTraceAnalyzer analyzer(
FLAGS_block_cache_trace_path, FLAGS_block_cache_analysis_result_dir,
FLAGS_human_readable_trace_file_path,
!FLAGS_reuse_distance_labels.empty(), FLAGS_mrc_only,
FLAGS_is_block_cache_human_readable_trace, std::move(cache_simulator));
Status s = analyzer.Analyze();
if (!s.IsIncomplete() && !s.ok()) {
// Read all traces.

View File

@ -145,6 +145,7 @@ class BlockCacheTraceAnalyzer {
const std::string& trace_file_path, const std::string& output_dir,
const std::string& human_readable_trace_file_path,
bool compute_reuse_distance, bool mrc_only,
bool is_human_readable_trace_file,
std::unique_ptr<BlockCacheTraceSimulator>&& cache_simulator);
~BlockCacheTraceAnalyzer() = default;
// No copy and move.
@ -365,15 +366,13 @@ class BlockCacheTraceAnalyzer {
const std::map<std::string, Predictions>& label_predictions,
uint32_t max_number_of_values) const;
Status WriteHumanReadableTraceRecord(const BlockCacheTraceRecord& access,
uint64_t block_id, uint64_t get_key_id);
rocksdb::Env* env_;
const std::string trace_file_path_;
const std::string output_dir_;
std::string human_readable_trace_file_path_;
const bool compute_reuse_distance_;
const bool mrc_only_;
const bool is_human_readable_trace_file_;
BlockCacheTraceHeader header_;
std::unique_ptr<BlockCacheTraceSimulator> cache_simulator_;
@ -386,8 +385,7 @@ class BlockCacheTraceAnalyzer {
MissRatioStats miss_ratio_stats_;
uint64_t unique_block_id_ = 1;
uint64_t unique_get_key_id_ = 1;
char trace_record_buffer_[1024 * 1024];
std::unique_ptr<rocksdb::WritableFile> human_readable_trace_file_writer_;
BlockCacheHumanReadableTraceWriter human_readable_trace_writer_;
};
int block_cache_trace_analyzer_tool(int argc, char** argv);

View File

@ -634,12 +634,14 @@ TEST_F(BlockCacheTracerTest, MixedBlocks) {
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
// Read blocks.
BlockCacheTraceAnalyzer analyzer(trace_file_path_,
/*output_miss_ratio_curve_path=*/"",
/*human_readable_trace_file_path=*/"",
/*compute_reuse_distance=*/true,
/*mrc_only=*/false,
/*simulator=*/nullptr);
BlockCacheTraceAnalyzer analyzer(
trace_file_path_,
/*output_miss_ratio_curve_path=*/"",
/*human_readable_trace_file_path=*/"",
/*compute_reuse_distance=*/true,
/*mrc_only=*/false,
/*is_block_cache_human_readable_trace=*/false,
/*simulator=*/nullptr);
// The analyzer ends when it detects an incomplete access record.
ASSERT_EQ(Status::Incomplete(""), analyzer.Analyze());
const uint64_t expected_num_cfs = 1;

View File

@ -5,6 +5,10 @@
#include "trace_replay/block_cache_tracer.h"
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "rocksdb/slice.h"
@ -300,6 +304,141 @@ Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
return Status::OK();
}
BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() {
if (human_readable_trace_file_writer_) {
human_readable_trace_file_writer_->Flush();
human_readable_trace_file_writer_->Close();
}
}
Status BlockCacheHumanReadableTraceWriter::NewWritableFile(
const std::string& human_readable_trace_file_path, rocksdb::Env* env) {
if (human_readable_trace_file_path.empty()) {
return Status::InvalidArgument(
"The provided human_readable_trace_file_path is null.");
}
return env->NewWritableFile(human_readable_trace_file_path,
&human_readable_trace_file_writer_, EnvOptions());
}
Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord(
const BlockCacheTraceRecord& access, uint64_t block_id,
uint64_t get_key_id) {
if (!human_readable_trace_file_writer_) {
return Status::OK();
}
int ret = snprintf(
trace_record_buffer_, sizeof(trace_record_buffer_),
"%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
access.access_timestamp, block_id, access.block_type, access.block_size,
access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
access.caller, access.no_insert, access.get_id, get_key_id,
access.referenced_data_size, access.is_cache_hit,
access.referenced_key_exist_in_block, access.num_keys_in_block,
BlockCacheTraceHelper::GetTableId(access),
BlockCacheTraceHelper::GetSequenceNumber(access),
static_cast<uint64_t>(access.block_key.size()),
static_cast<uint64_t>(access.referenced_key.size()),
BlockCacheTraceHelper::GetBlockOffsetInFile(access));
if (ret < 0) {
return Status::IOError("failed to format the output");
}
std::string printout(trace_record_buffer_);
return human_readable_trace_file_writer_->Append(printout);
}
BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader(
const std::string& trace_file_path)
: BlockCacheTraceReader(/*trace_reader=*/nullptr) {
human_readable_trace_reader_.open(trace_file_path, std::ifstream::in);
}
BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() {
human_readable_trace_reader_.close();
}
Status BlockCacheHumanReadableTraceReader::ReadHeader(
BlockCacheTraceHeader* /*header*/) {
return Status::OK();
}
Status BlockCacheHumanReadableTraceReader::ReadAccess(
BlockCacheTraceRecord* record) {
std::string line;
if (!std::getline(human_readable_trace_reader_, line)) {
return Status::Incomplete("No more records to read.");
}
std::stringstream ss(line);
std::vector<std::string> record_strs;
while (ss.good()) {
std::string substr;
getline(ss, substr, ',');
record_strs.push_back(substr);
}
if (record_strs.size() != 21) {
return Status::Incomplete("Records format is wrong.");
}
record->access_timestamp = ParseUint64(record_strs[0]);
uint64_t block_key = ParseUint64(record_strs[1]);
record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2]));
record->block_size = ParseUint64(record_strs[3]);
record->cf_id = ParseUint64(record_strs[4]);
record->cf_name = record_strs[5];
record->level = static_cast<uint32_t>(ParseUint64(record_strs[6]));
record->sst_fd_number = ParseUint64(record_strs[7]);
record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8]));
record->no_insert = static_cast<Boolean>(ParseUint64(record_strs[9]));
record->get_id = ParseUint64(record_strs[10]);
uint64_t get_key_id = ParseUint64(record_strs[11]);
record->referenced_data_size = ParseUint64(record_strs[12]);
record->is_cache_hit = static_cast<Boolean>(ParseUint64(record_strs[13]));
record->referenced_key_exist_in_block =
static_cast<Boolean>(ParseUint64(record_strs[14]));
record->num_keys_in_block = ParseUint64(record_strs[15]);
uint64_t table_id = ParseUint64(record_strs[16]);
if (table_id > 0) {
// Decrement since valid table id in the trace file equals traced table id
// + 1.
table_id -= 1;
}
uint64_t get_sequence_number = ParseUint64(record_strs[17]);
if (get_sequence_number > 0) {
record->get_from_user_specified_snapshot = Boolean::kTrue;
// Decrement since valid seq number in the trace file equals traced seq
// number + 1.
get_sequence_number -= 1;
}
uint64_t block_key_size = ParseUint64(record_strs[18]);
uint64_t get_key_size = ParseUint64(record_strs[19]);
uint64_t block_offset = ParseUint64(record_strs[20]);
std::string tmp_block_key;
PutVarint64(&tmp_block_key, block_key);
PutVarint64(&tmp_block_key, block_offset);
// Append 1 until the size is the same as traced block key size.
while (record->block_key.size() < block_key_size - tmp_block_key.size()) {
record->block_key += "1";
}
record->block_key += tmp_block_key;
if (get_key_id != 0) {
std::string tmp_get_key;
PutFixed64(&tmp_get_key, get_key_id);
PutFixed64(&tmp_get_key, get_sequence_number << 8);
PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id));
// Append 1 until the size is the same as traced key size.
while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) {
record->referenced_key += "1";
}
record->referenced_key += tmp_get_key;
}
return Status::OK();
}
BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }

View File

@ -6,6 +6,7 @@
#pragma once
#include <atomic>
#include <fstream>
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/env.h"
@ -195,6 +196,24 @@ class BlockCacheTraceWriter {
std::unique_ptr<TraceWriter> trace_writer_;
};
// Write a trace record in human readable format, see
// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format
// for details.
class BlockCacheHumanReadableTraceWriter {
public:
~BlockCacheHumanReadableTraceWriter();
Status NewWritableFile(const std::string& human_readable_trace_file_path,
rocksdb::Env* env);
Status WriteHumanReadableTraceRecord(const BlockCacheTraceRecord& access,
uint64_t block_id, uint64_t get_key_id);
private:
char trace_record_buffer_[1024 * 1024];
std::unique_ptr<rocksdb::WritableFile> human_readable_trace_file_writer_;
};
// BlockCacheTraceReader helps read the trace file generated by
// BlockCacheTraceWriter using a user provided TraceReader.
class BlockCacheTraceReader {
@ -215,6 +234,23 @@ class BlockCacheTraceReader {
std::unique_ptr<TraceReader> trace_reader_;
};
// Read a trace record in human readable format, see
// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format
// for detailed.
class BlockCacheHumanReadableTraceReader : public BlockCacheTraceReader {
public:
BlockCacheHumanReadableTraceReader(const std::string& trace_file_path);
~BlockCacheHumanReadableTraceReader();
Status ReadHeader(BlockCacheTraceHeader* header);
Status ReadAccess(BlockCacheTraceRecord* record);
private:
std::ifstream human_readable_trace_reader_;
};
// A block cache tracer. It downsamples the accesses according to
// trace_options and uses BlockCacheTraceWriter to write the access record to
// the trace file.

View File

@ -321,6 +321,55 @@ TEST_F(BlockCacheTracerTest, MixedBlocks) {
}
}
TEST_F(BlockCacheTracerTest, HumanReadableTrace) {
BlockCacheTraceRecord record = GenerateAccessRecord();
record.get_id = 1;
record.referenced_key = "";
record.caller = TableReaderCaller::kUserGet;
record.get_from_user_specified_snapshot = Boolean::kTrue;
record.referenced_data_size = kReferencedDataSize;
PutFixed32(&record.referenced_key, 111);
PutLengthPrefixedSlice(&record.referenced_key, "get_key");
PutFixed64(&record.referenced_key, 2 << 8);
PutLengthPrefixedSlice(&record.block_key, "block_key");
PutVarint64(&record.block_key, 333);
{
// Generate a human readable trace file.
BlockCacheHumanReadableTraceWriter writer;
ASSERT_OK(writer.NewWritableFile(trace_file_path_, env_));
ASSERT_OK(writer.WriteHumanReadableTraceRecord(record, 1, 1));
ASSERT_OK(env_->FileExists(trace_file_path_));
}
{
BlockCacheHumanReadableTraceReader reader(trace_file_path_);
BlockCacheTraceHeader header;
BlockCacheTraceRecord read_record;
ASSERT_OK(reader.ReadHeader(&header));
ASSERT_OK(reader.ReadAccess(&read_record));
ASSERT_EQ(TraceType::kBlockTraceDataBlock, read_record.block_type);
ASSERT_EQ(kBlockSize, read_record.block_size);
ASSERT_EQ(kCFId, read_record.cf_id);
ASSERT_EQ(kDefaultColumnFamilyName, read_record.cf_name);
ASSERT_EQ(TableReaderCaller::kUserGet, read_record.caller);
ASSERT_EQ(kLevel, read_record.level);
ASSERT_EQ(kSSTFDNumber, read_record.sst_fd_number);
ASSERT_EQ(Boolean::kFalse, read_record.is_cache_hit);
ASSERT_EQ(Boolean::kFalse, read_record.no_insert);
ASSERT_EQ(1, read_record.get_id);
ASSERT_EQ(Boolean::kTrue, read_record.get_from_user_specified_snapshot);
ASSERT_EQ(Boolean::kTrue, read_record.referenced_key_exist_in_block);
ASSERT_EQ(kNumKeysInBlock, read_record.num_keys_in_block);
ASSERT_EQ(kReferencedDataSize, read_record.referenced_data_size);
ASSERT_EQ(record.block_key.size(), read_record.block_key.size());
ASSERT_EQ(record.referenced_key.size(), record.referenced_key.size());
ASSERT_EQ(112, BlockCacheTraceHelper::GetTableId(read_record));
ASSERT_EQ(3, BlockCacheTraceHelper::GetSequenceNumber(read_record));
ASSERT_EQ(333, BlockCacheTraceHelper::GetBlockOffsetInFile(read_record));
// Read again should fail.
ASSERT_NOK(reader.ReadAccess(&read_record));
}
}
} // namespace rocksdb
int main(int argc, char** argv) {