db_bench: add a mode to operate multiple DBs

Summary: This patch introduces a new parameter num_multi_db in db_bench. When this parameter is larger than 1, multiple DBs will be created. In all benchmarks, any operation applies to a random DB among them. This is to benchmark the performance of similar applications.

Test Plan: run db_bench on both of num_multi_db=0 and more.

Reviewers: haobo, ljin, igor

Reviewed By: igor

CC: igor, yhchiang, dhruba, nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D17769
This commit is contained in:
sdong 2014-04-11 12:15:09 -07:00
parent 30aff72f77
commit d5e087b6df

View File

@ -157,6 +157,9 @@ static bool ValidateKeySize(const char* flagname, int32_t value) {
DEFINE_int32(key_size, 16, "size of each key");
DEFINE_int32(num_multi_db, 0,
"Number of DBs used in the benchmark. 0 means single DB.");
DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
" to this fraction of their original size after compression");
@ -814,6 +817,7 @@ class Benchmark {
const FilterPolicy* filter_policy_;
const SliceTransform* prefix_extractor_;
DB* db_;
std::vector<DB*> multi_dbs_;
int64_t num_;
int value_size_;
int key_size_;
@ -1096,6 +1100,10 @@ class Benchmark {
}
}
std::string GetDbNameForMultiple(std::string base_name, size_t id) {
return base_name + std::to_string(id);
}
void Run() {
PrintHeader();
Open();
@ -1245,11 +1253,18 @@ class Benchmark {
name.ToString().c_str());
method = nullptr;
} else {
if (db_ != nullptr) {
delete db_;
db_ = nullptr;
DestroyDB(FLAGS_db, Options());
Open();
}
for (size_t i = 0; i < multi_dbs_.size(); i++) {
delete multi_dbs_[i];
DestroyDB(GetDbNameForMultiple(FLAGS_db, i), Options());
}
multi_dbs_.clear();
}
Open();
}
if (method != nullptr) {
@ -1666,19 +1681,32 @@ class Benchmark {
FLAGS_universal_compression_size_percent;
}
if (FLAGS_num_multi_db <= 1) {
OpenDb(options, FLAGS_db, &db_);
} else {
multi_dbs_.clear();
for (size_t i = 0; i < FLAGS_num_multi_db; i++) {
DB* db;
OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &db);
multi_dbs_.push_back(db);
}
}
if (FLAGS_min_level_to_compress >= 0) {
options.compression_per_level.clear();
}
}
void OpenDb(Options options, std::string db_name, DB** db) {
Status s;
if(FLAGS_readonly) {
s = DB::OpenForReadOnly(options, FLAGS_db, &db_);
s = DB::OpenForReadOnly(options, db_name, db);
} else {
s = DB::Open(options, FLAGS_db, &db_);
s = DB::Open(options, db_name, db);
}
if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1);
}
if (FLAGS_min_level_to_compress >= 0) {
options.compression_per_level.clear();
}
}
enum WriteMode {
@ -1740,13 +1768,27 @@ class Benchmark {
std::vector<uint64_t> values_;
};
DB* SelectDB(ThreadState* thread) {
if (db_ != nullptr) {
return db_;
} else {
return multi_dbs_[thread->rand.Next() % multi_dbs_.size()];
}
}
void DoWrite(ThreadState* thread, WriteMode write_mode) {
const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
const int64_t num_ops = writes_ == 0 ? num_ : writes_;
Duration duration(test_duration, num_ops);
KeyGenerator key_gen(&(thread->rand), write_mode, num_ops);
size_t num_key_gens = 1;
if (db_ == nullptr) {
num_key_gens = multi_dbs_.size();
}
std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
Duration duration(test_duration, num_ops * num_key_gens);
for (size_t i = 0; i < num_key_gens; i++) {
key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode, num_ops));
}
if (num_ != FLAGS_num) {
char msg[100];
@ -1762,14 +1804,20 @@ class Benchmark {
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
while (!duration.Done(entries_per_batch_)) {
size_t id = 0;
DB* db_to_write = db_;
if (db_to_write == nullptr) {
id = thread->rand.Next() % num_key_gens;
db_to_write = multi_dbs_[id];
}
batch.Clear();
for (int64_t j = 0; j < entries_per_batch_; j++) {
GenerateKeyFromInt(key_gen.Next(), FLAGS_num, &key);
GenerateKeyFromInt(key_gens[id]->Next(), FLAGS_num, &key);
batch.Put(key, gen.Generate(value_size_));
bytes += value_size_ + key_size_;
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedSingleOp(db_to_write);
}
s = db_->Write(write_options_, &batch);
s = db_to_write->Write(write_options_, &batch);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -1779,12 +1827,22 @@ class Benchmark {
}
void ReadSequential(ThreadState* thread) {
Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
if (db_ != nullptr) {
ReadSequential(thread, db_);
} else {
for (DB* db : multi_dbs_) {
ReadSequential(thread, db);
}
}
}
void ReadSequential(ThreadState* thread, DB* db) {
Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
int64_t i = 0;
int64_t bytes = 0;
for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedSingleOp(db);
++i;
}
delete iter;
@ -1792,7 +1850,17 @@ class Benchmark {
}
void ReadReverse(ThreadState* thread) {
Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
if (db_ != nullptr) {
ReadReverse(thread, db_);
} else {
for (DB* db : multi_dbs_) {
ReadReverse(thread, db);
}
}
}
void ReadReverse(ThreadState* thread, DB* db) {
Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
int64_t i = 0;
int64_t bytes = 0;
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
@ -1814,9 +1882,10 @@ class Benchmark {
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
read++;
if (db_->Get(options, key, &value).ok()) {
if (db->Get(options, key, &value).ok()) {
found++;
}
thread->stats.FinishedSingleOp(db_);
@ -1847,11 +1916,12 @@ class Benchmark {
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
for (int64_t i = 0; i < entries_per_batch_; ++i) {
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num,
FLAGS_num, &keys[i]);
}
std::vector<Status> statuses = db_->MultiGet(options, keys, &values);
std::vector<Status> statuses = db->MultiGet(options, keys, &values);
assert(statuses.size() == entries_per_batch_);
read += entries_per_batch_;
@ -1876,9 +1946,10 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
options.prefix_seek = (FLAGS_prefix_size > 0);
while (!duration.Done(1)) {
Iterator* iter = db_->NewIterator(options);
DB* db = SelectDB(thread);
Iterator* iter = db->NewIterator(options);
delete iter;
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedSingleOp(db);
}
}
@ -1896,21 +1967,40 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
options.tailing = FLAGS_use_tailing_iterator;
options.prefix_seek = (FLAGS_prefix_size > 0);
auto* iter = db_->NewIterator(options);
Iterator* single_iter = nullptr;
std::vector<Iterator*> multi_iters;
if (db_ != nullptr) {
single_iter = db_->NewIterator(options);
} else {
for (DB* db : multi_dbs_) {
multi_iters.push_back(db->NewIterator(options));
}
}
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
// Pick a Iterator to use
Iterator* iter_to_use = single_iter;
if (single_iter == nullptr) {
iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()];
}
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
iter->Seek(key);
iter_to_use->Seek(key);
read++;
if (iter->Valid() && iter->key().compare(key) == 0) {
if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
found++;
}
thread->stats.FinishedSingleOp(db_);
}
delete single_iter;
for (auto iter : multi_iters) {
delete iter;
}
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
@ -1934,14 +2024,15 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard(key.data());
while (!duration.Done(entries_per_batch_)) {
DB* db = SelectDB(thread);
batch.Clear();
for (int64_t j = 0; j < entries_per_batch_; ++j) {
const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
GenerateKeyFromInt(k, FLAGS_num, &key);
batch.Delete(key);
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedSingleOp(db);
}
auto s = db_->Write(write_options_, &batch);
auto s = db->Write(write_options_, &batch);
if (!s.ok()) {
fprintf(stderr, "del error: %s\n", s.ToString().c_str());
exit(1);
@ -1986,6 +2077,7 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard(key.data());
while (true) {
DB* db = SelectDB(thread);
{
MutexLock l(&thread->shared->mu);
if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
@ -1995,7 +2087,7 @@ class Benchmark {
}
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
Status s = db->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -2020,8 +2112,8 @@ class Benchmark {
// Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
// in DB atomically i.e in a single batch. Also refer GetMany.
Status PutMany(const WriteOptions& writeoptions,
const Slice& key, const Slice& value) {
Status PutMany(DB* db, const WriteOptions& writeoptions, const Slice& key,
const Slice& value) {
std::string suffixes[3] = {"2", "1", "0"};
std::string keys[3];
@ -2032,14 +2124,14 @@ class Benchmark {
batch.Put(keys[i], value);
}
s = db_->Write(writeoptions, &batch);
s = db->Write(writeoptions, &batch);
return s;
}
// Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V)
// in DB atomically i.e in a single batch. Also refer GetMany.
Status DeleteMany(const WriteOptions& writeoptions,
Status DeleteMany(DB* db, const WriteOptions& writeoptions,
const Slice& key) {
std::string suffixes[3] = {"1", "2", "0"};
std::string keys[3];
@ -2051,26 +2143,26 @@ class Benchmark {
batch.Delete(keys[i]);
}
s = db_->Write(writeoptions, &batch);
s = db->Write(writeoptions, &batch);
return s;
}
// Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
// in the same snapshot, and verifies that all the values are identical.
// ASSUMES that PutMany was used to put (K, V) into the DB.
Status GetMany(const ReadOptions& readoptions,
const Slice& key, std::string* value) {
Status GetMany(DB* db, const ReadOptions& readoptions, const Slice& key,
std::string* value) {
std::string suffixes[3] = {"0", "1", "2"};
std::string keys[3];
Slice key_slices[3];
std::string values[3];
ReadOptions readoptionscopy = readoptions;
readoptionscopy.snapshot = db_->GetSnapshot();
readoptionscopy.snapshot = db->GetSnapshot();
Status s;
for (int i = 0; i < 3; i++) {
keys[i] = key.ToString() + suffixes[i];
key_slices[i] = keys[i];
s = db_->Get(readoptionscopy, key_slices[i], value);
s = db->Get(readoptionscopy, key_slices[i], value);
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
values[i] = "";
@ -2082,7 +2174,7 @@ class Benchmark {
values[i] = *value;
}
}
db_->ReleaseSnapshot(readoptionscopy.snapshot);
db->ReleaseSnapshot(readoptionscopy.snapshot);
if ((values[0] != values[1]) || (values[1] != values[2])) {
fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
@ -2119,6 +2211,7 @@ class Benchmark {
// the number of iterations is the larger of read_ or write_
for (int64_t i = 0; i < readwrites_; i++) {
DB* db = SelectDB(thread);
if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
// one batch completed, reinitialize for next batch
get_weight = FLAGS_readwritepercent;
@ -2129,7 +2222,7 @@ class Benchmark {
FLAGS_numdistinct, &key);
if (get_weight > 0) {
// do all the gets first
Status s = GetMany(options, key, &value);
Status s = GetMany(db, options, key, &value);
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
// we continue after error rather than exiting so that we can
@ -2142,7 +2235,7 @@ class Benchmark {
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
Status s = PutMany(write_options_, key, gen.Generate(value_size_));
Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
exit(1);
@ -2150,7 +2243,7 @@ class Benchmark {
put_weight--;
puts_done++;
} else if (delete_weight > 0) {
Status s = DeleteMany(write_options_, key);
Status s = DeleteMany(db, write_options_, key);
if (!s.ok()) {
fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
exit(1);
@ -2187,6 +2280,7 @@ class Benchmark {
// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
if (get_weight == 0 && put_weight == 0) {
// one batch completed, reinitialize for next batch
@ -2195,7 +2289,7 @@ class Benchmark {
}
if (get_weight > 0) {
// do all the gets first
Status s = db_->Get(options, key, &value);
Status s = db->Get(options, key, &value);
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
// we continue after error rather than exiting so that we can
@ -2208,7 +2302,7 @@ class Benchmark {
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
Status s = db->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -2216,7 +2310,7 @@ class Benchmark {
put_weight--;
writes_done++;
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedSingleOp(db);
}
char msg[100];
snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
@ -2238,18 +2332,19 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard(key.data());
// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
if (db_->Get(options, key, &value).ok()) {
if (db->Get(options, key, &value).ok()) {
found++;
}
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
Status s = db->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedSingleOp(db);
}
char msg[100];
snprintf(msg, sizeof(msg),
@ -2271,10 +2366,11 @@ class Benchmark {
// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
// Get the existing value
if (db_->Get(options, key, &value).ok()) {
if (db->Get(options, key, &value).ok()) {
found++;
} else {
// If not existing, then just assume an empty string of data
@ -2290,7 +2386,7 @@ class Benchmark {
value.append(operand.data(), operand.size());
// Write back to the database
Status s = db_->Put(write_options_, key, value);
Status s = db->Put(write_options_, key, value);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -2322,9 +2418,10 @@ class Benchmark {
// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);
Status s = db_->Merge(write_options_, key, gen.Generate(value_size_));
Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
@ -2360,12 +2457,13 @@ class Benchmark {
// the number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);
bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
if (do_merge) {
Status s = db_->Merge(write_options_, key, gen.Generate(value_size_));
Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
@ -2374,7 +2472,7 @@ class Benchmark {
num_merges++;
} else {
Status s = db_->Get(options, key, &value);
Status s = db->Get(options, key, &value);
if (value.length() > max_length)
max_length = value.length();
@ -2402,12 +2500,25 @@ class Benchmark {
}
void Compact(ThreadState* thread) {
db_->CompactRange(nullptr, nullptr);
DB* db = SelectDB(thread);
db->CompactRange(nullptr, nullptr);
}
void PrintStats(const char* key) {
if (db_ != nullptr) {
PrintStats(db_, key, false);
}
for (DB* db : multi_dbs_) {
PrintStats(db, key, true);
}
}
void PrintStats(DB* db, const char* key, bool print_header = false) {
if (print_header) {
fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
}
std::string stats;
if (!db_->GetProperty(key, &stats)) {
if (!db->GetProperty(key, &stats)) {
stats = "(failed)";
}
fprintf(stdout, "\n%s\n", stats.c_str());