rocksdb/db_stress_tool/db_stress_driver.cc
Andrew Kryczka 775dc623ad add CompactionFilter to stress/crash tests (#6988)
Summary:
Added a `CompactionFilter` that is aware of the stress test's expected state. It only drops key versions that are already covered according to the expected state. It is incompatible with snapshots (same as all `CompactionFilter`s), so disables all snapshot-related features when used in the crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6988

Test Plan:
running a minified blackbox crash test

```
$ TEST_TMPDIR=/dev/shm python tools/db_crashtest.py blackbox --max_key=1000000 -write_buffer_size=1048576 -max_bytes_for_level_base=4194304 -target_file_size_base=1048576 -value_size_mult=33 --interval=10 --duration=3600
```

Reviewed By: anand1976

Differential Revision: D22072888

Pulled By: ajkr

fbshipit-source-id: 727b9d7a90d5eab18be0ec6cd5a810712ac13320
2020-06-18 09:54:55 -07:00

173 lines
5.1 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
namespace ROCKSDB_NAMESPACE {
void ThreadBody(void* v) {
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared;
if (!FLAGS_skip_verifydb && shared->ShouldVerifyAtBeginning()) {
thread->shared->GetStressTest()->VerifyDb(thread);
}
{
MutexLock l(shared->GetMutex());
shared->IncInitialized();
if (shared->AllInitialized()) {
shared->GetCondVar()->SignalAll();
}
while (!shared->Started()) {
shared->GetCondVar()->Wait();
}
}
thread->shared->GetStressTest()->OperateDb(thread);
{
MutexLock l(shared->GetMutex());
shared->IncOperated();
if (shared->AllOperated()) {
shared->GetCondVar()->SignalAll();
}
while (!shared->VerifyStarted()) {
shared->GetCondVar()->Wait();
}
}
if (!FLAGS_skip_verifydb) {
thread->shared->GetStressTest()->VerifyDb(thread);
}
{
MutexLock l(shared->GetMutex());
shared->IncDone();
if (shared->AllDone()) {
shared->GetCondVar()->SignalAll();
}
}
}
bool RunStressTest(StressTest* stress) {
stress->InitDb();
SharedState shared(db_stress_env, stress);
stress->FinishInitDb(&shared);
#ifndef NDEBUG
if (FLAGS_sync_fault_injection) {
fault_fs_guard->SetFilesystemDirectWritable(false);
}
#endif
uint32_t n = shared.GetNumThreads();
uint64_t now = db_stress_env->NowMicros();
fprintf(stdout, "%s Initializing worker threads\n",
db_stress_env->TimeToString(now / 1000000).c_str());
std::vector<ThreadState*> threads(n);
for (uint32_t i = 0; i < n; i++) {
threads[i] = new ThreadState(i, &shared);
db_stress_env->StartThread(ThreadBody, threads[i]);
}
ThreadState bg_thread(0, &shared);
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
}
ThreadState continuous_verification_thread(0, &shared);
if (FLAGS_continuous_verification_interval > 0) {
db_stress_env->StartThread(DbVerificationThread,
&continuous_verification_thread);
}
// Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done
{
MutexLock l(shared.GetMutex());
while (!shared.AllInitialized()) {
shared.GetCondVar()->Wait();
}
if (shared.ShouldVerifyAtBeginning()) {
if (shared.HasVerificationFailedYet()) {
fprintf(stderr, "Crash-recovery verification failed :(\n");
} else {
fprintf(stdout, "Crash-recovery verification passed :)\n");
}
}
now = db_stress_env->NowMicros();
fprintf(stdout, "%s Starting database operations\n",
db_stress_env->TimeToString(now / 1000000).c_str());
shared.SetStart();
shared.GetCondVar()->SignalAll();
while (!shared.AllOperated()) {
shared.GetCondVar()->Wait();
}
now = db_stress_env->NowMicros();
if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "%s Limited verification already done during gets\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
} else if (FLAGS_skip_verifydb) {
fprintf(stdout, "%s Verification skipped\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
} else {
fprintf(stdout, "%s Starting verification\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
}
shared.SetStartVerify();
shared.GetCondVar()->SignalAll();
while (!shared.AllDone()) {
shared.GetCondVar()->Wait();
}
}
for (unsigned int i = 1; i < n; i++) {
threads[0]->stats.Merge(threads[i]->stats);
}
threads[0]->stats.Report("Stress Test");
for (unsigned int i = 0; i < n; i++) {
delete threads[i];
threads[i] = nullptr;
}
now = db_stress_env->NowMicros();
if (!FLAGS_skip_verifydb && !FLAGS_test_batches_snapshots &&
!shared.HasVerificationFailedYet()) {
fprintf(stdout, "%s Verification successful\n",
db_stress_env->TimeToString(now / 1000000).c_str());
}
stress->PrintStatistics();
if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
FLAGS_continuous_verification_interval > 0) {
MutexLock l(shared.GetMutex());
shared.SetShouldStopBgThread();
while (!shared.BgThreadsFinished()) {
shared.GetCondVar()->Wait();
}
}
if (!stress->VerifySecondaries()) {
return false;
}
if (shared.HasVerificationFailedYet()) {
fprintf(stderr, "Verification failed :(\n");
return false;
}
return true;
}
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS