rocksdb/db_stress_tool/db_stress_driver.cc
Yanqin Jin 670a916d01 Add more verification to db_stress (#6173)
Summary:
Currently, db_stress performs verification by calling `VerifyDb()` at the end of test and optionally before tests start. In case of corruption or incorrect result, it will be too late. This PR adds more verification in two ways.
1. For cf consistency test, each test thread takes a snapshot and verifies every N ops. N is configurable via `-verify_db_one_in`. This option is not supported in other stress tests.
2. For cf consistency test, we use another background thread in which a secondary instance periodically tails the primary (interval is configurable). We verify the secondary. Once an error is detected, we terminate the test and report. This does not affect other stress tests.

Test plan (devserver)
```
$./db_stress -test_cf_consistency -verify_db_one_in=0 -ops_per_thread=100000 -continuous_verification_interval=100
$./db_stress -test_cf_consistency -verify_db_one_in=1000 -ops_per_thread=10000 -continuous_verification_interval=0
$make crash_test
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6173

Differential Revision: D19047367

Pulled By: riversand963

fbshipit-source-id: aeed584ad71f9310c111445f34975e5ab47a0615
2019-12-20 08:49:29 -08:00

164 lines
4.7 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
namespace rocksdb {
void ThreadBody(void* v) {
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared;
if (shared->ShouldVerifyAtBeginning()) {
thread->shared->GetStressTest()->VerifyDb(thread);
}
{
MutexLock l(shared->GetMutex());
shared->IncInitialized();
if (shared->AllInitialized()) {
shared->GetCondVar()->SignalAll();
}
while (!shared->Started()) {
shared->GetCondVar()->Wait();
}
}
thread->shared->GetStressTest()->OperateDb(thread);
{
MutexLock l(shared->GetMutex());
shared->IncOperated();
if (shared->AllOperated()) {
shared->GetCondVar()->SignalAll();
}
while (!shared->VerifyStarted()) {
shared->GetCondVar()->Wait();
}
}
thread->shared->GetStressTest()->VerifyDb(thread);
{
MutexLock l(shared->GetMutex());
shared->IncDone();
if (shared->AllDone()) {
shared->GetCondVar()->SignalAll();
}
}
}
bool RunStressTest(StressTest* stress) {
stress->InitDb();
SharedState shared(FLAGS_env, stress);
if (FLAGS_read_only) {
stress->InitReadonlyDb(&shared);
}
uint32_t n = shared.GetNumThreads();
uint64_t now = FLAGS_env->NowMicros();
fprintf(stdout, "%s Initializing worker threads\n",
FLAGS_env->TimeToString(now / 1000000).c_str());
std::vector<ThreadState*> threads(n);
for (uint32_t i = 0; i < n; i++) {
threads[i] = new ThreadState(i, &shared);
FLAGS_env->StartThread(ThreadBody, threads[i]);
}
ThreadState bg_thread(0, &shared);
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread);
}
ThreadState continuous_verification_thread(0, &shared);
if (FLAGS_continuous_verification_interval > 0) {
FLAGS_env->StartThread(DbVerificationThread,
&continuous_verification_thread);
}
// Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done
{
MutexLock l(shared.GetMutex());
while (!shared.AllInitialized()) {
shared.GetCondVar()->Wait();
}
if (shared.ShouldVerifyAtBeginning()) {
if (shared.HasVerificationFailedYet()) {
fprintf(stderr, "Crash-recovery verification failed :(\n");
} else {
fprintf(stdout, "Crash-recovery verification passed :)\n");
}
}
now = FLAGS_env->NowMicros();
fprintf(stdout, "%s Starting database operations\n",
FLAGS_env->TimeToString(now / 1000000).c_str());
shared.SetStart();
shared.GetCondVar()->SignalAll();
while (!shared.AllOperated()) {
shared.GetCondVar()->Wait();
}
now = FLAGS_env->NowMicros();
if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "%s Limited verification already done during gets\n",
FLAGS_env->TimeToString((uint64_t)now / 1000000).c_str());
} else {
fprintf(stdout, "%s Starting verification\n",
FLAGS_env->TimeToString((uint64_t)now / 1000000).c_str());
}
shared.SetStartVerify();
shared.GetCondVar()->SignalAll();
while (!shared.AllDone()) {
shared.GetCondVar()->Wait();
}
}
for (unsigned int i = 1; i < n; i++) {
threads[0]->stats.Merge(threads[i]->stats);
}
threads[0]->stats.Report("Stress Test");
for (unsigned int i = 0; i < n; i++) {
delete threads[i];
threads[i] = nullptr;
}
now = FLAGS_env->NowMicros();
if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
fprintf(stdout, "%s Verification successful\n",
FLAGS_env->TimeToString(now / 1000000).c_str());
}
stress->PrintStatistics();
if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
FLAGS_continuous_verification_interval > 0) {
MutexLock l(shared.GetMutex());
shared.SetShouldStopBgThread();
while (!shared.BgThreadsFinished()) {
shared.GetCondVar()->Wait();
}
}
if (!stress->VerifySecondaries()) {
return false;
}
if (shared.HasVerificationFailedYet()) {
fprintf(stderr, "Verification failed :(\n");
return false;
}
return true;
}
} // namespace rocksdb
#endif // GFLAGS