rocksdb/db_stress_tool/db_stress_driver.cc

182 lines
5.3 KiB
C++
Raw Normal View History

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
void ThreadBody(void* v) {
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared;
if (!FLAGS_skip_verifydb && shared->ShouldVerifyAtBeginning()) {
thread->shared->GetStressTest()->VerifyDb(thread);
}
{
MutexLock l(shared->GetMutex());
shared->IncInitialized();
if (shared->AllInitialized()) {
shared->GetCondVar()->SignalAll();
}
while (!shared->Started()) {
shared->GetCondVar()->Wait();
}
}
thread->shared->GetStressTest()->OperateDb(thread);
{
MutexLock l(shared->GetMutex());
shared->IncOperated();
if (shared->AllOperated()) {
shared->GetCondVar()->SignalAll();
}
while (!shared->VerifyStarted()) {
shared->GetCondVar()->Wait();
}
}
if (!FLAGS_skip_verifydb) {
thread->shared->GetStressTest()->VerifyDb(thread);
}
{
MutexLock l(shared->GetMutex());
shared->IncDone();
if (shared->AllDone()) {
shared->GetCondVar()->SignalAll();
}
}
}
bool RunStressTest(StressTest* stress) {
SystemClock* clock = db_stress_env->GetSystemClock().get();
stress->InitDb();
SharedState shared(db_stress_env, stress);
stress->FinishInitDb(&shared);
#ifndef NDEBUG
if (FLAGS_sync_fault_injection) {
fault_fs_guard->SetFilesystemDirectWritable(false);
}
#endif
uint32_t n = FLAGS_threads;
uint64_t now = clock->NowMicros();
fprintf(stdout, "%s Initializing worker threads\n",
clock->TimeToString(now / 1000000).c_str());
Fix race condition in db_stress thread setup (#9314) Summary: We need to grab `SharedState`'s mutex while calling `IncThreads()` or `IncBgThreads()`. Otherwise the newly launched threads can simultaneously access the thread counters to check if every thread has finished initializing. Repro command: ``` $ rm -rf /dev/shm/rocksdb/rocksdb_crashtest_{whitebox,expected}/ && mkdir -p /dev/shm/rocksdb/rocksdb_crashtest_{whitebox,expected}/ && ./db_stress --acquire_snapshot_one_in=10000 --atomic_flush=1 --avoid_flush_during_recovery=0 --avoid_unnecessary_blocking_io=1 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=0 --block_size=16384 --bloom_bits=131.8094496796033 --bottommost_compression_type=zlib --cache_index_and_filter_blocks=1 --cache_size=1048576 --checkpoint_one_in=1000000 --checksum_type=kCRC32c --clear_column_family_one_in=0 --compact_files_one_in=1000000 --compact_range_one_in=1000000 --compaction_style=1 --compaction_ttl=0 --compression_max_dict_buffer_bytes=134217727 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=zstd --compression_zstd_max_train_bytes=65536 --continuous_verification_interval=0 --db=/dev/shm/rocksdb/rocksdb_crashtest_whitebox --db_write_buffer_size=8388608 --delpercent=5 --delrangepercent=0 --destroy_db_initially=0 --disable_wal=1 --enable_compaction_filter=0 --enable_pipelined_write=0 --fail_if_options_file_error=1 --file_checksum_impl=crc32c --flush_one_in=1000000 --format_version=5 --get_current_wal_file_one_in=0 --get_live_files_one_in=1000000 --get_property_one_in=1000000 --get_sorted_wal_files_one_in=0 --index_block_restart_interval=15 --index_type=3 --iterpercent=10 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True --log2_keys_per_lock=22 --long_running_snapshots=0 --mark_for_compaction_one_file_in=10 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=1000000 --max_key_len=3 --max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=1048576 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=4194304 --memtablerep=skip_list --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --open_files=500000 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=32 --open_write_fault_one_in=0 --ops_per_thread=20000 --optimize_filters_for_memory=1 --paranoid_file_checks=0 --partition_filters=0 --partition_pinning=0 --pause_background_one_in=1000000 --periodic_compaction_seconds=0 --prefixpercent=5 --prepopulate_block_cache=1 --progress_reports=0 --read_fault_one_in=1000 --readpercent=45 --recycle_log_file_num=1 --reopen=0 --ribbon_starting_level=999 --secondary_cache_fault_one_in=32 --snapshot_hold_ops=100000 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=1048576 --subcompactions=2 --sync=0 --sync_fault_injection=False --target_file_size_base=2097152 --target_file_size_multiplier=2 --test_batches_snapshots=1 --test_cf_consistency=1 --top_level_index_pinning=0 --unpartitioned_pinning=0 --use_block_based_filter=1 --use_clock_cache=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=1 --use_merge=0 --use_multiget=1 --user_timestamp_size=0 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_db_one_in=100000 --write_buffer_size=1048576 --write_dbid_to_manifest=1 --write_fault_one_in=0 --writepercent=35 ``` TSAN error: ``` WARNING: ThreadSanitizer: data race (pid=2750142) Read of size 4 at 0x7ffc21d7f58c by thread T39 (mutexes: write M670895590377780496): #0 rocksdb::SharedState::AllInitialized() const db_stress_tool/db_stress_shared_state.h:204 (db_stress+0x4fd307) https://github.com/facebook/rocksdb/issues/1 rocksdb::ThreadBody(void*) db_stress_tool/db_stress_driver.cc:26 (db_stress+0x4fd307) https://github.com/facebook/rocksdb/issues/2 StartThreadWrapper env/env_posix.cc:454 (db_stress+0x84472f) Previous write of size 4 at 0x7ffc21d7f58c by main thread: #0 rocksdb::SharedState::IncThreads() db_stress_tool/db_stress_shared_state.h:194 (db_stress+0x4fd779) https://github.com/facebook/rocksdb/issues/1 rocksdb::RunStressTest(rocksdb::StressTest*) db_stress_tool/db_stress_driver.cc:78 (db_stress+0x4fd779) https://github.com/facebook/rocksdb/issues/2 rocksdb::db_stress_tool(int, char**) db_stress_tool/db_stress_tool.cc:348 (db_stress+0x4b97dc) https://github.com/facebook/rocksdb/issues/3 main db_stress_tool/db_stress.cc:21 (db_stress+0x47a351) Location is stack of main thread. Location is global '<null>' at 0x000000000000 ([stack]+0x00000001d58c) Mutex M670895590377780496 is already destroyed. Thread T39 (tid=2750211, running) created by main thread at: #0 pthread_create /home/engshare/third-party2/gcc/9.x/src/gcc-10.x/libsanitizer/tsan/tsan_interceptors.cc:964 (libtsan.so.0+0x613c3) https://github.com/facebook/rocksdb/issues/1 StartThread env/env_posix.cc:464 (db_stress+0x8463c2) https://github.com/facebook/rocksdb/issues/2 rocksdb::CompositeEnvWrapper::StartThread(void (*)(void*), void*) env/composite_env_wrapper.h:288 (db_stress+0x4bcd20) https://github.com/facebook/rocksdb/issues/3 rocksdb::EnvWrapper::StartThread(void (*)(void*), void*) include/rocksdb/env.h:1475 (db_stress+0x4bb950) https://github.com/facebook/rocksdb/issues/4 rocksdb::RunStressTest(rocksdb::StressTest*) db_stress_tool/db_stress_driver.cc:80 (db_stress+0x4fd9d2) https://github.com/facebook/rocksdb/issues/5 rocksdb::db_stress_tool(int, char**) db_stress_tool/db_stress_tool.cc:348 (db_stress+0x4b97dc) https://github.com/facebook/rocksdb/issues/6 main db_stress_tool/db_stress.cc:21 (db_stress+0x47a351) ThreadSanitizer: data race db_stress_tool/db_stress_shared_state.h:204 in rocksdb::SharedState::AllInitialized() const ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/9314 Test Plan: verified repro command works after this PR. Reviewed By: jay-zhuang Differential Revision: D33217698 Pulled By: ajkr fbshipit-source-id: 79358fe5adb779fc9dcf80643cc102d4b467fc38
2021-12-20 13:04:08 -08:00
ThreadState bg_thread(0, &shared);
ThreadState continuous_verification_thread(0, &shared);
Fix race condition in db_stress thread setup (#9314) Summary: We need to grab `SharedState`'s mutex while calling `IncThreads()` or `IncBgThreads()`. Otherwise the newly launched threads can simultaneously access the thread counters to check if every thread has finished initializing. Repro command: ``` $ rm -rf /dev/shm/rocksdb/rocksdb_crashtest_{whitebox,expected}/ && mkdir -p /dev/shm/rocksdb/rocksdb_crashtest_{whitebox,expected}/ && ./db_stress --acquire_snapshot_one_in=10000 --atomic_flush=1 --avoid_flush_during_recovery=0 --avoid_unnecessary_blocking_io=1 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=0 --block_size=16384 --bloom_bits=131.8094496796033 --bottommost_compression_type=zlib --cache_index_and_filter_blocks=1 --cache_size=1048576 --checkpoint_one_in=1000000 --checksum_type=kCRC32c --clear_column_family_one_in=0 --compact_files_one_in=1000000 --compact_range_one_in=1000000 --compaction_style=1 --compaction_ttl=0 --compression_max_dict_buffer_bytes=134217727 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=zstd --compression_zstd_max_train_bytes=65536 --continuous_verification_interval=0 --db=/dev/shm/rocksdb/rocksdb_crashtest_whitebox --db_write_buffer_size=8388608 --delpercent=5 --delrangepercent=0 --destroy_db_initially=0 --disable_wal=1 --enable_compaction_filter=0 --enable_pipelined_write=0 --fail_if_options_file_error=1 --file_checksum_impl=crc32c --flush_one_in=1000000 --format_version=5 --get_current_wal_file_one_in=0 --get_live_files_one_in=1000000 --get_property_one_in=1000000 --get_sorted_wal_files_one_in=0 --index_block_restart_interval=15 --index_type=3 --iterpercent=10 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True --log2_keys_per_lock=22 --long_running_snapshots=0 --mark_for_compaction_one_file_in=10 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=1000000 --max_key_len=3 --max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=1048576 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=4194304 --memtablerep=skip_list --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --open_files=500000 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=32 --open_write_fault_one_in=0 --ops_per_thread=20000 --optimize_filters_for_memory=1 --paranoid_file_checks=0 --partition_filters=0 --partition_pinning=0 --pause_background_one_in=1000000 --periodic_compaction_seconds=0 --prefixpercent=5 --prepopulate_block_cache=1 --progress_reports=0 --read_fault_one_in=1000 --readpercent=45 --recycle_log_file_num=1 --reopen=0 --ribbon_starting_level=999 --secondary_cache_fault_one_in=32 --snapshot_hold_ops=100000 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=1048576 --subcompactions=2 --sync=0 --sync_fault_injection=False --target_file_size_base=2097152 --target_file_size_multiplier=2 --test_batches_snapshots=1 --test_cf_consistency=1 --top_level_index_pinning=0 --unpartitioned_pinning=0 --use_block_based_filter=1 --use_clock_cache=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=1 --use_merge=0 --use_multiget=1 --user_timestamp_size=0 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_db_one_in=100000 --write_buffer_size=1048576 --write_dbid_to_manifest=1 --write_fault_one_in=0 --writepercent=35 ``` TSAN error: ``` WARNING: ThreadSanitizer: data race (pid=2750142) Read of size 4 at 0x7ffc21d7f58c by thread T39 (mutexes: write M670895590377780496): #0 rocksdb::SharedState::AllInitialized() const db_stress_tool/db_stress_shared_state.h:204 (db_stress+0x4fd307) https://github.com/facebook/rocksdb/issues/1 rocksdb::ThreadBody(void*) db_stress_tool/db_stress_driver.cc:26 (db_stress+0x4fd307) https://github.com/facebook/rocksdb/issues/2 StartThreadWrapper env/env_posix.cc:454 (db_stress+0x84472f) Previous write of size 4 at 0x7ffc21d7f58c by main thread: #0 rocksdb::SharedState::IncThreads() db_stress_tool/db_stress_shared_state.h:194 (db_stress+0x4fd779) https://github.com/facebook/rocksdb/issues/1 rocksdb::RunStressTest(rocksdb::StressTest*) db_stress_tool/db_stress_driver.cc:78 (db_stress+0x4fd779) https://github.com/facebook/rocksdb/issues/2 rocksdb::db_stress_tool(int, char**) db_stress_tool/db_stress_tool.cc:348 (db_stress+0x4b97dc) https://github.com/facebook/rocksdb/issues/3 main db_stress_tool/db_stress.cc:21 (db_stress+0x47a351) Location is stack of main thread. Location is global '<null>' at 0x000000000000 ([stack]+0x00000001d58c) Mutex M670895590377780496 is already destroyed. Thread T39 (tid=2750211, running) created by main thread at: #0 pthread_create /home/engshare/third-party2/gcc/9.x/src/gcc-10.x/libsanitizer/tsan/tsan_interceptors.cc:964 (libtsan.so.0+0x613c3) https://github.com/facebook/rocksdb/issues/1 StartThread env/env_posix.cc:464 (db_stress+0x8463c2) https://github.com/facebook/rocksdb/issues/2 rocksdb::CompositeEnvWrapper::StartThread(void (*)(void*), void*) env/composite_env_wrapper.h:288 (db_stress+0x4bcd20) https://github.com/facebook/rocksdb/issues/3 rocksdb::EnvWrapper::StartThread(void (*)(void*), void*) include/rocksdb/env.h:1475 (db_stress+0x4bb950) https://github.com/facebook/rocksdb/issues/4 rocksdb::RunStressTest(rocksdb::StressTest*) db_stress_tool/db_stress_driver.cc:80 (db_stress+0x4fd9d2) https://github.com/facebook/rocksdb/issues/5 rocksdb::db_stress_tool(int, char**) db_stress_tool/db_stress_tool.cc:348 (db_stress+0x4b97dc) https://github.com/facebook/rocksdb/issues/6 main db_stress_tool/db_stress.cc:21 (db_stress+0x47a351) ThreadSanitizer: data race db_stress_tool/db_stress_shared_state.h:204 in rocksdb::SharedState::AllInitialized() const ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/9314 Test Plan: verified repro command works after this PR. Reviewed By: jay-zhuang Differential Revision: D33217698 Pulled By: ajkr fbshipit-source-id: 79358fe5adb779fc9dcf80643cc102d4b467fc38
2021-12-20 13:04:08 -08:00
std::vector<ThreadState*> threads(n);
{
MutexLock l(shared.GetMutex());
for (uint32_t i = 0; i < n; i++) {
shared.IncThreads();
threads[i] = new ThreadState(i, &shared);
db_stress_env->StartThread(ThreadBody, threads[i]);
}
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
shared.IncBgThreads();
db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
}
if (FLAGS_continuous_verification_interval > 0) {
shared.IncBgThreads();
db_stress_env->StartThread(DbVerificationThread,
&continuous_verification_thread);
}
}
// Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done
{
MutexLock l(shared.GetMutex());
while (!shared.AllInitialized()) {
shared.GetCondVar()->Wait();
}
if (shared.ShouldVerifyAtBeginning()) {
if (shared.HasVerificationFailedYet()) {
fprintf(stderr, "Crash-recovery verification failed :(\n");
} else {
fprintf(stdout, "Crash-recovery verification passed :)\n");
}
}
now = clock->NowMicros();
fprintf(stdout, "%s Starting database operations\n",
clock->TimeToString(now / 1000000).c_str());
shared.SetStart();
shared.GetCondVar()->SignalAll();
while (!shared.AllOperated()) {
shared.GetCondVar()->Wait();
}
now = clock->NowMicros();
if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "%s Limited verification already done during gets\n",
clock->TimeToString((uint64_t)now / 1000000).c_str());
} else if (FLAGS_skip_verifydb) {
fprintf(stdout, "%s Verification skipped\n",
clock->TimeToString((uint64_t)now / 1000000).c_str());
} else {
fprintf(stdout, "%s Starting verification\n",
clock->TimeToString((uint64_t)now / 1000000).c_str());
}
shared.SetStartVerify();
shared.GetCondVar()->SignalAll();
while (!shared.AllDone()) {
shared.GetCondVar()->Wait();
}
}
for (unsigned int i = 1; i < n; i++) {
threads[0]->stats.Merge(threads[i]->stats);
}
threads[0]->stats.Report("Stress Test");
for (unsigned int i = 0; i < n; i++) {
delete threads[i];
threads[i] = nullptr;
}
now = clock->NowMicros();
if (!FLAGS_skip_verifydb && !FLAGS_test_batches_snapshots &&
!shared.HasVerificationFailedYet()) {
fprintf(stdout, "%s Verification successful\n",
clock->TimeToString(now / 1000000).c_str());
}
stress->PrintStatistics();
if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
FLAGS_continuous_verification_interval > 0) {
MutexLock l(shared.GetMutex());
shared.SetShouldStopBgThread();
while (!shared.BgThreadsFinished()) {
shared.GetCondVar()->Wait();
}
}
if (!stress->VerifySecondaries()) {
return false;
}
if (shared.HasVerificationFailedYet()) {
fprintf(stderr, "Verification failed :(\n");
return false;
}
return true;
}
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS