[Rocksdb] [Multiget] Introduced multiget into db_bench

Summary:
Preliminary! Introduced the --use_multiget=1 and --keys_per_multiget=n
flags for db_bench. Also updated and tested the ReadRandom() method
to include an option to use multiget. By default,
keys_per_multiget=100.

Preliminary tests imply that multiget is at least 1.25x faster per
key than regular get.

Will continue adding Multiget for ReadMissing, ReadHot,
RandomWithVerify, ReadRandomWriteRandom; soon. Will also think
about ways to better verify benchmarks.

Test Plan:
1. make db_bench
2. ./db_bench --benchmarks=fillrandom
3. ./db_bench --benchmarks=readrandom --use_existing_db=1
	      --use_multiget=1 --threads=4 --keys_per_multiget=100
4. ./db_bench --benchmarks=readrandom --use_existing_db=1
	      --threads=4
5. Verify ops/sec (and 1000000 of 1000000 keys found)

Reviewers: haobo, MarkCallaghan, dhruba

Reviewed By: MarkCallaghan

CC: leveldb

Differential Revision: https://reviews.facebook.net/D11127
This commit is contained in:
Deon Nicholas 2013-06-12 12:42:21 -07:00
parent bdf1085944
commit 4985a9f73b
2 changed files with 274 additions and 69 deletions

View File

@ -301,6 +301,17 @@ static bool FLAGS_advise_random_on_open =
static auto FLAGS_compaction_fadvice =
leveldb::Options().access_hint_on_compaction_start;
// Use multiget to access a series of keys instead of get
static bool FLAGS_use_multiget = false;
// If FLAGS_use_multiget is true, determines number of keys to group per call
// Arbitrary default. 90 is good because it agrees with FLAGS_readwritepercent
static long FLAGS_keys_per_multiget = 90;
// Print a message to user when a key is missing in a Get/MultiGet call
// TODO: Apply this flag to generic Get calls too. Currently only with Multiget
static bool FLAGS_warn_missing_keys = true;
// Use adaptive mutex
static auto FLAGS_use_adaptive_mutex =
leveldb::Options().use_adaptive_mutex;
@ -497,7 +508,7 @@ class Stats {
fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
name.ToString().c_str(),
seconds_ * 1e6 / done_,
elapsed * 1e6 / done_,
(long)throughput,
(extra.empty() ? "" : " "),
extra.c_str());
@ -550,10 +561,12 @@ class Duration {
}
bool Done(int increment) {
if (increment <= 0) increment = 1; // avoid Done(0) and infinite loops
ops_ += increment;
if (max_seconds_) {
if (!(ops_ % 1000)) {
// Recheck every appx 1000 ops (exact iff increment is factor of 1000)
if ((ops_/1000) != ((ops_-increment)/1000)) {
double now = FLAGS_env->NowMicros();
return ((now - start_at_) / 1000000.0) >= max_seconds_;
} else {
@ -712,7 +725,7 @@ class Benchmark {
HistogramData histogramData;
dbstats->histogramData(histogram_type, &histogramData);
fprintf(stdout, "%s statistics Percentiles :", name.c_str());
fprintf(stdout, "50 : %f ",histogramData.median);
fprintf(stdout, "50 : %f ", histogramData.median);
fprintf(stdout, "95 : %f ", histogramData.percentile95);
fprintf(stdout, "99 : %f\n", histogramData.percentile99);
}
@ -743,6 +756,7 @@ class Benchmark {
PrintHistogram(WAL_FILE_SYNC_MICROS, "WAL FILE SYNC MICROS");
PrintHistogram(MANIFEST_FILE_SYNC_MICROS, "Manifest SYNC MICROS");
PrintHistogram(TABLE_OPEN_IO_MICROS, "Table Open IO Micros");
PrintHistogram(DB_MULTIGET, "DB_MULTIGET");
}
}
@ -1244,80 +1258,185 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
thread->stats.AddBytes(bytes);
}
// Calls MultiGet over a list of keys from a random distribution.
// Returns the total number of keys found.
long MultiGetRandom(ReadOptions& options, int num_keys,
Random& rand, int range, const char* suffix) {
assert(num_keys > 0);
std::vector<Slice> keys(num_keys);
std::vector<std::string> values(num_keys);
std::vector<unique_ptr<char []> > gen_keys(num_keys);
int i;
long k;
// Fill the keys vector
for(i=0; i<num_keys; ++i) {
k = rand.Next() % range;
gen_keys[i] = GenerateKeyFromInt(k,suffix);
keys[i] = gen_keys[i].get();
}
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
// Apply the operation
std::vector<Status> statuses = db_->MultiGet(options, keys, &values);
assert((long)statuses.size() == num_keys);
assert((long)keys.size() == num_keys); // Should always be the case.
assert((long)values.size() == num_keys);
if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
options.snapshot = nullptr;
}
// Count number found
long found = 0;
for(i=0; i<num_keys; ++i) {
if (statuses[i].ok()){
++found;
} else if (FLAGS_warn_missing_keys == true) {
// Key not found, or error.
fprintf(stderr, "get error: %s\n", statuses[i].ToString().c_str());
}
}
return found;
}
void ReadRandom(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
Iterator* iter = db_->NewIterator(options);
Duration duration(FLAGS_duration, reads_);
std::string value;
long found = 0;
while (!duration.Done(1)) {
const int k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
if (FLAGS_use_multiget) { // MultiGet
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = reads_;
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num,"");
thread->stats.FinishedSingleOp(db_);
keys_left -= num_keys;
}
if (FLAGS_read_range < 2) {
if (db_->Get(options, key.get(), &value).ok()) {
found++;
}
} else {
Slice skey(key.get());
int count = 1;
if (FLAGS_get_approx) {
unique_ptr<char []> key2 =
GenerateKeyFromInt(k + (int) FLAGS_read_range);
Slice skey2(key2.get());
Range range(skey, skey2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
} else { // Regular case. Do one "get" at a time Get
Iterator* iter = db_->NewIterator(options);
std::string value;
while (!duration.Done(1)) {
const int k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
for (iter->Seek(skey);
iter->Valid() && count <= FLAGS_read_range;
++count, iter->Next()) {
found++;
if (FLAGS_read_range < 2) {
if (db_->Get(options, key.get(), &value).ok()) {
found++;
}
} else {
Slice skey(key.get());
int count = 1;
if (FLAGS_get_approx) {
unique_ptr<char []> key2 =
GenerateKeyFromInt(k + (int) FLAGS_read_range);
Slice skey2(key2.get());
Range range(skey, skey2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
for (iter->Seek(skey);
iter->Valid() && count <= FLAGS_read_range;
++count, iter->Next()) {
found++;
}
}
if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
options.snapshot = nullptr;
}
thread->stats.FinishedSingleOp(db_);
}
if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
options.snapshot = nullptr;
}
thread->stats.FinishedSingleOp(db_);
delete iter;
}
delete iter;
char msg[100];
snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, num_);
snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, reads_);
thread->stats.AddMessage(msg);
}
void ReadMissing(ThreadState* thread) {
void ReadMissing(ThreadState* thread) {
FLAGS_warn_missing_keys = false; // Never warn about missing keys
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
std::string value;
while (!duration.Done(1)) {
const int k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k, ".");
db_->Get(options, key.get(), &value);
thread->stats.FinishedSingleOp(db_);
if (FLAGS_use_multiget) {
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = reads_;
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
long found;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
found = MultiGetRandom(options, num_keys, thread->rand, FLAGS_num,".");
assert(!found);
thread->stats.FinishedSingleOp(db_);
keys_left -= num_keys;
}
} else { // Regular case (not MultiGet)
std::string value;
Status s;
while (!duration.Done(1)) {
const int k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k, ".");
s = db_->Get(options, key.get(), &value);
assert(!s.ok() && s.IsNotFound());
thread->stats.FinishedSingleOp(db_);
}
}
}
void ReadHot(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
std::string value;
const long range = (FLAGS_num + 99) / 100;
while (!duration.Done(1)) {
const int k = thread->rand.Next() % range;
unique_ptr<char []> key = GenerateKeyFromInt(k);
db_->Get(options, key.get(), &value);
thread->stats.FinishedSingleOp(db_);
long found = 0;
if (FLAGS_use_multiget) {
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = reads_;
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
found += MultiGetRandom(options, num_keys, thread->rand, range, "");
thread->stats.FinishedSingleOp(db_);
keys_left -= num_keys;
}
} else {
std::string value;
while (!duration.Done(1)) {
const int k = thread->rand.Next() % range;
unique_ptr<char []> key = GenerateKeyFromInt(k);
if (db_->Get(options, key.get(), &value).ok()){
++found;
}
thread->stats.FinishedSingleOp(db_);
}
}
char msg[100];
snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, reads_);
thread->stats.AddMessage(msg);
}
void SeekRandom(ThreadState* thread) {
@ -1424,8 +1543,8 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
}
// Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
// in DB atomically i.e in a single batch. Also refer MultiGet.
Status MultiPut(const WriteOptions& writeoptions,
// in DB atomically i.e in a single batch. Also refer GetMany.
Status PutMany(const WriteOptions& writeoptions,
const Slice& key, const Slice& value) {
std::string suffixes[3] = {"2", "1", "0"};
std::string keys[3];
@ -1443,8 +1562,8 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
// Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V)
// in DB atomically i.e in a single batch. Also refer MultiGet.
Status MultiDelete(const WriteOptions& writeoptions,
// in DB atomically i.e in a single batch. Also refer GetMany.
Status DeleteMany(const WriteOptions& writeoptions,
const Slice& key) {
std::string suffixes[3] = {"1", "2", "0"};
std::string keys[3];
@ -1462,8 +1581,8 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
// Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
// in the same snapshot, and verifies that all the values are identical.
// ASSUMES that MultiPut was used to put (K, V) into the DB.
Status MultiGet(const ReadOptions& readoptions,
// ASSUMES that PutMany was used to put (K, V) into the DB.
Status GetMany(const ReadOptions& readoptions,
const Slice& key, std::string* value) {
std::string suffixes[3] = {"0", "1", "2"};
std::string keys[3];
@ -1501,11 +1620,12 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
}
// Differs from readrandomwriterandom in the following ways:
// (a) Uses MultiGet/MultiPut to read/write key values. Refer to those funcs.
// (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
// (b) Does deletes as well (per FLAGS_deletepercent)
// (c) In order to achieve high % of 'found' during lookups, and to do
// multiple writes (including puts and deletes) it uses upto
// FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
// (d) Does not have a MultiGet option.
void RandomWithVerify(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
@ -1517,21 +1637,22 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
long gets_done = 0;
long puts_done = 0;
long deletes_done = 0;
// the number of iterations is the larger of read_ or write_
for (long i = 0; i < readwrites_; i++) {
const int k = thread->rand.Next() % (FLAGS_numdistinct);
unique_ptr<char []> key = GenerateKeyFromInt(k);
if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
// one batch complated, reinitialize for next batch
// one batch completed, reinitialize for next batch
get_weight = FLAGS_readwritepercent;
delete_weight = FLAGS_deletepercent;
put_weight = 100 - get_weight - delete_weight;
}
if (get_weight > 0) {
// do all the gets first
Status s = MultiGet(options, key.get(), &value);
Status s = GetMany(options, key.get(), &value);
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
} else if (!s.IsNotFound()) {
@ -1542,17 +1663,17 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
Status s = MultiPut(write_options_, key.get(), gen.Generate(value_size_));
Status s = PutMany(write_options_, key.get(), gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
exit(1);
}
put_weight--;
puts_done++;
} else if (delete_weight > 0) {
Status s = MultiDelete(write_options_, key.get());
Status s = DeleteMany(write_options_, key.get());
if (!s.ok()) {
fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str());
fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
exit(1);
}
delete_weight--;
@ -1572,6 +1693,12 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
// an extra thread.
//
void ReadRandomWriteRandom(ThreadState* thread) {
if (FLAGS_use_multiget){
// Separate function for multiget (for ease of reading)
ReadRandomWriteRandomMultiGet(thread);
return;
}
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
@ -1617,9 +1744,6 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
found++;
}
if (db_->Get(options, key.get(), &value).ok()) {
found++;
}
get_weight--;
reads_done++;
@ -1646,9 +1770,85 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
thread->stats.AddMessage(msg);
}
// ReadRandomWriteRandom (with multiget)
// Does FLAGS_keys_per_multiget reads (per multiget), followed by some puts.
// FLAGS_readwritepercent will specify the ratio of gets to puts.
// e.g.: If FLAGS_keys_per_multiget == 100 and FLAGS_readwritepercent == 75
// Then each block will do 100 multigets and 33 puts
// So there are 133 operations in-total: 100 of them (75%) are gets, and 33
// of them (25%) are puts.
void ReadRandomWriteRandomMultiGet(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
// For multiget
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = readwrites_; // number of keys still left to read
long num_keys; // number of keys to read in current group
long num_put_keys; // number of keys to put in current group
long found = 0;
long reads_done = 0;
long writes_done = 0;
long multigets_done = 0;
// the number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while(true) {
// Read num_keys keys, then write num_put_keys keys.
// The ratio of num_keys to num_put_keys is always FLAGS_readwritepercent
// And num_keys is set to be FLAGS_keys_per_multiget (kpg)
// num_put_keys is calculated accordingly (to maintain the ratio)
// Note: On the final iteration, num_keys and num_put_keys will be smaller
num_keys = std::min(keys_left*(FLAGS_readwritepercent + 99)/100, kpg);
num_put_keys = num_keys * (100-FLAGS_readwritepercent)
/ FLAGS_readwritepercent;
// This will break the loop when duration is complete
if (duration.Done(num_keys + num_put_keys)) {
break;
}
// A quick check to make sure our formula doesn't break on edge cases
assert(num_keys >= 1);
assert(num_keys + num_put_keys <= keys_left);
// Apply the MultiGet operations
found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num,"");
++multigets_done;
reads_done+=num_keys;
thread->stats.FinishedSingleOp(db_);
// Now do the puts
int i;
long k;
for(i=0; i<num_put_keys; ++i) {
k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);
Status s = db_->Put(write_options_, key.get(),
gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
writes_done++;
thread->stats.FinishedSingleOp(db_);
}
keys_left -= (num_keys + num_put_keys);
}
char msg[100];
snprintf(msg, sizeof(msg),
"( reads:%ld writes:%ld total:%ld multiget_ops:%ld found:%ld)",
reads_done, writes_done, readwrites_, multigets_done, found);
thread->stats.AddMessage(msg);
}
//
// Read-modify-write for random keys
//
// TODO: Implement MergeOperator tests here (Read-modify-write)
void UpdateRandom(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
@ -1971,6 +2171,12 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--use_adaptive_mutex=%d%c", &n, &junk) == 1
&& (n == 0 || n ==1 )) {
FLAGS_use_adaptive_mutex = n;
} else if (sscanf(argv[i], "--use_multiget=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_use_multiget = n;
} else if (sscanf(argv[i], "--keys_per_multiget=%d%c",
&n, &junk) == 1) {
FLAGS_keys_per_multiget = n;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);

View File

@ -2087,7 +2087,6 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// value will contain the current merge operand in the latter case.
// TODO: Maybe these could be run concurrently?
for(int i=0; i<numKeys; ++i) {
Status& s = statList[i];
std::string* value = &(*values)[i];