From 3df07d1703297cf8bf98327c1dc65f48d18957f5 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 19 May 2014 10:40:18 -0700 Subject: [PATCH 1/4] ThreadPool to allow decrease number of threads and increase of number of threads is to be instantly scheduled Summary: Add a feature to decrease the number of threads in thread pool. Also instantly schedule more threads if number of threads is increased. Here is the way it is implemented: each background thread needs its thread ID. After decreasing number of threads, all threads are woken up. The thread with the largest thread ID will terminate. If there are more threads to terminate, the thread will wake up all threads again. Another change is made so that when number of threads is increased, more threads are created and all previous excessive threads are woken up to do the work. Test Plan: Add a unit test. Reviewers: haobo, dhruba Reviewed By: haobo CC: yhchiang, igor, nkg-, leveldb Differential Revision: https://reviews.facebook.net/D18675 --- util/env_posix.cc | 100 +++++++++++++++++++----- util/env_test.cc | 191 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 273 insertions(+), 18 deletions(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index 5cbd5bd00..52787517a 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1470,17 +1470,50 @@ class PosixEnv : public Env { } } - void BGThread() { + // Return true if there is at least one thread needs to terminate. + bool HasExcessiveThread() { + return static_cast<int>(bgthreads_.size()) > total_threads_limit_; + } + + // Return true iff the current thread is the excessive thread to terminate. + // Always terminate the running thread that is added last, even if there are + // more than one thread to terminate. + bool IsLastExcessiveThread(size_t thread_id) { + return HasExcessiveThread() && + thread_id == bgthreads_.size() - 1; + } + + // Is one of the threads to terminate. 
+ bool IsExcessiveThread(size_t thread_id) { + return static_cast<int>(thread_id) >= total_threads_limit_; + } + + void BGThread(size_t thread_id) { while (true) { // Wait until there is an item that is ready to run PthreadCall("lock", pthread_mutex_lock(&mu_)); - while (queue_.empty() && !exit_all_threads_) { + // Stop waiting if the thread needs to do work or needs to terminate. + while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && + (queue_.empty() || IsExcessiveThread(thread_id))) { PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); } if (exit_all_threads_) { // mechanism to let BG threads exit safely PthreadCall("unlock", pthread_mutex_unlock(&mu_)); break; } + if (IsLastExcessiveThread(thread_id)) { + // Current thread is the last generated one and is excessive. + // We always terminate excessive thread in the reverse order of + // generation time. + pthread_detach(bgthreads_.back()); + bgthreads_.pop_back(); + if (HasExcessiveThread()) { + // There is still at least more excessive thread to terminate. + WakeUpAllThreads(); + } + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + break; + } void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); @@ -1491,36 +1524,50 @@ class PosixEnv : public Env { } } + // Helper struct for passing arguments when creating threads. + struct BGThreadMetadata { + ThreadPool* thread_pool_; + size_t thread_id_; // Thread count in the thread. 
+ explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) + : thread_pool_(thread_pool), thread_id_(thread_id) {} + }; + static void* BGThreadWrapper(void* arg) { - reinterpret_cast<ThreadPool*>(arg)->BGThread(); + BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg); + size_t thread_id = meta->thread_id_; + ThreadPool* tp = meta->thread_pool_; + delete meta; + tp->BGThread(thread_id); return nullptr; } + void WakeUpAllThreads() { + PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); + } + void SetBackgroundThreads(int num) { PthreadCall("lock", pthread_mutex_lock(&mu_)); - if (num > total_threads_limit_) { + if (exit_all_threads_) { + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + return; + } + if (num != total_threads_limit_) { total_threads_limit_ = num; + WakeUpAllThreads(); + StartBGThreads(); } assert(total_threads_limit_ > 0); PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } - void Schedule(void (*function)(void*), void* arg) { - PthreadCall("lock", pthread_mutex_lock(&mu_)); - - if (exit_all_threads_) { - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - return; - } + void StartBGThreads() { // Start background thread if necessary while ((int)bgthreads_.size() < total_threads_limit_) { pthread_t t; PthreadCall( - "create thread", - pthread_create(&t, - nullptr, - &ThreadPool::BGThreadWrapper, - this)); + "create thread", + pthread_create(&t, nullptr, &ThreadPool::BGThreadWrapper, + new BGThreadMetadata(this, bgthreads_.size()))); // Set the thread name to aid debugging #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) @@ -1534,6 +1581,17 @@ class PosixEnv : public Env { bgthreads_.push_back(t); } + } + + void Schedule(void (*function)(void*), void* arg) { + PthreadCall("lock", pthread_mutex_lock(&mu_)); + + if (exit_all_threads_) { + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + return; + } + + StartBGThreads(); // Add to priority queue queue_.push_back(BGItem()); @@ -1541,8 +1599,14 @@ class PosixEnv : public Env { 
queue_.back().arg = arg; queue_len_.store(queue_.size(), std::memory_order_relaxed); - // always wake up at least one waiting thread. - PthreadCall("signal", pthread_cond_signal(&bgsignal_)); + if (!HasExcessiveThread()) { + // Wake up at least one waiting thread. + PthreadCall("signal", pthread_cond_signal(&bgsignal_)); + } else { + // Need to wake up all threads to make sure the one woken + // up is not the one to terminate. + WakeUpAllThreads(); + } PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } diff --git a/util/env_test.cc b/util/env_test.cc index 1ac3773b2..2abce6f3a 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -200,6 +200,197 @@ TEST(EnvPosixTest, TwoPools) { ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); } +TEST(EnvPosixTest, DecreaseNumBgThreads) { + class SleepingBackgroundTask { + public: + explicit SleepingBackgroundTask() + : bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {} + void DoSleep() { + MutexLock l(&mutex_); + sleeping_ = true; + while (should_sleep_) { + bg_cv_.Wait(); + } + sleeping_ = false; + bg_cv_.SignalAll(); + } + + void WakeUp() { + MutexLock l(&mutex_); + should_sleep_ = false; + bg_cv_.SignalAll(); + + while (sleeping_) { + bg_cv_.Wait(); + } + } + + bool IsSleeping() { + MutexLock l(&mutex_); + return sleeping_; + } + + static void DoSleepTask(void* arg) { + reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep(); + } + + private: + port::Mutex mutex_; + port::CondVar bg_cv_; // Signalled when background work finishes + bool should_sleep_; + bool sleeping_; + }; + + std::vector<SleepingBackgroundTask> tasks(10); + + // Set number of thread to 1 first. + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + + // Schedule 3 tasks. 0 running; Task 1, 2 waiting. 
+ for (size_t i = 0; i < 3; i++) { + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i], + Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + } + ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Increase to 2 threads. Task 0, 1 running; 2 waiting + env_->SetBackgroundThreads(2, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // The last task finishes. Task 0 running, 2 waiting. + tasks[1].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Increase to 5 threads. Task 0 and 2 running. + env_->SetBackgroundThreads(5, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[2].IsSleeping()); + + // Change number of threads a couple of times while there is no sufficient + // tasks. 
+ env_->SetBackgroundThreads(7, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + tasks[2].WakeUp(); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(3, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(5, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + Env::Default()->SleepForMicroseconds(kDelayMicros * 50); + + // Enqueue 5 more tasks. Thread pool size now is 4. + // Task 0, 3, 4, 5 running;6, 7 waiting. + for (size_t i = 3; i < 8; i++) { + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i], + Env::Priority::HIGH); + } + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[3].IsSleeping()); + ASSERT_TRUE(tasks[4].IsSleeping()); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(!tasks[6].IsSleeping()); + ASSERT_TRUE(!tasks[7].IsSleeping()); + + // Wake up task 0, 3 and 4. Task 5, 6, 7 running. + tasks[0].WakeUp(); + tasks[3].WakeUp(); + tasks[4].WakeUp(); + + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + for (size_t i = 5; i < 8; i++) { + ASSERT_TRUE(tasks[i].IsSleeping()); + } + + // Shrink back to 1 thread. 
Still task 5, 6, 7 running + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(tasks[6].IsSleeping()); + ASSERT_TRUE(tasks[7].IsSleeping()); + + // Wake up task 6. Task 5, 7 running + tasks[6].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(!tasks[6].IsSleeping()); + ASSERT_TRUE(tasks[7].IsSleeping()); + + // Wake up threads 7. Task 5 running + tasks[7].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[7].IsSleeping()); + + // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running. + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[8], + Env::Priority::HIGH); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9], + Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0); + ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping()); + + // Increase to 4 threads. Task 5, 8, 9 running. + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[8].IsSleeping()); + ASSERT_TRUE(tasks[9].IsSleeping()); + + // Shrink to 1 thread + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + + // Wake up thread 9. 
+ tasks[9].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[9].IsSleeping()); + ASSERT_TRUE(tasks[8].IsSleeping()); + + // Wake up thread 8 + tasks[8].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[8].IsSleeping()); + + // Wake up the last thread + tasks[5].WakeUp(); + + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[5].IsSleeping()); +} + #ifdef OS_LINUX // To make sure the Env::GetUniqueId() related tests work correctly, The files // should be stored in regular storage like "hard disk" or "flash device". From bd1105aa5a550224b1655ad38fcac60546df8a92 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 19 May 2014 14:25:11 -0700 Subject: [PATCH 2/4] Print out thread ID while thread terminates for decreased pool size. Summary: Per request from @nkg-, temporarily print thread ID when a thread terminates. It is a temp solution as we try to minimize stderr messages. Test Plan: env_test Reviewers: haobo, igor, dhruba Reviewed By: igor CC: nkg-, leveldb Differential Revision: https://reviews.facebook.net/D18753 --- util/env_posix.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index 52787517a..7ffba6f53 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1479,8 +1479,7 @@ class PosixEnv : public Env { // Always terminate the running thread that is added last, even if there are // more than one thread to terminate. bool IsLastExcessiveThread(size_t thread_id) { - return HasExcessiveThread() && - thread_id == bgthreads_.size() - 1; + return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; } // Is one of the threads to terminate. @@ -1505,13 +1504,18 @@ class PosixEnv : public Env { // Current thread is the last generated one and is excessive. // We always terminate excessive thread in the reverse order of // generation time. 
- pthread_detach(bgthreads_.back()); + auto terminating_thread = bgthreads_.back(); + pthread_detach(terminating_thread); bgthreads_.pop_back(); if (HasExcessiveThread()) { // There is still at least more excessive thread to terminate. WakeUpAllThreads(); } PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + // TODO(sdong): temp logging. Need to help debugging. Remove it when + // the feature is proved to be stable. + fprintf(stdout, "Bg thread %zu terminates %llx\n", thread_id, + static_cast<long long unsigned int>(terminating_thread)); break; } void (*function)(void*) = queue_.front().function; From b2cf95fe38a7d3d767494eb0254638f0f75b1bbe Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 20 May 2014 14:28:51 -0700 Subject: [PATCH 3/4] Call EnableFileDeletions with false as argument --- db/db_filesnapshot.cc | 9 ++++++++- include/rocksdb/db.h | 2 +- utilities/backupable/backupable_db.cc | 4 ++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 1e1ec9757..582355ccd 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -29,8 +29,11 @@ Status DBImpl::DisableFileDeletions() { MutexLock l(&mutex_); ++disable_delete_obsolete_files_; if (disable_delete_obsolete_files_ == 1) { - // if not, it has already been disabled, so don't log anything Log(options_.info_log, "File Deletions Disabled"); + } else { + Log(options_.info_log, + "File Deletions Disabled, but already disabled. Counter: %d", + disable_delete_obsolete_files_); } return Status::OK(); } @@ -50,6 +53,10 @@ Status DBImpl::EnableFileDeletions(bool force) { Log(options_.info_log, "File Deletions Enabled"); should_purge_files = true; FindObsoleteFiles(deletion_state, true); + } else { + Log(options_.info_log, + "File Deletions Enable, but not really enabled. 
Counter: %d", + disable_delete_obsolete_files_); } } if (should_purge_files) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index e743b4c88..33b443f40 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -396,7 +396,7 @@ class DB { // times have the same effect as calling it once. virtual Status DisableFileDeletions() = 0; - // Allow compactions to delete obselete files. + // Allow compactions to delete obsolete files. // If force == true, the call to EnableFileDeletions() will guarantee that // file deletions are enabled after the call, even if DisableFileDeletions() // was called multiple times before. diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 87901e0ef..3ac1d90a1 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -426,7 +426,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { s = db->GetSortedWalFiles(live_wal_files); } if (!s.ok()) { - db->EnableFileDeletions(); + db->EnableFileDeletions(false); return s; } @@ -495,7 +495,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { } // we copied all the files, enable file deletions - db->EnableFileDeletions(); + db->EnableFileDeletions(false); if (s.ok()) { // move tmp private backup to real backup folder From f725e4fe1f84abd4ecb0ddf8e4eeddad2e1378ba Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 20 May 2014 17:09:38 -0700 Subject: [PATCH 4/4] Make RateLimiting unit test less flakey --- utilities/backupable/backupable_db_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 6f0c6bc88..e6874fe5d 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -930,7 +930,6 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / 
backupable_options_->restore_rate_limit; ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time); - ASSERT_LT(restore_time, 2.5 * rate_limited_restore_time); AssertBackupConsistency(0, 0, 100000, 100010); }