c7ce03dce1
Summary: Previously we enabled tracking expected state changes during `FinishInitDb()`, as soon as the DB was opened. This meant tracing was enabled during `VerifyDb()`, which cost extra CPU by requiring `DBImpl::trace_mutex_` to be acquired on each read operation. That work was unnecessary since we know there are no expected state changes during the `VerifyDb()` phase. So, this PR delays tracking expected state changes until after the `VerifyDb()` phase has completed.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9470

Test Plan: Measured this PR reduced `VerifyDb()` duration by 76% (387 -> 92 seconds) with `-disable_wal=1` (i.e., expected state tracking enabled).

- benchmark command: `./db_stress -max_key=100000000 -ops_per_thread=1 -destroy_db_initially=1 -expected_values_dir=/dev/shm/dbstress_expected/ -db=/dev/shm/dbstress/ --clear_column_family_one_in=0 --disable_wal=1 --reopen=0`
- without this PR, `VerifyDb()` takes 387 seconds:
```
2022/01/30-21:43:04 Initializing worker threads
Crash-recovery verification passed :)
2022/01/30-21:49:31 Starting database operations
```
- with this PR, `VerifyDb()` takes 92 seconds:
```
2022/01/30-21:59:06 Initializing worker threads
Crash-recovery verification passed :)
2022/01/30-22:00:38 Starting database operations
```

Reviewed By: riversand963

Differential Revision: D33884596

Pulled By: ajkr

fbshipit-source-id: 5f259de8087de5b0531f088e11297f37ed2f7685
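As a rough sketch (simplified; the full driver code follows below), the ordering this change establishes in `RunStressTest()` is:

```cpp
// Simplified ordering sketch; not the complete function.
stress->InitDb();
SharedState shared(db_stress_env, stress);
stress->FinishInitDb(&shared);        // no longer starts expected-state tracing

// Worker threads run VerifyDb() here without touching DBImpl::trace_mutex_.

stress->TrackExpectedState(&shared);  // tracing begins only after verification
shared.SetStart();                    // then database operations start
```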
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//

#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {
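// Per-worker-thread entry point. Each thread optionally verifies the DB
// before operations begin (the crash-recovery check), waits for all threads
// to finish initializing, runs OperateDb(), waits for all threads to finish
// operating, optionally verifies the DB again, and then marks itself done.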
void ThreadBody(void* v) {
  ThreadState* thread = reinterpret_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;

  if (!FLAGS_skip_verifydb && shared->ShouldVerifyAtBeginning()) {
    thread->shared->GetStressTest()->VerifyDb(thread);
  }
  {
    MutexLock l(shared->GetMutex());
    shared->IncInitialized();
    if (shared->AllInitialized()) {
      shared->GetCondVar()->SignalAll();
    }
    while (!shared->Started()) {
      shared->GetCondVar()->Wait();
    }
  }
  thread->shared->GetStressTest()->OperateDb(thread);

  {
    MutexLock l(shared->GetMutex());
    shared->IncOperated();
    if (shared->AllOperated()) {
      shared->GetCondVar()->SignalAll();
    }
    while (!shared->VerifyStarted()) {
      shared->GetCondVar()->Wait();
    }
  }

  if (!FLAGS_skip_verifydb) {
    thread->shared->GetStressTest()->VerifyDb(thread);
  }

  {
    MutexLock l(shared->GetMutex());
    shared->IncDone();
    if (shared->AllDone()) {
      shared->GetCondVar()->SignalAll();
    }
  }
}

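// Drives a stress run end to end: opens the DB, spawns the worker threads and
// optional background threads, coordinates the init -> operate -> verify
// phases through SharedState, reports merged statistics, and returns false if
// verification failed or the secondaries could not be verified.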
bool RunStressTest(StressTest* stress) {
  SystemClock* clock = db_stress_env->GetSystemClock().get();
  stress->InitDb();
  SharedState shared(db_stress_env, stress);
  stress->FinishInitDb(&shared);

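  // When sync fault injection is enabled, stop passing writes straight through
  // to the underlying filesystem so the fault injection layer can track them
  // from this point on.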
#ifndef NDEBUG
  if (FLAGS_sync_fault_injection) {
    fault_fs_guard->SetFilesystemDirectWritable(false);
  }
#endif

  uint32_t n = FLAGS_threads;
  uint64_t now = clock->NowMicros();
  fprintf(stdout, "%s Initializing worker threads\n",
          clock->TimeToString(now / 1000000).c_str());

  shared.SetThreads(n);

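  // Account for the optional background threads so the shutdown code at the
  // end of this function can wait for them via BgThreadsFinished().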
  if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
    shared.IncBgThreads();
  }

  if (FLAGS_continuous_verification_interval > 0) {
    shared.IncBgThreads();
  }

  std::vector<ThreadState*> threads(n);
  for (uint32_t i = 0; i < n; i++) {
    threads[i] = new ThreadState(i, &shared);
    db_stress_env->StartThread(ThreadBody, threads[i]);
  }

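  // Start the optional background threads: PoolSizeChangeThread periodically
  // resizes the compaction thread pool, and DbVerificationThread periodically
  // verifies the DB while the worker threads run.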
  ThreadState bg_thread(0, &shared);
  if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
    db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
  }

  ThreadState continuous_verification_thread(0, &shared);
  if (FLAGS_continuous_verification_interval > 0) {
    db_stress_env->StartThread(DbVerificationThread,
                               &continuous_verification_thread);
  }

  // Each thread goes through the following states:
  // initializing -> wait for others to init -> read/populate/depopulate
  // wait for others to operate -> verify -> done

  {
    MutexLock l(shared.GetMutex());
    while (!shared.AllInitialized()) {
      shared.GetCondVar()->Wait();
    }
    if (shared.ShouldVerifyAtBeginning()) {
      if (shared.HasVerificationFailedYet()) {
        fprintf(stderr, "Crash-recovery verification failed :(\n");
      } else {
        fprintf(stdout, "Crash-recovery verification passed :)\n");
      }
    }

    // This is after the verification step to avoid making all those `Get()`s
    // and `MultiGet()`s contend on the DB-wide trace mutex.
    stress->TrackExpectedState(&shared);

    now = clock->NowMicros();
    fprintf(stdout, "%s Starting database operations\n",
            clock->TimeToString(now / 1000000).c_str());

    shared.SetStart();
    shared.GetCondVar()->SignalAll();
    while (!shared.AllOperated()) {
      shared.GetCondVar()->Wait();
    }

    now = clock->NowMicros();
    if (FLAGS_test_batches_snapshots) {
      fprintf(stdout, "%s Limited verification already done during gets\n",
              clock->TimeToString((uint64_t)now / 1000000).c_str());
    } else if (FLAGS_skip_verifydb) {
      fprintf(stdout, "%s Verification skipped\n",
              clock->TimeToString((uint64_t)now / 1000000).c_str());
    } else {
      fprintf(stdout, "%s Starting verification\n",
              clock->TimeToString((uint64_t)now / 1000000).c_str());
    }

    shared.SetStartVerify();
    shared.GetCondVar()->SignalAll();
    while (!shared.AllDone()) {
      shared.GetCondVar()->Wait();
    }
  }

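  // Merge per-thread statistics into thread 0 and report the combined totals.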
  for (unsigned int i = 1; i < n; i++) {
    threads[0]->stats.Merge(threads[i]->stats);
  }
  threads[0]->stats.Report("Stress Test");

  for (unsigned int i = 0; i < n; i++) {
    delete threads[i];
    threads[i] = nullptr;
  }
  now = clock->NowMicros();
  if (!FLAGS_skip_verifydb && !FLAGS_test_batches_snapshots &&
      !shared.HasVerificationFailedYet()) {
    fprintf(stdout, "%s Verification successful\n",
            clock->TimeToString(now / 1000000).c_str());
  }
  stress->PrintStatistics();

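  // Ask the background threads to stop and wait for them to finish before
  // tearing down the shared state.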
  if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
      FLAGS_continuous_verification_interval > 0) {
    MutexLock l(shared.GetMutex());
    shared.SetShouldStopBgThread();
    while (!shared.BgThreadsFinished()) {
      shared.GetCondVar()->Wait();
    }
  }

  if (!stress->VerifySecondaries()) {
    return false;
  }

  if (shared.HasVerificationFailedYet()) {
    fprintf(stderr, "Verification failed :(\n");
    return false;
  }
  return true;
}
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS