From e9b2af87f8db3537762821f75d0d716246a7d743 Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Fri, 26 Aug 2016 10:41:35 -0700 Subject: [PATCH] Expose ThreadPool under include/rocksdb/threadpool.h Summary: This diff split ThreadPool to -ThreadPool (abstract interface exposed in include/rocksdb/threadpool.h) -ThreadPoolImpl (actual implementation in util/threadpool_imp.h) This allow us to expose ThreadPool to the user so we can use it as an option later Test Plan: existing unit tests Reviewers: andrewkr, yiwu, yhchiang, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D62085 --- CMakeLists.txt | 2 +- include/rocksdb/threadpool.h | 37 +++++++++++++++++ port/win/env_win.h | 9 ++--- src.mk | 2 +- util/env_posix.cc | 12 +++--- util/{threadpool.cc => threadpool_imp.cc} | 48 ++++++++++++----------- util/{threadpool.h => threadpool_imp.h} | 13 +++--- 7 files changed, 82 insertions(+), 41 deletions(-) create mode 100644 include/rocksdb/threadpool.h rename util/{threadpool.cc => threadpool_imp.cc} (88%) rename util/{threadpool.h => threadpool_imp.h} (92%) diff --git a/CMakeLists.txt b/CMakeLists.txt index b5a89e001..ea939fcee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -240,7 +240,7 @@ set(SOURCES util/testharness.cc util/testutil.cc util/thread_local.cc - util/threadpool.cc + util/threadpool_imp.cc util/thread_status_impl.cc util/thread_status_updater.cc util/thread_status_util.cc diff --git a/include/rocksdb/threadpool.h b/include/rocksdb/threadpool.h new file mode 100644 index 000000000..9ef0aad14 --- /dev/null +++ b/include/rocksdb/threadpool.h @@ -0,0 +1,37 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +namespace rocksdb { + +/* + * ThreadPool is a component that will spawn N background threads that will + * be used to execute scheduled work, The number of background threads could + * be modified by calling SetBackgroundThreads(). + * */ +class ThreadPool { + public: + virtual ~ThreadPool() {} + + // Wait for all threads to finish. + virtual void JoinAllThreads() = 0; + + // Set the number of background threads that will be executing the + // scheduled jobs. + virtual void SetBackgroundThreads(int num) = 0; + + // Get the number of jobs scheduled in the ThreadPool queue. + virtual unsigned int GetQueueLen() const = 0; +}; + +// NewThreadPool() is a function that could be used to create a ThreadPool +// with `num_threads` background threads. +extern ThreadPool* NewThreadPool(int num_threads); + +} // namespace rocksdb diff --git a/port/win/env_win.h b/port/win/env_win.h index 9b1e012c9..2c994f2a4 100644 --- a/port/win/env_win.h +++ b/port/win/env_win.h @@ -17,7 +17,7 @@ #pragma once #include -#include "util/threadpool.h" +#include "util/threadpool_imp.h" #include #include @@ -63,7 +63,7 @@ private: Env* hosted_env_; mutable std::mutex mu_; - std::vector thread_pools_; + std::vector thread_pools_; std::vector threads_to_join_; }; @@ -268,9 +268,8 @@ public: private: WinEnvIO winenv_io_; - WinEnvThreads winenv_threads_; - + WinEnvThreads winenv_threads_; }; } -} \ No newline at end of file +} diff --git a/src.mk b/src.mk index 4d7a4cf7d..76299958e 100644 --- a/src.mk +++ b/src.mk @@ -110,7 +110,7 @@ LIB_SOURCES = \ util/iostats_context.cc \ util/io_posix.cc \ util/lru_cache.cc \ - util/threadpool.cc \ + util/threadpool_imp.cc \ util/transaction_test_util.cc \ util/sharded_cache.cc \ util/sst_file_manager_impl.cc \ diff --git a/util/env_posix.cc b/util/env_posix.cc index 6e1141d7a..692c2481f 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -42,7 +42,6 @@ #include "rocksdb/slice.h" #include "util/coding.h" #include "util/io_posix.h" -#include "util/threadpool.h" #include "util/iostats_context_imp.h" #include "util/logging.h" #include "util/posix_logger.h" @@ -51,6 +50,7 @@ #include "util/sync_point.h" #include "util/thread_local.h" #include "util/thread_status_updater.h" +#include "util/threadpool_imp.h" #if !defined(TMPFS_MAGIC) #define TMPFS_MAGIC 0x01021994 @@ -739,7 +739,7 @@ class PosixEnv : public Env { size_t page_size_; - std::vector thread_pools_; + std::vector thread_pools_; pthread_mutex_t mu_; std::vector threads_to_join_; }; @@ -749,7 +749,7 @@ PosixEnv::PosixEnv() forceMmapOff(false), page_size_(getpagesize()), thread_pools_(Priority::TOTAL) { - ThreadPool::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); + ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].SetThreadPriority( static_cast(pool_id)); @@ -791,11 +791,11 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { StartThreadState* state = new StartThreadState; state->user_function = function; state->arg = arg; - ThreadPool::PthreadCall( + ThreadPoolImpl::PthreadCall( "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); - ThreadPool::PthreadCall("lock", pthread_mutex_lock(&mu_)); + ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); threads_to_join_.push_back(t); - ThreadPool::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } void PosixEnv::WaitForJoin() { diff --git a/util/threadpool.cc b/util/threadpool_imp.cc similarity index 88% rename from util/threadpool.cc rename to util/threadpool_imp.cc index 9b5752d4c..1c8153477 100644 --- a/util/threadpool.cc +++ b/util/threadpool_imp.cc @@ -7,9 +7,9 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "util/threadpool.h" -#include +#include "util/threadpool_imp.h" #include +#include #ifndef OS_WIN # include @@ -26,7 +26,7 @@ namespace rocksdb { -void ThreadPool::PthreadCall(const char* label, int result) { +void ThreadPoolImpl::PthreadCall(const char* label, int result) { if (result != 0) { fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); abort(); @@ -38,7 +38,7 @@ namespace { struct Lock { std::unique_lock ul_; - Lock(std::mutex& m) : ul_(m, std::defer_lock) {} + explicit Lock(const std::mutex& m) : ul_(m, std::defer_lock) {} }; using Condition = std::condition_variable; @@ -124,7 +124,7 @@ int ThreadDetach(pthread_t& thread) { #endif } -ThreadPool::ThreadPool() +ThreadPoolImpl::ThreadPoolImpl() : total_threads_limit_(1), bgthreads_(0), queue_(), @@ -138,10 +138,9 @@ ThreadPool::ThreadPool() #endif } -ThreadPool::~ThreadPool() { assert(bgthreads_.size() == 0U); } - -void ThreadPool::JoinAllThreads() { +ThreadPoolImpl::~ThreadPoolImpl() { assert(bgthreads_.size() == 0U); } +void ThreadPoolImpl::JoinAllThreads() { Lock lock(mu_); PthreadCall("lock", ThreadPoolMutexLock(lock)); assert(!exit_all_threads_); @@ -156,7 +155,7 @@ void ThreadPool::JoinAllThreads() { bgthreads_.clear(); } -void ThreadPool::LowerIOPriority() { +void ThreadPoolImpl::LowerIOPriority() { #ifdef OS_LINUX PthreadCall("lock", pthread_mutex_lock(&mu_)); low_io_priority_ = true; @@ -164,7 +163,7 @@ void ThreadPool::LowerIOPriority() { #endif } -void ThreadPool::BGThread(size_t thread_id) { +void ThreadPoolImpl::BGThread(size_t thread_id) { bool low_io_priority = false; while (true) { // Wait until there is an item that is ready to run @@ -233,16 +232,16 @@ void ThreadPool::BGThread(size_t thread_id) { // Helper struct for passing arguments when creating threads. struct BGThreadMetadata { - ThreadPool* thread_pool_; + ThreadPoolImpl* thread_pool_; size_t thread_id_; // Thread count in the thread. - BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) + BGThreadMetadata(ThreadPoolImpl* thread_pool, size_t thread_id) : thread_pool_(thread_pool), thread_id_(thread_id) {} }; static void* BGThreadWrapper(void* arg) { BGThreadMetadata* meta = reinterpret_cast(arg); size_t thread_id = meta->thread_id_; - ThreadPool* tp = meta->thread_pool_; + ThreadPoolImpl* tp = meta->thread_pool_; #if ROCKSDB_USING_THREAD_STATUS // for thread-status ThreadStatusUtil::RegisterThread( @@ -258,11 +257,11 @@ static void* BGThreadWrapper(void* arg) { return nullptr; } -void ThreadPool::WakeUpAllThreads() { +void ThreadPoolImpl::WakeUpAllThreads() { PthreadCall("signalall", ConditionSignalAll(bgsignal_)); } -void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) { +void ThreadPoolImpl::SetBackgroundThreadsInternal(int num, bool allow_reduce) { Lock lock(mu_); PthreadCall("lock", ThreadPoolMutexLock(lock)); if (exit_all_threads_) { @@ -278,15 +277,15 @@ void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) { PthreadCall("unlock", MutexUnlock(lock)); } -void ThreadPool::IncBackgroundThreadsIfNeeded(int num) { +void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { SetBackgroundThreadsInternal(num, false); } -void ThreadPool::SetBackgroundThreads(int num) { +void ThreadPoolImpl::SetBackgroundThreads(int num) { SetBackgroundThreadsInternal(num, true); } -void ThreadPool::StartBGThreads() { +void ThreadPoolImpl::StartBGThreads() { // Start background thread if necessary while ((int)bgthreads_.size() < total_threads_limit_) { #ifdef ROCKSDB_STD_THREADPOOL @@ -313,9 +312,8 @@ void ThreadPool::StartBGThreads() { } } -void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag, - void (*unschedFunction)(void* arg)) { - +void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg, + void* tag, void (*unschedFunction)(void* arg)) { Lock lock(mu_); PthreadCall("lock", ThreadPoolMutexLock(lock)); @@ -347,7 +345,7 @@ void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag, PthreadCall("unlock", MutexUnlock(lock)); } -int ThreadPool::UnSchedule(void* arg) { +int ThreadPoolImpl::UnSchedule(void* arg) { int count = 0; Lock lock(mu_); @@ -374,4 +372,10 @@ int ThreadPool::UnSchedule(void* arg) { return count; } +ThreadPool* NewThreadPool(int num_threads) { + ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); + thread_pool->SetBackgroundThreads(num_threads); + return thread_pool; +} + } // namespace rocksdb diff --git a/util/threadpool.h b/util/threadpool_imp.h similarity index 92% rename from util/threadpool.h rename to util/threadpool_imp.h index bc6b4c69e..ce1589a51 100644 --- a/util/threadpool.h +++ b/util/threadpool_imp.h @@ -13,6 +13,7 @@ #endif #include "rocksdb/env.h" +#include "rocksdb/threadpool.h" #include "util/thread_status_util.h" #ifdef ROCKSDB_STD_THREADPOOL @@ -26,23 +27,23 @@ namespace rocksdb { -class ThreadPool { +class ThreadPoolImpl : public ThreadPool { public: - ThreadPool(); - ~ThreadPool(); + ThreadPoolImpl(); + ~ThreadPoolImpl(); - void JoinAllThreads(); + void JoinAllThreads() override; void LowerIOPriority(); void BGThread(size_t thread_id); void WakeUpAllThreads(); void IncBackgroundThreadsIfNeeded(int num); - void SetBackgroundThreads(int num); + void SetBackgroundThreads(int num) override; void StartBGThreads(); void Schedule(void (*function)(void* arg1), void* arg, void* tag, void (*unschedFunction)(void* arg)); int UnSchedule(void* arg); - unsigned int GetQueueLen() const { + unsigned int GetQueueLen() const override { return queue_len_.load(std::memory_order_relaxed); }