Add prefix scans to db_stress (and bug fix in prefix scan)

Summary: Added support for prefix scans.

Test Plan: ./db_stress --max_key=4096 --ops_per_thread=10000

Reviewers: dhruba, vamsi

Reviewed By: vamsi

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12267
This commit is contained in:
Tyler Harter 2013-08-14 16:58:36 -07:00
parent 0307c5fe3a
commit 7612d496ff
2 changed files with 199 additions and 37 deletions

View File

@ -64,9 +64,9 @@ void FilterBlockBuilder::AddKey(const Slice& key) {
// prefix(last entry) to get the prefix of the last key. // prefix(last entry) to get the prefix of the last key.
if (prev.size() == 0 || ! SamePrefix(key, prev)) { if (prev.size() == 0 || ! SamePrefix(key, prev)) {
Slice prefix = prefix_extractor_->Transform(key); Slice prefix = prefix_extractor_->Transform(key);
assert(comparator_->Compare(prefix, key) <= 0);
InternalKey internal_prefix_tmp(prefix, 0, kTypeValue); InternalKey internal_prefix_tmp(prefix, 0, kTypeValue);
Slice internal_prefix = internal_prefix_tmp.Encode(); Slice internal_prefix = internal_prefix_tmp.Encode();
assert(comparator_->Compare(internal_prefix, key) <= 0);
start_.push_back(entries_.size()); start_.push_back(entries_.size());
entries_.append(internal_prefix.data(), internal_prefix.size()); entries_.append(internal_prefix.data(), internal_prefix.size());
} }

View File

@ -4,7 +4,7 @@
// //
//The test uses an array to compare against values written to the database. //The test uses an array to compare against values written to the database.
//Keys written to the array are in 1:1 correspondence to the actual values in //Keys written to the array are in 1:1 correspondence to the actual values in
//the database according to the formula in the functino GenerateValue //the database according to the formula in the function GenerateValue
//Space is reserved in the array from 0 to FLAGS_max_key and values are randomly //Space is reserved in the array from 0 to FLAGS_max_key and values are randomly
//written/deleted/read from those positions. During verification we compare all //written/deleted/read from those positions. During verification we compare all
@ -26,6 +26,7 @@
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
#include "leveldb/statistics.h" #include "leveldb/statistics.h"
#include "port/port.h" #include "port/port.h"
#include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/histogram.h" #include "util/histogram.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
@ -43,12 +44,13 @@ static uint32_t FLAGS_seed = 2341234;
// Max number of key/values to place in database // Max number of key/values to place in database
static long FLAGS_max_key = 2 * KB * KB * KB; static long FLAGS_max_key = 2 * KB * KB * KB;
// If set, the test uses MultiGet, MultiPut and MultiDelete that // If set, the test uses MultiGet, MultiPrefixScan, MultiPut and
// do a different kind of validation during the test itself, // MultiDelete that do a different kind of validation during the test
// rather than at the end. This is meant to solve the following // itself, rather than at the end. This is meant to solve the
// problems at the expense of doing less degree of validation. // following problems at the expense of doing less degree of
// (a) No need to acquire mutexes during writes (less cache flushes // validation.
// in multi-core leading to speed up) // (a) No need to acquire mutexes during writes (less cache flushes in
// multi-core leading to speed up)
// (b) No long validation at the end (more speed up) // (b) No long validation at the end (more speed up)
// (c) Also test snapshot and atomicity of batch writes // (c) Also test snapshot and atomicity of batch writes
static bool FLAGS_test_batches_snapshots = false; static bool FLAGS_test_batches_snapshots = false;
@ -156,8 +158,14 @@ static int FLAGS_level0_slowdown_writes_trigger = 8;
// Ratio of reads to total workload (expressed as a percentage) // Ratio of reads to total workload (expressed as a percentage)
static unsigned int FLAGS_readpercent = 10; static unsigned int FLAGS_readpercent = 10;
// Ratio of prefix iterators to total workload (expressed as a percentage)
static unsigned int FLAGS_prefixpercent = 25;
// Ratio of deletes to total workload (expressed as a percentage) // Ratio of deletes to total workload (expressed as a percentage)
static unsigned int FLAGS_delpercent = 30; static unsigned int FLAGS_writepercent = 50;
// Ratio of deletes to total workload (expressed as a percentage)
static unsigned int FLAGS_delpercent = 15;
// Option to disable compation triggered by read. // Option to disable compation triggered by read.
static int FLAGS_disable_seek_compaction = false; static int FLAGS_disable_seek_compaction = false;
@ -191,6 +199,19 @@ static int FLAGS_level0_file_num_compaction_trigger = 0;
namespace leveldb { namespace leveldb {
// convert long to a big-endian slice key
static std::string Key(long val) {
std::string little_endian_key;
std::string big_endian_key;
PutFixed64(&little_endian_key, val);
assert(little_endian_key.size() == sizeof(val));
big_endian_key.resize(sizeof(val));
for (int i=0; i<(int)sizeof(val); i++) {
big_endian_key[i] = little_endian_key[sizeof(val) - 1 - i];
}
return big_endian_key;
}
class StressTest; class StressTest;
namespace { namespace {
@ -200,9 +221,11 @@ class Stats {
double finish_; double finish_;
double seconds_; double seconds_;
long done_; long done_;
long gets_;
long prefixes_;
long writes_; long writes_;
long deletes_; long deletes_;
long gets_; long iterator_size_sums_;
long founds_; long founds_;
long errors_; long errors_;
int next_report_; int next_report_;
@ -217,9 +240,11 @@ class Stats {
next_report_ = 100; next_report_ = 100;
hist_.Clear(); hist_.Clear();
done_ = 0; done_ = 0;
gets_ = 0;
prefixes_ = 0;
writes_ = 0; writes_ = 0;
deletes_ = 0; deletes_ = 0;
gets_ = 0; iterator_size_sums_ = 0;
founds_ = 0; founds_ = 0;
errors_ = 0; errors_ = 0;
bytes_ = 0; bytes_ = 0;
@ -232,9 +257,11 @@ class Stats {
void Merge(const Stats& other) { void Merge(const Stats& other) {
hist_.Merge(other.hist_); hist_.Merge(other.hist_);
done_ += other.done_; done_ += other.done_;
gets_ += other.gets_;
prefixes_ += other.prefixes_;
writes_ += other.writes_; writes_ += other.writes_;
deletes_ += other.deletes_; deletes_ += other.deletes_;
gets_ += other.gets_; iterator_size_sums_ += other.iterator_size_sums_;
founds_ += other.founds_; founds_ += other.founds_;
errors_ += other.errors_; errors_ += other.errors_;
bytes_ += other.bytes_; bytes_ += other.bytes_;
@ -277,15 +304,20 @@ class Stats {
bytes_ += nbytes; bytes_ += nbytes;
} }
void AddDeletes(int n) {
deletes_ += n;
}
void AddGets(int ngets, int nfounds) { void AddGets(int ngets, int nfounds) {
founds_ += nfounds; founds_ += nfounds;
gets_ += ngets; gets_ += ngets;
} }
void AddPrefixes(int nprefixes, int count) {
prefixes_ += nprefixes;
iterator_size_sums_ += count;
}
void AddDeletes(int n) {
deletes_ += n;
}
void AddErrors(int n) { void AddErrors(int n) {
errors_ += n; errors_ += n;
} }
@ -310,6 +342,9 @@ class Stats {
fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_); fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_);
fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_); fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
fprintf(stdout, "%-12s: %ld/%ld gets found the key\n", "", founds_, gets_); fprintf(stdout, "%-12s: %ld/%ld gets found the key\n", "", founds_, gets_);
fprintf(stdout, "%-12s: Prefix scanned %ld times\n", "", prefixes_);
fprintf(stdout, "%-12s: Iterator size sum is %ld\n", "",
iterator_size_sums_);
fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_); fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_);
if (FLAGS_histogram) { if (FLAGS_histogram) {
@ -492,6 +527,9 @@ class StressTest {
filter_policy_(FLAGS_bloom_bits >= 0 filter_policy_(FLAGS_bloom_bits >= 0
? NewBloomFilterPolicy(FLAGS_bloom_bits) ? NewBloomFilterPolicy(FLAGS_bloom_bits)
: nullptr), : nullptr),
prefix_extractor_(NewFixedPrefixTransform(
FLAGS_test_batches_snapshots ?
sizeof(long) : sizeof(long)-1)),
db_(nullptr), db_(nullptr),
num_times_reopened_(0) { num_times_reopened_(0) {
if (FLAGS_destroy_db_initially) { if (FLAGS_destroy_db_initially) {
@ -509,6 +547,7 @@ class StressTest {
~StressTest() { ~StressTest() {
delete db_; delete db_;
delete filter_policy_; delete filter_policy_;
delete prefix_extractor_;
} }
void Run() { void Run() {
@ -733,6 +772,82 @@ class StressTest {
return s; return s;
} }
// Given a prefix P, this does prefix scans for "0"+P, "1"+P,..."9"+P
// in the same snapshot. Each of these 10 scans returns a series of
// values; each series should be the same length, and it is verified
// for each index i that all the i'th values are of the form "0"+V,
// "1"+V,..."9"+V.
// ASSUMES that MultiPut was used to put (K, V)
Status MultiPrefixScan(ThreadState* thread,
const ReadOptions& readoptions,
const Slice& prefix) {
std::string prefixes[10] = {"0", "1", "2", "3", "4",
"5", "6", "7", "8", "9"};
Slice prefix_slices[10];
ReadOptions readoptionscopy[10];
const Snapshot* snapshot = db_->GetSnapshot();
Iterator* iters[10];
Status s = Status::OK();
for (int i = 0; i < 10; i++) {
prefixes[i] += prefix.ToString();
prefix_slices[i] = prefixes[i];
readoptionscopy[i] = readoptions;
readoptionscopy[i].prefix = &prefix_slices[i];
readoptionscopy[i].snapshot = snapshot;
iters[i] = db_->NewIterator(readoptionscopy[i]);
iters[i]->SeekToFirst();
}
int count = 0;
while (iters[0]->Valid()) {
count++;
std::string values[10];
// get list of all values for this iteration
for (int i = 0; i < 10; i++) {
// no iterator should finish before the first one
assert(iters[i]->Valid());
values[i] = iters[i]->value().ToString();
char expected_first = (prefixes[i])[0];
char actual_first = (values[i])[0];
if (actual_first != expected_first) {
fprintf(stderr, "error expected first = %c actual = %c\n",
expected_first, actual_first);
}
(values[i])[0] = ' '; // blank out the differing character
}
// make sure all values are equivalent
for (int i = 0; i < 10; i++) {
if (values[i] != values[0]) {
fprintf(stderr, "error : inconsistent values for prefix %s: %s, %s\n",
prefix.ToString().c_str(), values[0].c_str(),
values[i].c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
}
iters[i]->Next();
}
}
// cleanup iterators and snapshot
for (int i = 0; i < 10; i++) {
// if the first iterator finished, they should have all finished
assert(!iters[i]->Valid());
assert(iters[i]->status().ok());
delete iters[i];
}
db_->ReleaseSnapshot(snapshot);
if (s.ok()) {
thread->stats.AddPrefixes(1, count);
} else {
thread->stats.AddErrors(1);
}
return s;
}
void OperateDb(ThreadState* thread) { void OperateDb(ThreadState* thread) {
ReadOptions read_opts(FLAGS_verify_checksum, true); ReadOptions read_opts(FLAGS_verify_checksum, true);
WriteOptions write_opts; WriteOptions write_opts;
@ -764,11 +879,12 @@ class StressTest {
} }
long rand_key = thread->rand.Next() % max_key; long rand_key = thread->rand.Next() % max_key;
Slice key((char*)&rand_key, sizeof(rand_key)); std::string keystr = Key(rand_key);
//Read:10%;Delete:30%;Write:60% Slice key = keystr;
unsigned int probability_operation = thread->rand.Uniform(100); int prob_op = thread->rand.Uniform(100);
if (probability_operation < FLAGS_readpercent) {
// read load // OPERATION read?
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
if (!FLAGS_test_batches_snapshots) { if (!FLAGS_test_batches_snapshots) {
Status s = db_->Get(read_opts, key, &from_db); Status s = db_->Get(read_opts, key, &from_db);
if (s.ok()) { if (s.ok()) {
@ -784,19 +900,38 @@ class StressTest {
} else { } else {
MultiGet(thread, read_opts, key, &from_db); MultiGet(thread, read_opts, key, &from_db);
} }
} else if (probability_operation < FLAGS_delpercent + FLAGS_readpercent) { }
//introduce delete load prob_op -= FLAGS_readpercent;
if (!FLAGS_test_batches_snapshots) {
MutexLock l(thread->shared->GetMutexForKey(rand_key));
thread->shared->Delete(rand_key);
db_->Delete(write_opts, key);
thread->stats.AddDeletes(1);
} else {
MultiDelete(thread, write_opts, key);
}
} else { // OPERATION prefix scan?
// write load if (prob_op >= 0 && prob_op < (int)FLAGS_prefixpercent) {
// keys are longs (e.g., 8 bytes), so we let prefixes be
// everything except the last byte. So there will be 2^8=256
// keys per prefix.
Slice prefix = Slice(key.data(), key.size() - 1);
if (!FLAGS_test_batches_snapshots) {
read_opts.prefix = &prefix;
Iterator* iter = db_->NewIterator(read_opts);
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
assert(count <= 256);
if (iter->status().ok()) {
thread->stats.AddPrefixes(1, count);
} else {
thread->stats.AddErrors(1);
}
delete iter;
} else {
MultiPrefixScan(thread, read_opts, prefix);
}
}
prob_op -= FLAGS_prefixpercent;
// OPERATION write?
if (prob_op >= 0 && prob_op < (int)FLAGS_writepercent) {
uint32_t value_base = thread->rand.Next(); uint32_t value_base = thread->rand.Next();
size_t sz = GenerateValue(value_base, value, sizeof(value)); size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz); Slice v(value, sz);
@ -811,9 +946,23 @@ class StressTest {
} else { } else {
MultiPut(thread, write_opts, key, v, sz); MultiPut(thread, write_opts, key, v, sz);
} }
PrintKeyValue(rand_key, value, sz); PrintKeyValue(rand_key, value, sz);
} }
prob_op -= FLAGS_writepercent;
// OPERATION delete?
if (prob_op >= 0 && prob_op < (int)FLAGS_delpercent) {
if (!FLAGS_test_batches_snapshots) {
MutexLock l(thread->shared->GetMutexForKey(rand_key));
thread->shared->Delete(rand_key);
db_->Delete(write_opts, key);
thread->stats.AddDeletes(1);
} else {
MultiDelete(thread, write_opts, key);
}
}
prob_op -= FLAGS_delpercent;
thread->stats.FinishedSingleOp(); thread->stats.FinishedSingleOp();
} }
thread->stats.Stop(); thread->stats.Stop();
@ -840,7 +989,8 @@ class StressTest {
void VerifyValue(long key, const ReadOptions &opts, const SharedState &shared, void VerifyValue(long key, const ReadOptions &opts, const SharedState &shared,
std::string *value_from_db, bool strict=false) const { std::string *value_from_db, bool strict=false) const {
Slice k((char*)&key, sizeof(key)); std::string keystr = Key(key);
Slice k = keystr;
char value[100]; char value[100];
uint32_t value_base = shared.Get(key); uint32_t value_base = shared.Get(key);
if (value_base == SharedState::SENTINEL && !strict) { if (value_base == SharedState::SENTINEL && !strict) {
@ -896,6 +1046,9 @@ class StressTest {
} }
fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str()); fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent); fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent);
fprintf(stdout, "Prefix percentage : %d\n", FLAGS_prefixpercent);
fprintf(stdout, "Write percentage : %d\n", FLAGS_writepercent);
fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent);
fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size); fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent); fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent);
fprintf(stdout, "Max key : %ld\n", FLAGS_max_key); fprintf(stdout, "Max key : %ld\n", FLAGS_max_key);
@ -941,6 +1094,7 @@ class StressTest {
options.compaction_style = FLAGS_compaction_style; options.compaction_style = FLAGS_compaction_style;
options.block_size = FLAGS_block_size; options.block_size = FLAGS_block_size;
options.filter_policy = filter_policy_; options.filter_policy = filter_policy_;
options.prefix_extractor = prefix_extractor_;
options.max_open_files = FLAGS_open_files; options.max_open_files = FLAGS_open_files;
options.statistics = dbstats; options.statistics = dbstats;
options.env = FLAGS_env; options.env = FLAGS_env;
@ -1012,6 +1166,7 @@ class StressTest {
private: private:
shared_ptr<Cache> cache_; shared_ptr<Cache> cache_;
const FilterPolicy* filter_policy_; const FilterPolicy* filter_policy_;
const SliceTransform* prefix_extractor_;
DB* db_; DB* db_;
StackableDB* sdb_; StackableDB* sdb_;
int num_times_reopened_; int num_times_reopened_;
@ -1110,6 +1265,12 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--readpercent=%d%c", &n, &junk) == 1 && } else if (sscanf(argv[i], "--readpercent=%d%c", &n, &junk) == 1 &&
(n >= 0 && n <= 100)) { (n >= 0 && n <= 100)) {
FLAGS_readpercent = n; FLAGS_readpercent = n;
} else if (sscanf(argv[i], "--prefixpercent=%d%c", &n, &junk) == 1 &&
(n >= 0 && n <= 100)) {
FLAGS_prefixpercent = n;
} else if (sscanf(argv[i], "--writepercent=%d%c", &n, &junk) == 1 &&
(n >= 0 && n <= 100)) {
FLAGS_writepercent = n;
} else if (sscanf(argv[i], "--delpercent=%d%c", &n, &junk) == 1 && } else if (sscanf(argv[i], "--delpercent=%d%c", &n, &junk) == 1 &&
(n >= 0 && n <= 100)) { (n >= 0 && n <= 100)) {
FLAGS_delpercent = n; FLAGS_delpercent = n;
@ -1183,8 +1344,9 @@ int main(int argc, char** argv) {
// max number of concurrent compactions. // max number of concurrent compactions.
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
if ((FLAGS_readpercent + FLAGS_delpercent) > 100) { if ((FLAGS_readpercent + FLAGS_prefixpercent +
fprintf(stderr, "Error: Read + Delete percents > 100!\n"); FLAGS_writepercent + FLAGS_delpercent) != 100) {
fprintf(stderr, "Error: Read+Prefix+Write+Delete percents != 100!\n");
exit(1); exit(1);
} }
if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) { if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) {