Add option to use MultiGet in db_stress (#5264)
Summary: The new option will pick a batch size randomly in the range 1-64. It will then space the keys in the batch by random intervals. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5264 Differential Revision: D15175522 Pulled By: anand1976 fbshipit-source-id: c16baa69d0f1ff4cf53c55c813ddd82c8aeb58fc
This commit is contained in:
parent
d51eb0b583
commit
434ccf2df4
@ -65,6 +65,7 @@ default_params = {
|
||||
"writepercent": 35,
|
||||
"format_version": lambda: random.randint(2, 4),
|
||||
"index_block_restart_interval": lambda: random.choice(range(1, 16)),
|
||||
"use_multiget" : lambda: random.randint(0, 1),
|
||||
}
|
||||
|
||||
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
|
||||
|
@ -455,6 +455,9 @@ DEFINE_uint64(snapshot_hold_ops, 0,
|
||||
"If non-zero, then releases snapshots N operations after they're "
|
||||
"acquired.");
|
||||
|
||||
DEFINE_bool(use_multiget, false,
|
||||
"If set, use the batched MultiGet API for reads");
|
||||
|
||||
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
|
||||
if (value < 0 || value>100) {
|
||||
fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n",
|
||||
@ -1725,6 +1728,27 @@ class StressTest {
|
||||
return base_key + thread->rand.Next() % FLAGS_active_width;
|
||||
}
|
||||
|
||||
static std::vector<int64_t> GenerateNKeys(
|
||||
ThreadState* thread,
|
||||
int num_keys,
|
||||
uint64_t iteration) {
|
||||
const double completed_ratio =
|
||||
static_cast<double>(iteration) / FLAGS_ops_per_thread;
|
||||
const int64_t base_key = static_cast<int64_t>(
|
||||
completed_ratio * (FLAGS_max_key - FLAGS_active_width));
|
||||
std::vector<int64_t> keys;
|
||||
keys.reserve(num_keys);
|
||||
int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
|
||||
keys.push_back(next_key);
|
||||
for (int i = 1; i < num_keys; ++i) {
|
||||
// This may result in some duplicate keys
|
||||
next_key = next_key + thread->rand.Next() %
|
||||
(FLAGS_active_width - (next_key - base_key));
|
||||
keys.push_back(next_key);
|
||||
}
|
||||
return keys;
|
||||
}
|
||||
|
||||
static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
|
||||
size_t value_sz =
|
||||
((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
|
||||
@ -2162,7 +2186,14 @@ class StressTest {
|
||||
int prob_op = thread->rand.Uniform(100);
|
||||
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
|
||||
// OPERATION read
|
||||
if (FLAGS_use_multiget) {
|
||||
int num_keys = thread->rand.Uniform(64);
|
||||
rand_keys = GenerateNKeys(thread, num_keys, i);
|
||||
TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
|
||||
i += num_keys - 1;
|
||||
} else {
|
||||
TestGet(thread, read_opts, rand_column_families, rand_keys);
|
||||
}
|
||||
} else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
|
||||
// OPERATION prefix scan
|
||||
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
|
||||
@ -2211,6 +2242,11 @@ class StressTest {
|
||||
const std::vector<int>& rand_column_families,
|
||||
const std::vector<int64_t>& rand_keys) = 0;
|
||||
|
||||
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
|
||||
const ReadOptions& read_opts,
|
||||
const std::vector<int>& rand_column_families,
|
||||
const std::vector<int64_t>& rand_keys) = 0;
|
||||
|
||||
virtual Status TestPrefixScan(ThreadState* thread,
|
||||
const ReadOptions& read_opts,
|
||||
const std::vector<int>& rand_column_families,
|
||||
@ -2546,6 +2582,8 @@ class StressTest {
|
||||
fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
|
||||
fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
|
||||
FLAGS_subcompactions);
|
||||
fprintf(stdout, "Use MultiGet : %s\n",
|
||||
FLAGS_use_multiget ? "true" : "false");
|
||||
|
||||
const char* memtablerep = "";
|
||||
switch (FLAGS_rep_factory) {
|
||||
@ -3012,6 +3050,38 @@ class NonBatchedOpsStressTest : public StressTest {
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
|
||||
const ReadOptions& read_opts,
|
||||
const std::vector<int>& rand_column_families,
|
||||
const std::vector<int64_t>& rand_keys) {
|
||||
size_t num_keys = rand_keys.size();
|
||||
std::vector<std::string> key_str;
|
||||
std::vector<Slice> keys;
|
||||
std::vector<PinnableSlice> values(num_keys);
|
||||
std::vector<Status> statuses(num_keys);
|
||||
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
key_str.emplace_back(Key(rand_keys[i]));
|
||||
keys.emplace_back(key_str.back());
|
||||
}
|
||||
db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
|
||||
statuses.data());
|
||||
for (const auto& s : statuses) {
|
||||
if (s.ok()) {
|
||||
// found case
|
||||
thread->stats.AddGets(1, 1);
|
||||
} else if (s.IsNotFound()) {
|
||||
// not found case
|
||||
thread->stats.AddGets(1, 0);
|
||||
} else {
|
||||
// errors case
|
||||
thread->stats.AddErrors(1);
|
||||
}
|
||||
}
|
||||
return statuses;
|
||||
}
|
||||
|
||||
virtual Status TestPrefixScan(ThreadState* thread,
|
||||
const ReadOptions& read_opts,
|
||||
const std::vector<int>& rand_column_families,
|
||||
@ -3532,6 +3602,70 @@ class BatchedOpsStressTest : public StressTest {
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
|
||||
const ReadOptions& readoptions,
|
||||
const std::vector<int>& rand_column_families,
|
||||
const std::vector<int64_t>& rand_keys) {
|
||||
int num_keys = rand_keys.size();
|
||||
std::vector<Status> statuses(num_keys);
|
||||
std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
|
||||
for (int key = 0; key < 10; ++key) {
|
||||
std::vector<Slice> key_slices;
|
||||
std::vector<PinnableSlice> values(num_keys);
|
||||
ReadOptions readoptionscopy = readoptions;
|
||||
readoptionscopy.snapshot = db_->GetSnapshot();
|
||||
std::vector<std::string> key_str;
|
||||
std::string from_db;
|
||||
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
|
||||
|
||||
for (int rand_key = 0; rand_key < num_keys; ++rand_key) {
|
||||
key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
|
||||
key_slices.emplace_back(key_str.back());
|
||||
}
|
||||
db_->MultiGet(readoptionscopy, cfh, num_keys, key_slices.data(),
|
||||
values.data(), statuses.data());
|
||||
for (int i = 0; i < num_keys; i++) {
|
||||
Status s = statuses[i];
|
||||
if (!s.ok() && !s.IsNotFound()) {
|
||||
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
|
||||
thread->stats.AddErrors(1);
|
||||
// we continue after error rather than exiting so that we can
|
||||
// find more errors if any
|
||||
} else if (s.IsNotFound()) {
|
||||
thread->stats.AddGets(1, 0);
|
||||
} else {
|
||||
char expected_prefix = (keys[key])[0];
|
||||
char actual_prefix = (values[i])[0];
|
||||
if (actual_prefix != expected_prefix) {
|
||||
fprintf(stderr, "error expected prefix = %c actual = %c\n",
|
||||
expected_prefix, actual_prefix);
|
||||
}
|
||||
std::string str;
|
||||
str.assign(values[i].data(), values[i].size());
|
||||
values[i].Reset();
|
||||
str[0] = ' '; // blank out the differing character
|
||||
values[i].PinSelf(str);
|
||||
thread->stats.AddGets(1, 1);
|
||||
}
|
||||
}
|
||||
db_->ReleaseSnapshot(readoptionscopy.snapshot);
|
||||
|
||||
// Now that we retrieved all values, check that they all match
|
||||
for (int i = 1; i < num_keys; i++) {
|
||||
if (values[i] != values[0]) {
|
||||
fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
|
||||
key_str[i].c_str(),
|
||||
StringToHex(values[0].ToString()).c_str(),
|
||||
StringToHex(values[i].ToString()).c_str());
|
||||
// we continue after error rather than exiting so that we can
|
||||
// find more errors if any
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return statuses;
|
||||
}
|
||||
|
||||
// Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
|
||||
// in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
|
||||
// of the key. Each of these 10 scans returns a series of values;
|
||||
@ -3747,6 +3881,37 @@ class AtomicFlushStressTest : public StressTest {
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
|
||||
const ReadOptions& read_opts,
|
||||
const std::vector<int>& rand_column_families,
|
||||
const std::vector<int64_t>& rand_keys) {
|
||||
int num_keys = rand_keys.size();
|
||||
std::vector<std::string> key_str;
|
||||
std::vector<Slice> keys;
|
||||
std::vector<PinnableSlice> values(num_keys);
|
||||
std::vector<Status> statuses(num_keys);
|
||||
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
|
||||
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
key_str.emplace_back(Key(rand_keys[i]));
|
||||
keys.emplace_back(key_str.back());
|
||||
}
|
||||
db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), statuses.data());
|
||||
for (auto s : statuses) {
|
||||
if (s.ok()) {
|
||||
// found case
|
||||
thread->stats.AddGets(1, 1);
|
||||
} else if (s.IsNotFound()) {
|
||||
// not found case
|
||||
thread->stats.AddGets(1, 0);
|
||||
} else {
|
||||
// errors case
|
||||
thread->stats.AddErrors(1);
|
||||
}
|
||||
}
|
||||
return statuses;
|
||||
}
|
||||
|
||||
virtual Status TestPrefixScan(ThreadState* thread,
|
||||
const ReadOptions& readoptions,
|
||||
const std::vector<int>& rand_column_families,
|
||||
|
Loading…
Reference in New Issue
Block a user