db_bench: add IteratorCreationWhileWriting mode and allow prefix_seek

Summary: as title

Test Plan: ran it

Reviewers: igor, haobo, yhchiang

Reviewed By: igor

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17655
This commit is contained in:
Lei Jin 2014-04-10 10:15:59 -07:00
parent ca4fa2047e
commit 7a92537fc4

View File

@ -47,6 +47,8 @@ DEFINE_string(benchmarks,
"overwrite," "overwrite,"
"readrandom," "readrandom,"
"readrandom," "readrandom,"
"newiterator,"
"newiteratorwhilewriting,"
"readseq," "readseq,"
"readreverse," "readreverse,"
"compact," "compact,"
@ -1172,6 +1174,9 @@ class Benchmark {
method = &Benchmark::ReadRandom; method = &Benchmark::ReadRandom;
} else if (name == Slice("newiterator")) { } else if (name == Slice("newiterator")) {
method = &Benchmark::IteratorCreation; method = &Benchmark::IteratorCreation;
} else if (name == Slice("newiteratorwhilewriting")) {
num_threads++; // Add extra thread for writing
method = &Benchmark::IteratorCreationWhileWriting;
} else if (name == Slice("seekrandom")) { } else if (name == Slice("seekrandom")) {
method = &Benchmark::SeekRandom; method = &Benchmark::SeekRandom;
} else if (name == Slice("readrandomsmall")) { } else if (name == Slice("readrandomsmall")) {
@ -1864,6 +1869,7 @@ class Benchmark {
void IteratorCreation(ThreadState* thread) { void IteratorCreation(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_); Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true); ReadOptions options(FLAGS_verify_checksum, true);
options.prefix_seek = (FLAGS_prefix_size > 0);
while (!duration.Done(1)) { while (!duration.Done(1)) {
Iterator* iter = db_->NewIterator(options); Iterator* iter = db_->NewIterator(options);
delete iter; delete iter;
@ -1871,6 +1877,14 @@ class Benchmark {
} }
} }
void IteratorCreationWhileWriting(ThreadState* thread) {
if (thread->tid > 0) {
IteratorCreation(thread);
} else {
BGWriter(thread);
}
}
void SeekRandom(ThreadState* thread) { void SeekRandom(ThreadState* thread) {
int64_t read = 0; int64_t read = 0;
int64_t found = 0; int64_t found = 0;
@ -1934,53 +1948,57 @@ class Benchmark {
if (thread->tid > 0) { if (thread->tid > 0) {
ReadRandom(thread); ReadRandom(thread);
} else { } else {
// Special thread that keeps writing until other threads are done. BGWriter(thread);
RandomGenerator gen; }
double last = FLAGS_env->NowMicros(); }
int writes_per_second_by_10 = 0;
int num_writes = 0;
// --writes_per_second rate limit is enforced per 100 milliseconds void BGWriter(ThreadState* thread) {
// intervals to avoid a burst of writes at the start of each second. // Special thread that keeps writing until other threads are done.
RandomGenerator gen;
double last = FLAGS_env->NowMicros();
int writes_per_second_by_10 = 0;
int num_writes = 0;
if (FLAGS_writes_per_second > 0) // --writes_per_second rate limit is enforced per 100 milliseconds
writes_per_second_by_10 = FLAGS_writes_per_second / 10; // intervals to avoid a burst of writes at the start of each second.
// Don't merge stats from this thread with the readers. if (FLAGS_writes_per_second > 0)
thread->stats.SetExcludeFromMerge(); writes_per_second_by_10 = FLAGS_writes_per_second / 10;
Slice key = AllocateKey(); // Don't merge stats from this thread with the readers.
std::unique_ptr<const char[]> key_guard(key.data()); thread->stats.SetExcludeFromMerge();
while (true) { Slice key = AllocateKey();
{ std::unique_ptr<const char[]> key_guard(key.data());
MutexLock l(&thread->shared->mu);
if (thread->shared->num_done + 1 >= thread->shared->num_initialized) { while (true) {
// Other threads have finished {
break; MutexLock l(&thread->shared->mu);
} if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
// Other threads have finished
break;
} }
}
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str()); fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1); exit(1);
} }
thread->stats.FinishedSingleOp(db_); thread->stats.FinishedSingleOp(db_);
++num_writes; ++num_writes;
if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) { if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) {
double now = FLAGS_env->NowMicros(); double now = FLAGS_env->NowMicros();
double usecs_since_last = now - last; double usecs_since_last = now - last;
num_writes = 0; num_writes = 0;
last = now; last = now;
if (usecs_since_last < 100000.0) { if (usecs_since_last < 100000.0) {
FLAGS_env->SleepForMicroseconds(100000.0 - usecs_since_last); FLAGS_env->SleepForMicroseconds(100000.0 - usecs_since_last);
last = FLAGS_env->NowMicros(); last = FLAGS_env->NowMicros();
}
} }
} }
} }