Add db_bench option --report_file_operations

Summary: Add db_bench option --report_file_operations

Test Plan:
./db_bench --report_file_operations
Observe outputs on # of file operations

Reviewers: ljin, MarkCallaghan, sdong

Reviewed By: sdong

Subscribers: yhchiang, rven, igor, dhruba

Differential Revision: https://reviews.facebook.net/D27945
This commit is contained in:
Shi Feng 2014-10-29 14:24:34 -07:00
parent 2ea1219eb6
commit ea18b944a7

View File

@ -575,6 +575,8 @@ DEFINE_string(merge_operator, "", "The merge operator to use with the database."
DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try " DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
"linear search first for this many steps from the previous " "linear search first for this many steps from the previous "
"position"); "position");
DEFINE_bool(report_file_operations, false, "if report number of file "
"operations");
static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) = static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) =
RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit); RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
@ -606,6 +608,131 @@ static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((unused)) =
namespace rocksdb { namespace rocksdb {
namespace {
struct ReportFileOpCounters {
std::atomic<int> open_counter_;
std::atomic<int> read_counter_;
std::atomic<int> append_counter_;
std::atomic<uint64_t> bytes_read_;
std::atomic<uint64_t> bytes_written_;
};
// A special Env to records and report file operations in db_bench
class ReportFileOpEnv : public EnvWrapper {
public:
explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }
void reset() {
counters_.open_counter_ = 0;
counters_.read_counter_ = 0;
counters_.append_counter_ = 0;
counters_.bytes_read_ = 0;
counters_.bytes_written_ = 0;
}
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) {
class CountingFile : public SequentialFile {
private:
unique_ptr<SequentialFile> target_;
ReportFileOpCounters* counters_;
public:
CountingFile(unique_ptr<SequentialFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
virtual Status Read(size_t n, Slice* result, char* scratch) {
counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Read(n, result, scratch);
counters_->bytes_read_.fetch_add(result->size(),
std::memory_order_relaxed);
return rv;
}
virtual Status Skip(uint64_t n) { return target_->Skip(n); }
};
Status s = target()->NewSequentialFile(f, r, soptions);
if (s.ok()) {
counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
r->reset(new CountingFile(std::move(*r), counters()));
}
return s;
}
Status NewRandomAccessFile(const std::string& f,
unique_ptr<RandomAccessFile>* r,
const EnvOptions& soptions) {
class CountingFile : public RandomAccessFile {
private:
unique_ptr<RandomAccessFile> target_;
ReportFileOpCounters* counters_;
public:
CountingFile(unique_ptr<RandomAccessFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Read(offset, n, result, scratch);
counters_->bytes_read_.fetch_add(result->size(),
std::memory_order_relaxed);
return rv;
}
};
Status s = target()->NewRandomAccessFile(f, r, soptions);
if (s.ok()) {
counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
r->reset(new CountingFile(std::move(*r), counters()));
}
return s;
}
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
const EnvOptions& soptions) {
class CountingFile : public WritableFile {
private:
unique_ptr<WritableFile> target_;
ReportFileOpCounters* counters_;
public:
CountingFile(unique_ptr<WritableFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
Status Append(const Slice& data) {
counters_->append_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Append(data);
counters_->bytes_written_.fetch_add(data.size(),
std::memory_order_relaxed);
return rv;
}
Status Close() { return target_->Close(); }
Status Flush() { return target_->Flush(); }
Status Sync() { return target_->Sync(); }
};
Status s = target()->NewWritableFile(f, r, soptions);
if (s.ok()) {
counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
r->reset(new CountingFile(std::move(*r), counters()));
}
return s;
}
// getter
ReportFileOpCounters* counters() { return &counters_; }
private:
ReportFileOpCounters counters_;
};
} // namespace
// Helper for quickly generating random data. // Helper for quickly generating random data.
class RandomGenerator { class RandomGenerator {
private: private:
@ -810,6 +937,21 @@ class Stats {
if (FLAGS_histogram) { if (FLAGS_histogram) {
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
} }
if (FLAGS_report_file_operations) {
ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
ReportFileOpCounters* counters = env->counters();
fprintf(stdout, "Num files opened: %d\n",
counters->open_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num Read(): %d\n",
counters->read_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num Append(): %d\n",
counters->append_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num bytes read: %" PRIu64 "\n",
counters->bytes_read_.load(std::memory_order_relaxed));
fprintf(stdout, "Num bytes written: %" PRIu64 "\n",
counters->bytes_written_.load(std::memory_order_relaxed));
env->reset();
}
fflush(stdout); fflush(stdout);
} }
}; };
@ -899,6 +1041,7 @@ class Benchmark {
int64_t writes_; int64_t writes_;
int64_t readwrites_; int64_t readwrites_;
int64_t merge_keys_; int64_t merge_keys_;
bool report_file_operations_;
bool SanityCheck() { bool SanityCheck() {
if (FLAGS_compression_ratio > 1) { if (FLAGS_compression_ratio > 1) {
@ -1118,7 +1261,18 @@ class Benchmark {
readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num : readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num :
((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads) ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)
), ),
merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys) { merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
report_file_operations_(FLAGS_report_file_operations) {
if (report_file_operations_) {
if (!FLAGS_hdfs.empty()) {
fprintf(stderr,
"--hdfs and --report_file_operations cannot be enabled "
"at the same time");
exit(1);
}
FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
}
if (FLAGS_prefix_size > FLAGS_key_size) { if (FLAGS_prefix_size > FLAGS_key_size) {
fprintf(stderr, "prefix size is larger than key size"); fprintf(stderr, "prefix size is larger than key size");
exit(1); exit(1);