diff --git a/tools/db_stress.cc b/tools/db_stress.cc index e9d759f32..7e3101c07 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -162,6 +162,14 @@ DEFINE_int32(max_background_compactions, "The maximum number of concurrent background compactions " "that can occur in parallel."); +DEFINE_int32(compaction_thread_pool_adjust_interval, 0, + "The interval (in milliseconds) to adjust compaction thread pool " + "size. Don't change it periodically if the value is 0."); + +DEFINE_int32(compaction_thread_pool_varations, 2, + "Range of bakground thread pool size variations when adjusted " + "periodically."); + DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes, "The maximum number of concurrent background flushes " "that can occur in parallel."); @@ -567,6 +575,8 @@ class SharedState { num_done_(0), start_(false), start_verify_(false), + should_stop_bg_thread_(false), + bg_thread_finished_(false), stress_test_(stress_test), verification_failure_(false) { if (FLAGS_test_batches_snapshots) { @@ -694,6 +704,14 @@ class SharedState { uint32_t GetSeed() const { return seed_; } + void SetShouldStopBgThread() { should_stop_bg_thread_ = true; } + + bool ShoudStopBgThread() { return should_stop_bg_thread_; } + + void SetBgThreadFinish() { bg_thread_finished_ = true; } + + bool BgThreadFinished() const { return bg_thread_finished_; } + private: port::Mutex mu_; port::CondVar cv_; @@ -707,6 +725,8 @@ class SharedState { long num_done_; bool start_; bool start_verify_; + bool should_stop_bg_thread_; + bool bg_thread_finished_; StressTest* stress_test_; std::atomic verification_failure_; @@ -777,6 +797,11 @@ class StressTest { threads[i] = new ThreadState(i, &shared); FLAGS_env->StartThread(ThreadBody, threads[i]); } + ThreadState bg_thread(0, &shared); + if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread); + } + // Each thread goes through the following states: // initializing -> wait for others to init -> read/populate/depopulate // wait for others to operate -> verify -> done @@ -829,6 +854,14 @@ class StressTest { } PrintStatistics(); + if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + MutexLock l(shared.GetMutex()); + shared.SetShouldStopBgThread(); + while (!shared.BgThreadFinished()) { + shared.GetCondVar()->Wait(); + } + } + if (shared.HasVerificationFailedYet()) { printf("Verification failed :(\n"); return false; @@ -879,6 +912,38 @@ class StressTest { } + static void PoolSizeChangeThread(void* v) { + assert(FLAGS_compaction_thread_pool_adjust_interval > 0); + ThreadState* thread = reinterpret_cast(v); + SharedState* shared = thread->shared; + + while (true) { + { + MutexLock l(shared->GetMutex()); + if (shared->ShoudStopBgThread()) { + shared->SetBgThreadFinish(); + shared->GetCondVar()->SignalAll(); + return; + } + } + + auto thread_pool_size_base = FLAGS_max_background_compactions; + auto thread_pool_size_var = FLAGS_compaction_thread_pool_varations; + int new_thread_pool_size = + thread_pool_size_base - thread_pool_size_var + + thread->rand.Next() % (thread_pool_size_var * 2 + 1); + if (new_thread_pool_size < 1) { + new_thread_pool_size = 1; + } + FLAGS_env->SetBackgroundThreads(new_thread_pool_size); + // Sleep up to 3 seconds + FLAGS_env->SleepForMicroseconds( + thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval * + 1000 + + 1); + } + } + // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ... // ("9"+K, "9"+V) in DB atomically i.e in a single batch. // Also refer MultiGet.