Choose unique keys faster in db_stress (#3990)
Summary: db_stress initialization randomly chooses a set of keys to not overwrite. It was doing it separately for each column family. That caused 30+ second initialization times for the non-simple crash tests, which have 10 CFs. This PR: - reuses the same set of randomly chosen no-overwrite keys across all CFs - logs a couple more timestamps so we can more easily see initialization time Closes https://github.com/facebook/rocksdb/pull/3990 Differential Revision: D8393821 Pulled By: ajkr fbshipit-source-id: d0b263a298df607285ffdd8b0983ff6575cc6c34
This commit is contained in:
parent
a720401877
commit
dd216dd76a
@ -827,25 +827,19 @@ class SharedState {
|
|||||||
for (int64_t i = 0; i < max_key_; i++) {
|
for (int64_t i = 0; i < max_key_; i++) {
|
||||||
permutation[i] = i;
|
permutation[i] = i;
|
||||||
}
|
}
|
||||||
|
// Now do the Knuth shuffle
|
||||||
for (auto& cf_ids : no_overwrite_ids_) {
|
int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
|
||||||
// Now do the Knuth shuffle
|
// Only need to figure out first num_no_overwrite_keys of permutation
|
||||||
int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
|
no_overwrite_ids_.reserve(num_no_overwrite_keys);
|
||||||
// Only need to figure out first num_no_overwrite_keys of permutation
|
for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
|
||||||
for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
|
int64_t rand_index = i + rnd.Next() % (max_key_ - i);
|
||||||
int64_t rand_index = i + rnd.Next() % (max_key_ - 1 - i);
|
// Swap i and rand_index;
|
||||||
// Swap i and rand_index;
|
int64_t temp = permutation[i];
|
||||||
int64_t temp = permutation[i];
|
permutation[i] = permutation[rand_index];
|
||||||
permutation[i] = permutation[rand_index];
|
permutation[rand_index] = temp;
|
||||||
permutation[rand_index] = temp;
|
// Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
|
||||||
}
|
// permutation
|
||||||
|
no_overwrite_ids_.insert(permutation[i]);
|
||||||
// Now fill cf_ids with the first num_no_overwrite_keys of permutation
|
|
||||||
cf_ids.reserve(num_no_overwrite_keys);
|
|
||||||
for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
|
|
||||||
cf_ids.insert(permutation[i]);
|
|
||||||
}
|
|
||||||
assert(cf_ids.size() == static_cast<size_t>(num_no_overwrite_keys));
|
|
||||||
}
|
}
|
||||||
delete[] permutation;
|
delete[] permutation;
|
||||||
|
|
||||||
@ -1074,8 +1068,8 @@ class SharedState {
|
|||||||
return covered;
|
return covered;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AllowsOverwrite(int cf, int64_t key) {
|
bool AllowsOverwrite(int64_t key) {
|
||||||
return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end();
|
return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Exists(int cf, int64_t key) {
|
bool Exists(int cf, int64_t key) {
|
||||||
@ -1121,7 +1115,7 @@ class SharedState {
|
|||||||
std::atomic<bool> verification_failure_;
|
std::atomic<bool> verification_failure_;
|
||||||
|
|
||||||
// Keys that should not be overwritten
|
// Keys that should not be overwritten
|
||||||
std::vector<std::unordered_set<size_t> > no_overwrite_ids_;
|
std::unordered_set<size_t> no_overwrite_ids_;
|
||||||
|
|
||||||
std::atomic<uint32_t>* values_;
|
std::atomic<uint32_t>* values_;
|
||||||
std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
|
std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
|
||||||
@ -1413,12 +1407,18 @@ class StressTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool Run() {
|
bool Run() {
|
||||||
|
uint64_t now = FLAGS_env->NowMicros();
|
||||||
|
fprintf(stdout, "%s Initializing db_stress\n",
|
||||||
|
FLAGS_env->TimeToString(now / 1000000).c_str());
|
||||||
PrintEnv();
|
PrintEnv();
|
||||||
Open();
|
Open();
|
||||||
BuildOptionsTable();
|
BuildOptionsTable();
|
||||||
SharedState shared(this);
|
SharedState shared(this);
|
||||||
uint32_t n = shared.GetNumThreads();
|
uint32_t n = shared.GetNumThreads();
|
||||||
|
|
||||||
|
now = FLAGS_env->NowMicros();
|
||||||
|
fprintf(stdout, "%s Initializing worker threads\n",
|
||||||
|
FLAGS_env->TimeToString(now / 1000000).c_str());
|
||||||
std::vector<ThreadState*> threads(n);
|
std::vector<ThreadState*> threads(n);
|
||||||
for (uint32_t i = 0; i < n; i++) {
|
for (uint32_t i = 0; i < n; i++) {
|
||||||
threads[i] = new ThreadState(i, &shared);
|
threads[i] = new ThreadState(i, &shared);
|
||||||
@ -1446,7 +1446,7 @@ class StressTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto now = FLAGS_env->NowMicros();
|
now = FLAGS_env->NowMicros();
|
||||||
fprintf(stdout, "%s Starting database operations\n",
|
fprintf(stdout, "%s Starting database operations\n",
|
||||||
FLAGS_env->TimeToString(now/1000000).c_str());
|
FLAGS_env->TimeToString(now/1000000).c_str());
|
||||||
|
|
||||||
@ -1481,7 +1481,7 @@ class StressTest {
|
|||||||
delete threads[i];
|
delete threads[i];
|
||||||
threads[i] = nullptr;
|
threads[i] = nullptr;
|
||||||
}
|
}
|
||||||
auto now = FLAGS_env->NowMicros();
|
now = FLAGS_env->NowMicros();
|
||||||
if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
|
if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
|
||||||
fprintf(stdout, "%s Verification successful\n",
|
fprintf(stdout, "%s Verification successful\n",
|
||||||
FLAGS_env->TimeToString(now/1000000).c_str());
|
FLAGS_env->TimeToString(now/1000000).c_str());
|
||||||
@ -2488,8 +2488,8 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||||||
int64_t max_key = shared->GetMaxKey();
|
int64_t max_key = shared->GetMaxKey();
|
||||||
int64_t rand_key = rand_keys[0];
|
int64_t rand_key = rand_keys[0];
|
||||||
int rand_column_family = rand_column_families[0];
|
int rand_column_family = rand_column_families[0];
|
||||||
while (!shared->AllowsOverwrite(rand_column_family, rand_key) &&
|
while (!shared->AllowsOverwrite(rand_key) &&
|
||||||
(FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
|
(FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
|
||||||
lock.reset();
|
lock.reset();
|
||||||
rand_key = thread->rand.Next() % max_key;
|
rand_key = thread->rand.Next() % max_key;
|
||||||
rand_column_family = thread->rand.Next() % FLAGS_column_families;
|
rand_column_family = thread->rand.Next() % FLAGS_column_families;
|
||||||
@ -2570,8 +2570,8 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||||||
// OPERATION delete
|
// OPERATION delete
|
||||||
// If the chosen key does not allow overwrite and it does not exist,
|
// If the chosen key does not allow overwrite and it does not exist,
|
||||||
// choose another key.
|
// choose another key.
|
||||||
while (!shared->AllowsOverwrite(rand_column_family, rand_key) &&
|
while (!shared->AllowsOverwrite(rand_key) &&
|
||||||
!shared->Exists(rand_column_family, rand_key)) {
|
!shared->Exists(rand_column_family, rand_key)) {
|
||||||
lock.reset();
|
lock.reset();
|
||||||
rand_key = thread->rand.Next() % max_key;
|
rand_key = thread->rand.Next() % max_key;
|
||||||
rand_column_family = thread->rand.Next() % FLAGS_column_families;
|
rand_column_family = thread->rand.Next() % FLAGS_column_families;
|
||||||
@ -2586,7 +2586,7 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||||||
// Use delete if the key may be overwritten and a single deletion
|
// Use delete if the key may be overwritten and a single deletion
|
||||||
// otherwise.
|
// otherwise.
|
||||||
Status s;
|
Status s;
|
||||||
if (shared->AllowsOverwrite(rand_column_family, rand_key)) {
|
if (shared->AllowsOverwrite(rand_key)) {
|
||||||
shared->Delete(rand_column_family, rand_key, true /* pending */);
|
shared->Delete(rand_column_family, rand_key, true /* pending */);
|
||||||
if (!FLAGS_use_txn) {
|
if (!FLAGS_use_txn) {
|
||||||
s = db_->Delete(write_opts, cfh, key);
|
s = db_->Delete(write_opts, cfh, key);
|
||||||
|
Loading…
Reference in New Issue
Block a user