Add option to use MultiGet in db_stress (#5264)

Summary:
The new option will pick a batch size randomly in the range 1-64. It will then space the keys in the batch by random intervals.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5264

Differential Revision: D15175522

Pulled By: anand1976

fbshipit-source-id: c16baa69d0f1ff4cf53c55c813ddd82c8aeb58fc
This commit is contained in:
anand76 2019-05-01 23:04:03 -07:00
parent 8baa66acf6
commit 5f703af1ea
2 changed files with 167 additions and 1 deletions

View File

@ -65,6 +65,7 @@ default_params = {
"writepercent": 35,
"format_version": lambda: random.randint(2, 4),
"index_block_restart_interval": lambda: random.choice(range(1, 16)),
"use_multiget" : lambda: random.randint(0, 1),
}
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'

View File

@ -455,6 +455,9 @@ DEFINE_uint64(snapshot_hold_ops, 0,
"If non-zero, then releases snapshots N operations after they're "
"acquired.");
DEFINE_bool(use_multiget, false,
"If set, use the batched MultiGet API for reads");
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
if (value < 0 || value>100) {
fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n",
@ -1725,6 +1728,27 @@ class StressTest {
return base_key + thread->rand.Next() % FLAGS_active_width;
}
static std::vector<int64_t> GenerateNKeys(
ThreadState* thread,
int num_keys,
uint64_t iteration) {
const double completed_ratio =
static_cast<double>(iteration) / FLAGS_ops_per_thread;
const int64_t base_key = static_cast<int64_t>(
completed_ratio * (FLAGS_max_key - FLAGS_active_width));
std::vector<int64_t> keys;
keys.reserve(num_keys);
int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
keys.push_back(next_key);
for (int i = 1; i < num_keys; ++i) {
// This may result in some duplicate keys
next_key = next_key + thread->rand.Next() %
(FLAGS_active_width - (next_key - base_key));
keys.push_back(next_key);
}
return keys;
}
static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
size_t value_sz =
((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
@ -2162,7 +2186,14 @@ class StressTest {
int prob_op = thread->rand.Uniform(100);
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
// OPERATION read
TestGet(thread, read_opts, rand_column_families, rand_keys);
if (FLAGS_use_multiget) {
int num_keys = thread->rand.Uniform(64);
rand_keys = GenerateNKeys(thread, num_keys, i);
TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
i += num_keys - 1;
} else {
TestGet(thread, read_opts, rand_column_families, rand_keys);
}
} else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
// OPERATION prefix scan
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
@ -2211,6 +2242,11 @@ class StressTest {
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
@ -2546,6 +2582,8 @@ class StressTest {
fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
FLAGS_subcompactions);
fprintf(stdout, "Use MultiGet : %s\n",
FLAGS_use_multiget ? "true" : "false");
const char* memtablerep = "";
switch (FLAGS_rep_factory) {
@ -3012,6 +3050,38 @@ class NonBatchedOpsStressTest : public StressTest {
return s;
}
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
size_t num_keys = rand_keys.size();
std::vector<std::string> key_str;
std::vector<Slice> keys;
std::vector<PinnableSlice> values(num_keys);
std::vector<Status> statuses(num_keys);
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
for (size_t i = 0; i < num_keys; ++i) {
key_str.emplace_back(Key(rand_keys[i]));
keys.emplace_back(key_str.back());
}
db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
statuses.data());
for (const auto& s : statuses) {
if (s.ok()) {
// found case
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
} else {
// errors case
thread->stats.AddErrors(1);
}
}
return statuses;
}
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
@ -3532,6 +3602,70 @@ class BatchedOpsStressTest : public StressTest {
return s;
}
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
int num_keys = rand_keys.size();
std::vector<Status> statuses(num_keys);
std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
for (int key = 0; key < 10; ++key) {
std::vector<Slice> key_slices;
std::vector<PinnableSlice> values(num_keys);
ReadOptions readoptionscopy = readoptions;
readoptionscopy.snapshot = db_->GetSnapshot();
std::vector<std::string> key_str;
std::string from_db;
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
for (int rand_key = 0; rand_key < num_keys; ++rand_key) {
key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
key_slices.emplace_back(key_str.back());
}
db_->MultiGet(readoptionscopy, cfh, num_keys, key_slices.data(),
values.data(), statuses.data());
for (int i = 0; i < num_keys; i++) {
Status s = statuses[i];
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
// we continue after error rather than exiting so that we can
// find more errors if any
} else if (s.IsNotFound()) {
thread->stats.AddGets(1, 0);
} else {
char expected_prefix = (keys[key])[0];
char actual_prefix = (values[i])[0];
if (actual_prefix != expected_prefix) {
fprintf(stderr, "error expected prefix = %c actual = %c\n",
expected_prefix, actual_prefix);
}
std::string str;
str.assign(values[i].data(), values[i].size());
values[i].Reset();
str[0] = ' '; // blank out the differing character
values[i].PinSelf(str);
thread->stats.AddGets(1, 1);
}
}
db_->ReleaseSnapshot(readoptionscopy.snapshot);
// Now that we retrieved all values, check that they all match
for (int i = 1; i < num_keys; i++) {
if (values[i] != values[0]) {
fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
key_str[i].c_str(),
StringToHex(values[0].ToString()).c_str(),
StringToHex(values[i].ToString()).c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
}
}
}
return statuses;
}
// Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
// in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
// of the key. Each of these 10 scans returns a series of values;
@ -3747,6 +3881,37 @@ class AtomicFlushStressTest : public StressTest {
return s;
}
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
int num_keys = rand_keys.size();
std::vector<std::string> key_str;
std::vector<Slice> keys;
std::vector<PinnableSlice> values(num_keys);
std::vector<Status> statuses(num_keys);
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
for (int i = 0; i < num_keys; ++i) {
key_str.emplace_back(Key(rand_keys[i]));
keys.emplace_back(key_str.back());
}
db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), statuses.data());
for (auto s : statuses) {
if (s.ok()) {
// found case
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
} else {
// errors case
thread->stats.AddErrors(1);
}
}
return statuses;
}
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,