Adding Column Family support in db_bench.

Summary:
Adds the num_column_families flag and column family support in the DoWrite and ReadRandom methods.
[Igor, please let me know if this approach sounds good. I shall add it to other methods too.]

Test Plan: Ran fillseq with 1M keys and 10 column families, then ran readrandom.

Reviewers: sdong, yhchiang, igor, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D21387
This commit is contained in:
Radheshyam Balasundaram 2014-08-18 18:15:01 -07:00
parent 28b5c76004
commit 162b8151f1

View File

@ -146,6 +146,7 @@ DEFINE_int64(merge_keys, -1,
"Number of distinct keys to use for MergeRandom and "
"ReadRandomMergeRandom. "
"If negative, there will be FLAGS_num keys.");
DEFINE_int32(num_column_families, 1, "Number of Column Families to use.");
DEFINE_int64(reads, -1, "Number of read operations to do. "
"If negative, do FLAGS_num reads.");
@ -847,8 +848,15 @@ class Benchmark {
shared_ptr<Cache> compressed_cache_;
const FilterPolicy* filter_policy_;
const SliceTransform* prefix_extractor_;
DB* db_;
std::vector<DB*> multi_dbs_;
// Pairs a DB pointer with the list of column family handles that belong to
// it. A freshly constructed instance has an empty handle list and a null DB
// pointer.
// NOTE(review): this struct declares no destructor, so it releases neither
// the handles nor the DB; whoever holds an instance presumably has to do
// that -- confirm against the teardown paths.
struct DBWithColumnFamilies {
  std::vector<ColumnFamilyHandle*> cfh;  // column family handles
  DB* db;                                // database pointer; null until set

  // A value-initialized vector is already empty, so only `db` needs an
  // explicit starting value.
  DBWithColumnFamilies() : cfh(), db(nullptr) {}
};
DBWithColumnFamilies db_;
std::vector<DBWithColumnFamilies> multi_dbs_;
int64_t num_;
int value_size_;
int key_size_;
@ -1068,7 +1076,6 @@ class Benchmark {
? NewBloomFilterPolicy(FLAGS_bloom_bits)
: nullptr),
prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
db_(nullptr),
num_(FLAGS_num),
value_size_(FLAGS_value_size),
key_size_(FLAGS_key_size),
@ -1099,7 +1106,7 @@ class Benchmark {
}
~Benchmark() {
delete db_;
delete db_.db;
delete filter_policy_;
delete prefix_extractor_;
}
@ -1159,6 +1166,16 @@ class Benchmark {
return base_name + std::to_string(id);
}
// Returns the name used for the i-th column family: index 0 maps to
// kDefaultColumnFamilyName, every other index maps to "column_family_name_"
// followed by the index zero-padded to at least six digits.
std::string ColumnFamilyName(int i) {
  if (i != 0) {
    char buf[100];
    snprintf(buf, sizeof(buf), "column_family_name_%06d", i);
    return std::string(buf);
  }
  return kDefaultColumnFamilyName;
}
void Run() {
if (!SanityCheck()) {
exit(1);
@ -1313,13 +1330,14 @@ class Benchmark {
name.ToString().c_str());
method = nullptr;
} else {
if (db_ != nullptr) {
delete db_;
db_ = nullptr;
if (db_.db != nullptr) {
delete db_.db;
db_.db = nullptr;
db_.cfh.clear();
DestroyDB(FLAGS_db, Options());
}
for (size_t i = 0; i < multi_dbs_.size(); i++) {
delete multi_dbs_[i];
delete multi_dbs_[i].db;
DestroyDB(GetDbNameForMultiple(FLAGS_db, i), Options());
}
multi_dbs_.clear();
@ -1617,9 +1635,10 @@ class Benchmark {
}
void Open() {
assert(db_ == nullptr);
assert(db_.db == nullptr);
Options options;
options.create_if_missing = !FLAGS_use_existing_db;
options.create_missing_column_families = FLAGS_num_column_families > 1;
options.block_cache = cache_;
options.block_cache_compressed = compressed_cache_;
if (cache_ == nullptr) {
@ -1816,10 +1835,9 @@ class Benchmark {
OpenDb(options, FLAGS_db, &db_);
} else {
multi_dbs_.clear();
multi_dbs_.resize(FLAGS_num_multi_db);
for (int i = 0; i < FLAGS_num_multi_db; i++) {
DB* db;
OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &db);
multi_dbs_.push_back(db);
OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &multi_dbs_[i]);
}
}
if (FLAGS_min_level_to_compress >= 0) {
@ -1827,12 +1845,27 @@ class Benchmark {
}
}
void OpenDb(Options options, std::string db_name, DB** db) {
void OpenDb(const Options& options, const std::string& db_name,
DBWithColumnFamilies* db) {
Status s;
if(FLAGS_readonly) {
s = DB::OpenForReadOnly(options, db_name, db);
// Open with column families if necessary.
if (FLAGS_num_column_families > 1) {
db->cfh.resize(FLAGS_num_column_families);
std::vector<ColumnFamilyDescriptor> column_families;
for (int i = 0; i < FLAGS_num_column_families; i++) {
column_families.push_back(ColumnFamilyDescriptor(
ColumnFamilyName(i), ColumnFamilyOptions(options)));
}
if (FLAGS_readonly) {
s = DB::OpenForReadOnly(options, db_name, column_families,
&db->cfh, &db->db);
} else {
s = DB::Open(options, db_name, db);
s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
}
} else if (FLAGS_readonly) {
s = DB::OpenForReadOnly(options, db_name, &db->db);
} else {
s = DB::Open(options, db_name, &db->db);
}
if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -1900,10 +1933,18 @@ class Benchmark {
};
DB* SelectDB(ThreadState* thread) {
if (db_ != nullptr) {
return db_;
return SelectDBWithCfh(thread)->db;
}
// Chooses a database for the given thread: draws the next value from the
// thread's random generator and forwards it to the integer overload.
DBWithColumnFamilies* SelectDBWithCfh(ThreadState* thread) {
  const auto draw = thread->rand.Next();
  return SelectDBWithCfh(draw);
}
DBWithColumnFamilies* SelectDBWithCfh(uint64_t rand_int) {
if (db_.db != nullptr) {
return &db_;
} else {
return multi_dbs_[thread->rand.Next() % multi_dbs_.size()];
return &multi_dbs_[rand_int % multi_dbs_.size()];
}
}
@ -1912,7 +1953,7 @@ class Benchmark {
const int64_t num_ops = writes_ == 0 ? num_ : writes_;
size_t num_key_gens = 1;
if (db_ == nullptr) {
if (db_.db == nullptr) {
num_key_gens = multi_dbs_.size();
}
std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
@ -1935,20 +1976,25 @@ class Benchmark {
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
while (!duration.Done(entries_per_batch_)) {
size_t id = 0;
DB* db_to_write = db_;
if (db_to_write == nullptr) {
id = thread->rand.Next() % num_key_gens;
db_to_write = multi_dbs_[id];
}
size_t id = thread->rand.Next() % num_key_gens;
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
batch.Clear();
for (int64_t j = 0; j < entries_per_batch_; j++) {
GenerateKeyFromInt(key_gens[id]->Next(), FLAGS_num, &key);
int64_t rand_num = key_gens[id]->Next();
GenerateKeyFromInt(rand_num, FLAGS_num, &key);
if (FLAGS_num_column_families <= 1) {
batch.Put(key, gen.Generate(value_size_));
} else {
// We use same rand_num as seed for key and column family so that we
// can deterministically find the cfh corresponding to a particular
// key while reading the key.
batch.Put(db_with_cfh->cfh[rand_num % db_with_cfh->cfh.size()],
key, gen.Generate(value_size_));
}
bytes += value_size_ + key_size_;
}
s = db_to_write->Write(write_options_, &batch);
thread->stats.FinishedOps(db_to_write, entries_per_batch_);
s = db_with_cfh->db->Write(write_options_, &batch);
thread->stats.FinishedOps(db_with_cfh->db, entries_per_batch_);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -1958,11 +2004,11 @@ class Benchmark {
}
void ReadSequential(ThreadState* thread) {
if (db_ != nullptr) {
ReadSequential(thread, db_);
if (db_.db != nullptr) {
ReadSequential(thread, db_.db);
} else {
for (DB* db : multi_dbs_) {
ReadSequential(thread, db);
for (const auto& db_with_cfh : multi_dbs_) {
ReadSequential(thread, db_with_cfh.db);
}
}
}
@ -1981,11 +2027,11 @@ class Benchmark {
}
void ReadReverse(ThreadState* thread) {
if (db_ != nullptr) {
ReadReverse(thread, db_);
if (db_.db != nullptr) {
ReadReverse(thread, db_.db);
} else {
for (DB* db : multi_dbs_) {
ReadReverse(thread, db);
for (const auto& db_with_cfh : multi_dbs_) {
ReadReverse(thread, db_with_cfh.db);
}
}
}
@ -1996,7 +2042,7 @@ class Benchmark {
int64_t bytes = 0;
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedOps(db_, 1);
thread->stats.FinishedOps(db, 1);
++i;
}
delete iter;
@ -2013,13 +2059,24 @@ class Benchmark {
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
// We use same key_rand as seed for key and column family so that we can
// deterministically find the cfh corresponding to a particular key, as it
// is done in DoWrite method.
int64_t key_rand = thread->rand.Next() % FLAGS_num;
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
read++;
if (db->Get(options, key, &value).ok()) {
Status s;
if (FLAGS_num_column_families > 1) {
s = db_with_cfh->db->Get(options,
db_with_cfh->cfh[key_rand % db_with_cfh->cfh.size()], key, &value);
} else {
s = db_with_cfh->db->Get(options, key, &value);
}
if (s.ok()) {
found++;
}
thread->stats.FinishedOps(db_, 1);
thread->stats.FinishedOps(db_with_cfh->db, 1);
}
char msg[100];
@ -2061,6 +2118,7 @@ class Benchmark {
++found;
}
}
thread->stats.FinishedOps(db, entries_per_batch_);
}
for (auto& k : keys) {
delete k.data();
@ -2099,11 +2157,11 @@ class Benchmark {
Iterator* single_iter = nullptr;
std::vector<Iterator*> multi_iters;
if (db_ != nullptr) {
single_iter = db_->NewIterator(options);
if (db_.db != nullptr) {
single_iter = db_.db->NewIterator(options);
} else {
for (DB* db : multi_dbs_) {
multi_iters.push_back(db->NewIterator(options));
for (const auto& db_with_cfh : multi_dbs_) {
multi_iters.push_back(db_with_cfh.db->NewIterator(options));
}
}
uint64_t last_refresh = FLAGS_env->NowMicros();
@ -2116,16 +2174,16 @@ class Benchmark {
if (!FLAGS_use_tailing_iterator && FLAGS_iter_refresh_interval_us >= 0) {
uint64_t now = FLAGS_env->NowMicros();
if (now - last_refresh > (uint64_t)FLAGS_iter_refresh_interval_us) {
if (db_ != nullptr) {
if (db_.db != nullptr) {
delete single_iter;
single_iter = db_->NewIterator(options);
single_iter = db_.db->NewIterator(options);
} else {
for (auto iter : multi_iters) {
delete iter;
}
multi_iters.clear();
for (DB* db : multi_dbs_) {
multi_iters.push_back(db->NewIterator(options));
for (const auto& db_with_cfh : multi_dbs_) {
multi_iters.push_back(db_with_cfh.db->NewIterator(options));
}
}
}
@ -2143,7 +2201,7 @@ class Benchmark {
if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
found++;
}
thread->stats.FinishedOps(db_, 1);
thread->stats.FinishedOps(db_.db, 1);
}
delete single_iter;
for (auto iter : multi_iters) {
@ -2243,7 +2301,7 @@ class Benchmark {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedOps(db_, 1);
thread->stats.FinishedOps(db_.db, 1);
++num_writes;
if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) {
@ -2403,7 +2461,7 @@ class Benchmark {
deletes_done++;
}
thread->stats.FinishedOps(db_, 1);
thread->stats.FinishedOps(db_.db, 1);
}
char msg[100];
snprintf(msg, sizeof(msg),
@ -2542,7 +2600,7 @@ class Benchmark {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedOps(db_, 1);
thread->stats.FinishedOps(db, 1);
}
char msg[100];
@ -2578,7 +2636,7 @@ class Benchmark {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedOps(db_, 1);
thread->stats.FinishedOps(db, 1);
}
// Print some statistics
@ -2639,7 +2697,7 @@ class Benchmark {
}
thread->stats.FinishedOps(db_, 1);
thread->stats.FinishedOps(db, 1);
}
char msg[100];
@ -2656,11 +2714,11 @@ class Benchmark {
}
void PrintStats(const char* key) {
if (db_ != nullptr) {
PrintStats(db_, key, false);
if (db_.db != nullptr) {
PrintStats(db_.db, key, false);
}
for (DB* db : multi_dbs_) {
PrintStats(db, key, true);
for (const auto& db_with_cfh : multi_dbs_) {
PrintStats(db_with_cfh.db, key, true);
}
}