Split histogram per OperationType in db_bench
This commit is contained in:
parent
f307036bde
commit
ebc2d490d1
140
db/db_bench.cc
140
db/db_bench.cc
@ -39,6 +39,7 @@ int main() {
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "db/db_impl.h"
|
||||
#include "db/version_set.h"
|
||||
@ -741,11 +742,6 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
|
||||
"table becomes an identity function. This is only valid when key "
|
||||
"is 8 bytes");
|
||||
|
||||
enum PutOrMerge {
|
||||
kPut,
|
||||
kMerge
|
||||
};
|
||||
|
||||
enum RepFactory {
|
||||
kSkipList,
|
||||
kPrefixHash,
|
||||
@ -1163,6 +1159,35 @@ class ReporterAgent {
|
||||
bool stop_;
|
||||
};
|
||||
|
||||
enum OperationType : unsigned char {
|
||||
kRead = 0,
|
||||
kWrite,
|
||||
kDelete,
|
||||
kSeek,
|
||||
kMerge,
|
||||
kUpdate,
|
||||
kCompress,
|
||||
kUncompress,
|
||||
kCrc,
|
||||
kHash,
|
||||
kOthers
|
||||
};
|
||||
|
||||
static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
|
||||
OperationTypeString = {
|
||||
{kRead, "read"},
|
||||
{kWrite, "write"},
|
||||
{kDelete, "delete"},
|
||||
{kSeek, "seek"},
|
||||
{kMerge, "merge"},
|
||||
{kUpdate, "update"},
|
||||
{kCompress, "compress"},
|
||||
{kCompress, "uncompress"},
|
||||
{kCrc, "crc"},
|
||||
{kHash, "hash"},
|
||||
{kOthers, "op"}
|
||||
};
|
||||
|
||||
class Stats {
|
||||
private:
|
||||
int id_;
|
||||
@ -1175,7 +1200,8 @@ class Stats {
|
||||
int64_t bytes_;
|
||||
double last_op_finish_;
|
||||
double last_report_finish_;
|
||||
HistogramImpl hist_;
|
||||
std::unordered_map<OperationType, HistogramImpl,
|
||||
std::hash<unsigned char>> hist_;
|
||||
std::string message_;
|
||||
bool exclude_from_merge_;
|
||||
ReporterAgent* reporter_agent_; // does not own
|
||||
@ -1191,7 +1217,7 @@ class Stats {
|
||||
id_ = id;
|
||||
next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
|
||||
last_op_finish_ = start_;
|
||||
hist_.Clear();
|
||||
hist_.clear();
|
||||
done_ = 0;
|
||||
last_report_done_ = 0;
|
||||
bytes_ = 0;
|
||||
@ -1208,7 +1234,15 @@ class Stats {
|
||||
if (other.exclude_from_merge_)
|
||||
return;
|
||||
|
||||
hist_.Merge(other.hist_);
|
||||
for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
|
||||
auto this_it = hist_.find(it->first);
|
||||
if (this_it != hist_.end()) {
|
||||
this_it->second.Merge(other.hist_.at(it->first));
|
||||
} else {
|
||||
hist_.insert({ it->first, it->second });
|
||||
}
|
||||
}
|
||||
|
||||
done_ += other.done_;
|
||||
bytes_ += other.bytes_;
|
||||
seconds_ += other.seconds_;
|
||||
@ -1261,14 +1295,22 @@ class Stats {
|
||||
}
|
||||
}
|
||||
|
||||
void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops) {
|
||||
void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
|
||||
enum OperationType op_type = kOthers) {
|
||||
if (reporter_agent_) {
|
||||
reporter_agent_->ReportFinishedOps(num_ops);
|
||||
}
|
||||
if (FLAGS_histogram) {
|
||||
double now = FLAGS_env->NowMicros();
|
||||
double micros = now - last_op_finish_;
|
||||
hist_.Add(micros);
|
||||
|
||||
if (hist_.find(op_type) == hist_.end())
|
||||
{
|
||||
HistogramImpl hist_temp;
|
||||
hist_.insert({op_type, hist_temp});
|
||||
}
|
||||
hist_[op_type].Add(micros);
|
||||
|
||||
if (micros > 20000 && !FLAGS_stats_interval) {
|
||||
fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
|
||||
fflush(stderr);
|
||||
@ -1397,7 +1439,11 @@ class Stats {
|
||||
(extra.empty() ? "" : " "),
|
||||
extra.c_str());
|
||||
if (FLAGS_histogram) {
|
||||
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
|
||||
for (auto it = hist_.begin(); it != hist_.end(); ++it) {
|
||||
fprintf(stdout, "Microseconds per %s:\n%s\n",
|
||||
OperationTypeString[it->first].c_str(),
|
||||
it->second.ToString().c_str());
|
||||
}
|
||||
}
|
||||
if (FLAGS_report_file_operations) {
|
||||
ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
|
||||
@ -2138,7 +2184,7 @@ class Benchmark {
|
||||
uint32_t crc = 0;
|
||||
while (bytes < 500 * 1048576) {
|
||||
crc = crc32c::Value(data.data(), size);
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1);
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc);
|
||||
bytes += size;
|
||||
}
|
||||
// Print so result is not dead
|
||||
@ -2157,7 +2203,7 @@ class Benchmark {
|
||||
unsigned int xxh32 = 0;
|
||||
while (bytes < 500 * 1048576) {
|
||||
xxh32 = XXH32(data.data(), size, 0);
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1);
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
|
||||
bytes += size;
|
||||
}
|
||||
// Print so result is not dead
|
||||
@ -2178,7 +2224,7 @@ class Benchmark {
|
||||
ptr = ap.load(std::memory_order_acquire);
|
||||
}
|
||||
count++;
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1);
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1, kOthers);
|
||||
}
|
||||
if (ptr == nullptr) exit(1); // Disable unused variable warning.
|
||||
}
|
||||
@ -2196,7 +2242,7 @@ class Benchmark {
|
||||
ok = CompressSlice(input, &compressed);
|
||||
produced += compressed.size();
|
||||
bytes += input.size();
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1);
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
|
||||
}
|
||||
|
||||
if (!ok) {
|
||||
@ -2257,7 +2303,7 @@ class Benchmark {
|
||||
}
|
||||
delete[] uncompressed;
|
||||
bytes += input.size();
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1);
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
|
||||
}
|
||||
|
||||
if (!ok) {
|
||||
@ -2788,7 +2834,7 @@ class Benchmark {
|
||||
}
|
||||
s = db_with_cfh->db->Write(write_options_, &batch);
|
||||
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
|
||||
entries_per_batch_);
|
||||
entries_per_batch_, kWrite);
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
|
||||
exit(1);
|
||||
@ -2816,7 +2862,7 @@ class Benchmark {
|
||||
int64_t bytes = 0;
|
||||
for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
|
||||
bytes += iter->key().size() + iter->value().size();
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kRead);
|
||||
++i;
|
||||
}
|
||||
delete iter;
|
||||
@ -2839,7 +2885,7 @@ class Benchmark {
|
||||
int64_t bytes = 0;
|
||||
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
|
||||
bytes += iter->key().size() + iter->value().size();
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kRead);
|
||||
++i;
|
||||
}
|
||||
delete iter;
|
||||
@ -2879,7 +2925,7 @@ class Benchmark {
|
||||
++nonexist;
|
||||
}
|
||||
}
|
||||
thread->stats.FinishedOps(nullptr, db, 100);
|
||||
thread->stats.FinishedOps(nullptr, db, 100, kRead);
|
||||
} while (!duration.Done(100));
|
||||
|
||||
char msg[100];
|
||||
@ -2947,7 +2993,7 @@ class Benchmark {
|
||||
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
|
||||
abort();
|
||||
}
|
||||
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1);
|
||||
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
|
||||
}
|
||||
|
||||
char msg[100];
|
||||
@ -2995,7 +3041,7 @@ class Benchmark {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
thread->stats.FinishedOps(nullptr, db, entries_per_batch_);
|
||||
thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
|
||||
}
|
||||
|
||||
char msg[100];
|
||||
@ -3011,7 +3057,7 @@ class Benchmark {
|
||||
DB* db = SelectDB(thread);
|
||||
Iterator* iter = db->NewIterator(options);
|
||||
delete iter;
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kOthers);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3019,7 +3065,7 @@ class Benchmark {
|
||||
if (thread->tid > 0) {
|
||||
IteratorCreation(thread);
|
||||
} else {
|
||||
BGWriter(thread, kPut);
|
||||
BGWriter(thread, kWrite);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3088,7 +3134,7 @@ class Benchmark {
|
||||
assert(iter_to_use->status().ok());
|
||||
}
|
||||
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1);
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
|
||||
}
|
||||
delete single_iter;
|
||||
for (auto iter : multi_iters) {
|
||||
@ -3109,7 +3155,7 @@ class Benchmark {
|
||||
if (thread->tid > 0) {
|
||||
SeekRandom(thread);
|
||||
} else {
|
||||
BGWriter(thread, kPut);
|
||||
BGWriter(thread, kWrite);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3137,7 +3183,7 @@ class Benchmark {
|
||||
batch.Delete(key);
|
||||
}
|
||||
auto s = db->Write(write_options_, &batch);
|
||||
thread->stats.FinishedOps(nullptr, db, entries_per_batch_);
|
||||
thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "del error: %s\n", s.ToString().c_str());
|
||||
exit(1);
|
||||
@ -3158,7 +3204,7 @@ class Benchmark {
|
||||
if (thread->tid > 0) {
|
||||
ReadRandom(thread);
|
||||
} else {
|
||||
BGWriter(thread, kPut);
|
||||
BGWriter(thread, kWrite);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3170,7 +3216,7 @@ class Benchmark {
|
||||
}
|
||||
}
|
||||
|
||||
void BGWriter(ThreadState* thread, enum PutOrMerge write_merge) {
|
||||
void BGWriter(ThreadState* thread, enum OperationType write_merge) {
|
||||
// Special thread that keeps writing until other threads are done.
|
||||
RandomGenerator gen;
|
||||
double last = FLAGS_env->NowMicros();
|
||||
@ -3203,7 +3249,7 @@ class Benchmark {
|
||||
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
|
||||
Status s;
|
||||
|
||||
if (write_merge == kPut) {
|
||||
if (write_merge == kWrite) {
|
||||
s = db->Put(write_options_, key, gen.Generate(value_size_));
|
||||
} else {
|
||||
s = db->Merge(write_options_, key, gen.Generate(value_size_));
|
||||
@ -3214,7 +3260,7 @@ class Benchmark {
|
||||
exit(1);
|
||||
}
|
||||
bytes += key.size() + value_size_;
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1);
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
|
||||
|
||||
++num_writes;
|
||||
if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) {
|
||||
@ -3355,6 +3401,7 @@ class Benchmark {
|
||||
}
|
||||
get_weight--;
|
||||
gets_done++;
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
|
||||
} else if (put_weight > 0) {
|
||||
// then do all the corresponding number of puts
|
||||
// for all the gets we have done earlier
|
||||
@ -3365,6 +3412,7 @@ class Benchmark {
|
||||
}
|
||||
put_weight--;
|
||||
puts_done++;
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
|
||||
} else if (delete_weight > 0) {
|
||||
Status s = DeleteMany(db, write_options_, key);
|
||||
if (!s.ok()) {
|
||||
@ -3373,9 +3421,8 @@ class Benchmark {
|
||||
}
|
||||
delete_weight--;
|
||||
deletes_done++;
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
|
||||
}
|
||||
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1);
|
||||
}
|
||||
char msg[100];
|
||||
snprintf(msg, sizeof(msg),
|
||||
@ -3422,6 +3469,7 @@ class Benchmark {
|
||||
}
|
||||
get_weight--;
|
||||
reads_done++;
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kRead);
|
||||
} else if (put_weight > 0) {
|
||||
// then do all the corresponding number of puts
|
||||
// for all the gets we have done earlier
|
||||
@ -3432,8 +3480,8 @@ class Benchmark {
|
||||
}
|
||||
put_weight--;
|
||||
writes_done++;
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kWrite);
|
||||
}
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
}
|
||||
char msg[100];
|
||||
snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
|
||||
@ -3475,7 +3523,7 @@ class Benchmark {
|
||||
exit(1);
|
||||
}
|
||||
bytes += key.size() + value_size_;
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
|
||||
}
|
||||
char msg[100];
|
||||
snprintf(msg, sizeof(msg),
|
||||
@ -3530,7 +3578,7 @@ class Benchmark {
|
||||
exit(1);
|
||||
}
|
||||
bytes += key.size() + value.size();
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
|
||||
}
|
||||
|
||||
char msg[100];
|
||||
@ -3568,7 +3616,7 @@ class Benchmark {
|
||||
exit(1);
|
||||
}
|
||||
bytes += key.size() + value_size_;
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kMerge);
|
||||
}
|
||||
|
||||
// Print some statistics
|
||||
@ -3610,9 +3658,8 @@ class Benchmark {
|
||||
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
|
||||
exit(1);
|
||||
}
|
||||
|
||||
num_merges++;
|
||||
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kMerge);
|
||||
} else {
|
||||
Status s = db->Get(options, key, &value);
|
||||
if (value.length() > max_length)
|
||||
@ -3625,12 +3672,9 @@ class Benchmark {
|
||||
} else if (!s.IsNotFound()) {
|
||||
num_hits++;
|
||||
}
|
||||
|
||||
num_gets++;
|
||||
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kRead);
|
||||
}
|
||||
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
}
|
||||
|
||||
char msg[100];
|
||||
@ -3657,7 +3701,7 @@ class Benchmark {
|
||||
GenerateKeyFromInt(i, FLAGS_num, &key);
|
||||
iter->Seek(key);
|
||||
assert(iter->Valid() && iter->key() == key);
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kSeek);
|
||||
|
||||
for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) {
|
||||
if (!FLAGS_reverse_iterator) {
|
||||
@ -3667,12 +3711,12 @@ class Benchmark {
|
||||
}
|
||||
GenerateKeyFromInt(++i, FLAGS_num, &key);
|
||||
assert(iter->Valid() && iter->key() == key);
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kSeek);
|
||||
}
|
||||
|
||||
iter->Seek(key);
|
||||
assert(iter->Valid() && iter->key() == key);
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kSeek);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3834,7 +3878,7 @@ class Benchmark {
|
||||
}
|
||||
|
||||
if (!failed) {
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kOthers);
|
||||
}
|
||||
|
||||
transactions_done++;
|
||||
@ -3960,7 +4004,7 @@ class Benchmark {
|
||||
exit(1);
|
||||
}
|
||||
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kOthers);
|
||||
}
|
||||
|
||||
char msg[200];
|
||||
|
Loading…
Reference in New Issue
Block a user