Move GetThreadList() feature under Env.

Summary:
GetThreadList() feature depends on the thread creation and destruction, which is currently handled under Env.
This patch moves GetThreadList() feature under Env to better manage the dependency of GetThreadList() feature
on thread creation and destruction.

Renamed ThreadStatusImpl to ThreadStatusUpdater.  Add ThreadStatusUtil, which is a static class contains
utility functions for ThreadStatusUpdater.

Test Plan: run db_test, thread_list_test and db_bench and verify the life cycle of Env and ThreadStatusUpdater is properly managed.

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: ljin, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D30057
This commit is contained in:
Yueh-Hsuan Chiang 2014-12-22 12:20:17 -08:00
parent 4fd26f287c
commit 45bab305f9
13 changed files with 338 additions and 98 deletions

View File

@ -78,7 +78,8 @@
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/thread_status_impl.h" #include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
namespace rocksdb { namespace rocksdb {
@ -3844,30 +3845,27 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
} }
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
void DBImpl::NewThreadStatusCfInfo( void DBImpl::NewThreadStatusCfInfo(
ColumnFamilyData* cfd) const { ColumnFamilyData* cfd) const {
if (db_options_.enable_thread_tracking) { if (db_options_.enable_thread_tracking) {
ThreadStatusImpl::NewColumnFamilyInfo( ThreadStatusUtil::NewColumnFamilyInfo(this, cfd);
this, GetName(), cfd, cfd->GetName());
} }
} }
void DBImpl::EraseThreadStatusCfInfo( void DBImpl::EraseThreadStatusCfInfo(
ColumnFamilyData* cfd) const { ColumnFamilyData* cfd) const {
if (db_options_.enable_thread_tracking) { if (db_options_.enable_thread_tracking) {
ThreadStatusImpl::EraseColumnFamilyInfo(cfd); ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
} }
} }
void DBImpl::EraseThreadStatusDbInfo() const { void DBImpl::EraseThreadStatusDbInfo() const {
if (db_options_.enable_thread_tracking) { if (db_options_.enable_thread_tracking) {
ThreadStatusImpl::EraseDatabaseInfo(this); ThreadStatusUtil::EraseDatabaseInfo(this);
} }
} }
Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
return thread_local_status.GetThreadList(thread_list);
}
#else #else
void DBImpl::NewThreadStatusCfInfo( void DBImpl::NewThreadStatusCfInfo(
ColumnFamilyData* cfd) const { ColumnFamilyData* cfd) const {

View File

@ -10,7 +10,6 @@
#include "db/merge_context.h" #include "db/merge_context.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "util/perf_context_imp.h" #include "util/perf_context_imp.h"
#include "util/thread_status_impl.h"
namespace rocksdb { namespace rocksdb {

View File

@ -51,7 +51,7 @@
#include "util/testutil.h" #include "util/testutil.h"
#include "util/mock_env.h" #include "util/mock_env.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/thread_status_impl.h" #include "util/thread_status_updater.h"
namespace rocksdb { namespace rocksdb {
@ -9418,7 +9418,7 @@ TEST(DBTest, GetThreadList) {
TryReopen(options); TryReopen(options);
std::vector<ThreadStatus> thread_list; std::vector<ThreadStatus> thread_list;
Status s = GetThreadList(&thread_list); Status s = env_->GetThreadList(&thread_list);
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
// repeat the test with differet number of high / low priority threads // repeat the test with differet number of high / low priority threads
@ -9431,7 +9431,7 @@ TEST(DBTest, GetThreadList) {
env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW); env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
// Wait to ensure the all threads has been registered // Wait to ensure the all threads has been registered
env_->SleepForMicroseconds(100000); env_->SleepForMicroseconds(100000);
s = GetThreadList(&thread_list); s = env_->GetThreadList(&thread_list);
ASSERT_OK(s); ASSERT_OK(s);
unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL]; unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL];
memset(thread_type_counts, 0, sizeof(thread_type_counts)); memset(thread_type_counts, 0, sizeof(thread_type_counts));
@ -9455,15 +9455,18 @@ TEST(DBTest, GetThreadList) {
if (i == 0) { if (i == 0) {
// repeat the test with multiple column families // repeat the test with multiple column families
CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options); CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(
handles_, true);
} }
} }
db_->DropColumnFamily(handles_[2]); db_->DropColumnFamily(handles_[2]);
delete handles_[2]; delete handles_[2];
handles_.erase(handles_.begin() + 2); handles_.erase(handles_.begin() + 2);
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(
handles_, true);
Close(); Close();
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(
handles_, true);
} }
TEST(DBTest, DisableThreadList) { TEST(DBTest, DisableThreadList) {
@ -9473,7 +9476,8 @@ TEST(DBTest, DisableThreadList) {
TryReopen(options); TryReopen(options);
CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options); CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
// Verify non of the column family info exists // Verify non of the column family info exists
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, false); env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(
handles_, false);
} }
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS

View File

@ -548,12 +548,6 @@ Status DestroyDB(const std::string& name, const Options& options);
Status RepairDB(const std::string& dbname, const Options& options); Status RepairDB(const std::string& dbname, const Options& options);
#endif #endif
#if ROCKSDB_USING_THREAD_STATUS
// Obtain the status of all rocksdb-related threads.
Status GetThreadList(std::vector<ThreadStatus>* thread_list);
#endif
} // namespace rocksdb } // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_DB_H_ #endif // STORAGE_ROCKSDB_INCLUDE_DB_H_

View File

@ -24,6 +24,7 @@
#include <vector> #include <vector>
#include <stdint.h> #include <stdint.h>
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/thread_status.h"
namespace rocksdb { namespace rocksdb {
@ -37,6 +38,7 @@ class RandomRWFile;
class Directory; class Directory;
struct DBOptions; struct DBOptions;
class RateLimiter; class RateLimiter;
class ThreadStatusUpdater;
using std::unique_ptr; using std::unique_ptr;
using std::shared_ptr; using std::shared_ptr;
@ -83,7 +85,8 @@ struct EnvOptions {
class Env { class Env {
public: public:
Env() { } Env() : thread_status_updater_(nullptr) {}
virtual ~Env(); virtual ~Env();
// Return a default environment suitable for the current operating // Return a default environment suitable for the current operating
@ -302,12 +305,34 @@ class Env {
virtual EnvOptions OptimizeForManifestWrite(const EnvOptions& env_options) virtual EnvOptions OptimizeForManifestWrite(const EnvOptions& env_options)
const; const;
// Returns the status of all threads that belong to the current Env.
virtual Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
return Status::NotSupported("Not supported.");
}
// Returns the pointer to ThreadStatusUpdater. This function will be
// used in RocksDB internally to update thread status and supports
// GetThreadList().
virtual ThreadStatusUpdater* GetThreadStatusUpdater() const {
return thread_status_updater_;
}
protected:
// The pointer to an internal structure that will update the
// status of each thread.
ThreadStatusUpdater* thread_status_updater_;
private: private:
// No copying allowed // No copying allowed
Env(const Env&); Env(const Env&);
void operator=(const Env&); void operator=(const Env&);
}; };
// The factory function to construct a ThreadStatusUpdater. Any Env
// that supports GetThreadList() feature should call this function in its
// constructor to initialize thread_status_updater_.
ThreadStatusUpdater* CreateThreadStatusUpdater();
// A file abstraction for reading sequentially through a file // A file abstraction for reading sequentially through a file
class SequentialFile { class SequentialFile {
public: public:
@ -805,10 +830,19 @@ class EnvWrapper : public Env {
void LowerThreadPoolIOPriority(Priority pool = LOW) override { void LowerThreadPoolIOPriority(Priority pool = LOW) override {
target_->LowerThreadPoolIOPriority(pool); target_->LowerThreadPoolIOPriority(pool);
} }
std::string TimeToString(uint64_t time) { std::string TimeToString(uint64_t time) {
return target_->TimeToString(time); return target_->TimeToString(time);
} }
Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
return target_->GetThreadList(thread_list);
}
ThreadStatusUpdater* GetThreadStatusUpdater() const override {
return target_->GetThreadStatusUpdater();
}
private: private:
Env* target_; Env* target_;
}; };

View File

@ -42,7 +42,8 @@
#include "util/random.h" #include "util/random.h"
#include "util/iostats_context_imp.h" #include "util/iostats_context_imp.h"
#include "util/rate_limiter.h" #include "util/rate_limiter.h"
#include "util/thread_status_impl.h" #include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
// Get nano time for mach systems // Get nano time for mach systems
#ifdef __MACH__ #ifdef __MACH__
@ -76,10 +77,6 @@ int rocksdb_kill_odds = 0;
namespace rocksdb { namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS
extern ThreadStatusImpl thread_local_status;
#endif
namespace { namespace {
// A wrapper for fadvise, if the platform doesn't support fadvise, // A wrapper for fadvise, if the platform doesn't support fadvise,
@ -92,6 +89,10 @@ int Fadvise(int fd, off_t offset, size_t len, int advice) {
#endif #endif
} }
ThreadStatusUpdater* CreateThreadStatusUpdater() {
return new ThreadStatusUpdater();
}
// list of pathnames that are locked // list of pathnames that are locked
static std::set<std::string> lockedFiles; static std::set<std::string> lockedFiles;
static port::Mutex mutex_lockedFiles; static port::Mutex mutex_lockedFiles;
@ -1076,10 +1077,16 @@ class PosixEnv : public Env {
public: public:
PosixEnv(); PosixEnv();
virtual ~PosixEnv(){ virtual ~PosixEnv() {
for (const auto tid : threads_to_join_) { for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr); pthread_join(tid, nullptr);
} }
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].JoinAllThreads();
}
// All threads must be joined before the deletion of
// thread_status_updater_.
delete thread_status_updater_;
} }
void SetFD_CLOEXEC(int fd, const EnvOptions* options) { void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
@ -1356,6 +1363,12 @@ class PosixEnv : public Env {
return Status::OK(); return Status::OK();
} }
virtual Status GetThreadList(
std::vector<ThreadStatus>* thread_list) override {
assert(thread_status_updater_);
return thread_status_updater_->GetThreadList(thread_list);
}
static uint64_t gettid(pthread_t tid) { static uint64_t gettid(pthread_t tid) {
uint64_t thread_id = 0; uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
@ -1534,12 +1547,17 @@ class PosixEnv : public Env {
queue_(), queue_(),
queue_len_(0), queue_len_(0),
exit_all_threads_(false), exit_all_threads_(false),
low_io_priority_(false) { low_io_priority_(false),
env_(nullptr) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
} }
~ThreadPool() { ~ThreadPool() {
assert(bgthreads_.size() == 0U);
}
void JoinAllThreads() {
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
assert(!exit_all_threads_); assert(!exit_all_threads_);
exit_all_threads_ = true; exit_all_threads_ = true;
@ -1548,6 +1566,11 @@ class PosixEnv : public Env {
for (const auto tid : bgthreads_) { for (const auto tid : bgthreads_) {
pthread_join(tid, nullptr); pthread_join(tid, nullptr);
} }
bgthreads_.clear();
}
void SetHostEnv(Env* env) {
env_ = env;
} }
void LowerIOPriority() { void LowerIOPriority() {
@ -1669,7 +1692,7 @@ class PosixEnv : public Env {
ThreadPool* tp = meta->thread_pool_; ThreadPool* tp = meta->thread_pool_;
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
// for thread-status // for thread-status
thread_local_status.SetThreadType( ThreadStatusUtil::SetThreadType(tp->env_,
(tp->GetThreadPriority() == Env::Priority::HIGH ? (tp->GetThreadPriority() == Env::Priority::HIGH ?
ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY : ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY :
ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY)); ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY));
@ -1677,7 +1700,7 @@ class PosixEnv : public Env {
delete meta; delete meta;
tp->BGThread(thread_id); tp->BGThread(thread_id);
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
thread_local_status.UnregisterThread(); ThreadStatusUtil::UnregisterThread();
#endif #endif
return nullptr; return nullptr;
} }
@ -1779,6 +1802,7 @@ class PosixEnv : public Env {
bool exit_all_threads_; bool exit_all_threads_;
bool low_io_priority_; bool low_io_priority_;
Env::Priority priority_; Env::Priority priority_;
Env* env_;
}; };
std::vector<ThreadPool> thread_pools_; std::vector<ThreadPool> thread_pools_;
@ -1796,7 +1820,10 @@ PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority( thread_pools_[pool_id].SetThreadPriority(
static_cast<Env::Priority>(pool_id)); static_cast<Env::Priority>(pool_id));
// This allows later initializing the thread-local-env of each thread.
thread_pools_[pool_id].SetHostEnv(this);
} }
thread_status_updater_ = CreateThreadStatusUpdater();
} }
void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) { void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) {

View File

@ -6,7 +6,7 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include "util/thread_status_impl.h" #include "util/thread_status_updater.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -21,16 +21,16 @@ class SleepingBackgroundTask {
: db_key_(db_key), db_name_(db_name), : db_key_(db_key), db_name_(db_name),
cf_key_(cf_key), cf_name_(cf_name), cf_key_(cf_key), cf_name_(cf_name),
should_sleep_(true), sleeping_count_(0) { should_sleep_(true), sleeping_count_(0) {
ThreadStatusImpl::NewColumnFamilyInfo( Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo(
db_key_, db_name_, cf_key_, cf_name_); db_key_, db_name_, cf_key_, cf_name_);
} }
~SleepingBackgroundTask() { ~SleepingBackgroundTask() {
ThreadStatusImpl::EraseDatabaseInfo(db_key_); Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_);
} }
void DoSleep() { void DoSleep() {
thread_local_status.SetColumnFamilyInfoKey(cf_key_); Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
sleeping_count_++; sleeping_count_++;
while (should_sleep_) { while (should_sleep_) {
@ -38,7 +38,7 @@ class SleepingBackgroundTask {
} }
sleeping_count_--; sleeping_count_--;
bg_cv_.notify_all(); bg_cv_.notify_all();
thread_local_status.SetColumnFamilyInfoKey(0); Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(0);
} }
void WakeUp() { void WakeUp() {
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
@ -101,7 +101,7 @@ TEST(ThreadListTest, SimpleColumnFamilyInfoTest) {
std::vector<ThreadStatus> thread_list; std::vector<ThreadStatus> thread_list;
// Verify the number of sleeping threads in each pool. // Verify the number of sleeping threads in each pool.
GetThreadList(&thread_list); env->GetThreadList(&thread_list);
int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0}; int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0};
for (auto thread_status : thread_list) { for (auto thread_status : thread_list) {
if (thread_status.cf_name == "pikachu" && if (thread_status.cf_name == "pikachu" &&
@ -122,7 +122,7 @@ TEST(ThreadListTest, SimpleColumnFamilyInfoTest) {
sleeping_task.WaitUntilDone(); sleeping_task.WaitUntilDone();
// Verify none of the threads are sleeping // Verify none of the threads are sleeping
GetThreadList(&thread_list); env->GetThreadList(&thread_list);
for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) { for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) {
sleeping_count[i] = 0; sleeping_count[i] = 0;
} }

View File

@ -5,26 +5,15 @@
#include "port/likely.h" #include "port/likely.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/thread_status_impl.h" #include "util/thread_status_updater.h"
namespace rocksdb { namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
__thread ThreadStatusData* ThreadStatusImpl::thread_status_data_ = nullptr;
std::mutex ThreadStatusImpl::thread_list_mutex_;
std::unordered_set<ThreadStatusData*> ThreadStatusImpl::thread_data_set_;
std::unordered_map<const void*, std::unique_ptr<ConstantColumnFamilyInfo>>
ThreadStatusImpl::cf_info_map_;
std::unordered_map<const void*, std::unordered_set<const void*>>
ThreadStatusImpl::db_key_map_;
ThreadStatusImpl thread_local_status; __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
ThreadStatusImpl::~ThreadStatusImpl() { void ThreadStatusUpdater::UnregisterThread() {
assert(thread_data_set_.size() == 0);
}
void ThreadStatusImpl::UnregisterThread() {
if (thread_status_data_ != nullptr) { if (thread_status_data_ != nullptr) {
std::lock_guard<std::mutex> lck(thread_list_mutex_); std::lock_guard<std::mutex> lck(thread_list_mutex_);
thread_data_set_.erase(thread_status_data_); thread_data_set_.erase(thread_status_data_);
@ -33,26 +22,26 @@ void ThreadStatusImpl::UnregisterThread() {
} }
} }
void ThreadStatusImpl::SetThreadType( void ThreadStatusUpdater::SetThreadType(
ThreadStatus::ThreadType ttype) { ThreadStatus::ThreadType ttype) {
auto* data = InitAndGet(); auto* data = InitAndGet();
data->thread_type.store(ttype, std::memory_order_relaxed); data->thread_type.store(ttype, std::memory_order_relaxed);
} }
void ThreadStatusImpl::SetColumnFamilyInfoKey( void ThreadStatusUpdater::SetColumnFamilyInfoKey(
const void* cf_key) { const void* cf_key) {
auto* data = InitAndGet(); auto* data = InitAndGet();
data->cf_key.store(cf_key, std::memory_order_relaxed); data->cf_key.store(cf_key, std::memory_order_relaxed);
} }
void ThreadStatusImpl::SetEventInfoPtr( void ThreadStatusUpdater::SetEventInfoPtr(
const ThreadEventInfo* event_info) { const ThreadEventInfo* event_info) {
auto* data = InitAndGet(); auto* data = InitAndGet();
data->event_info.store(event_info, std::memory_order_relaxed); data->event_info.store(event_info, std::memory_order_relaxed);
} }
Status ThreadStatusImpl::GetThreadList( Status ThreadStatusUpdater::GetThreadList(
std::vector<ThreadStatus>* thread_list) const { std::vector<ThreadStatus>* thread_list) {
thread_list->clear(); thread_list->clear();
std::vector<std::shared_ptr<ThreadStatusData>> valid_list; std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
@ -90,7 +79,7 @@ Status ThreadStatusImpl::GetThreadList(
return Status::OK(); return Status::OK();
} }
ThreadStatusData* ThreadStatusImpl::InitAndGet() { ThreadStatusData* ThreadStatusUpdater::InitAndGet() {
if (UNLIKELY(thread_status_data_ == nullptr)) { if (UNLIKELY(thread_status_data_ == nullptr)) {
thread_status_data_ = new ThreadStatusData(); thread_status_data_ = new ThreadStatusData();
thread_status_data_->thread_id = reinterpret_cast<uint64_t>( thread_status_data_->thread_id = reinterpret_cast<uint64_t>(
@ -101,7 +90,7 @@ ThreadStatusData* ThreadStatusImpl::InitAndGet() {
return thread_status_data_; return thread_status_data_;
} }
void ThreadStatusImpl::NewColumnFamilyInfo( void ThreadStatusUpdater::NewColumnFamilyInfo(
const void* db_key, const std::string& db_name, const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name) { const void* cf_key, const std::string& cf_name) {
std::lock_guard<std::mutex> lck(thread_list_mutex_); std::lock_guard<std::mutex> lck(thread_list_mutex_);
@ -111,7 +100,7 @@ void ThreadStatusImpl::NewColumnFamilyInfo(
db_key_map_[db_key].insert(cf_key); db_key_map_[db_key].insert(cf_key);
} }
void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
std::lock_guard<std::mutex> lck(thread_list_mutex_); std::lock_guard<std::mutex> lck(thread_list_mutex_);
auto cf_pair = cf_info_map_.find(cf_key); auto cf_pair = cf_info_map_.find(cf_key);
assert(cf_pair != cf_info_map_.end()); assert(cf_pair != cf_info_map_.end());
@ -132,7 +121,7 @@ void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) {
assert(result); assert(result);
} }
void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
std::lock_guard<std::mutex> lck(thread_list_mutex_); std::lock_guard<std::mutex> lck(thread_list_mutex_);
auto db_pair = db_key_map_.find(db_key); auto db_pair = db_key_map_.find(db_key);
if (UNLIKELY(db_pair == db_key_map_.end())) { if (UNLIKELY(db_pair == db_key_map_.end())) {
@ -154,41 +143,37 @@ void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) {
#else #else
ThreadStatusImpl::~ThreadStatusImpl() { void ThreadStatusUpdater::UnregisterThread() {
} }
void ThreadStatusImpl::UnregisterThread() { void ThreadStatusUpdater::SetThreadType(
}
void ThreadStatusImpl::SetThreadType(
ThreadStatus::ThreadType ttype) { ThreadStatus::ThreadType ttype) {
} }
void ThreadStatusImpl::SetColumnFamilyInfoKey( void ThreadStatusUpdater::SetColumnFamilyInfoKey(
const void* cf_key) { const void* cf_key) {
} }
void ThreadStatusImpl::SetEventInfoPtr( void ThreadStatusUpdater::SetEventInfoPtr(
const ThreadEventInfo* event_info) { const ThreadEventInfo* event_info) {
} }
Status ThreadStatusImpl::GetThreadList( Status ThreadStatusUpdater::GetThreadList(
std::vector<ThreadStatus>* thread_list) const { std::vector<ThreadStatus>* thread_list) {
return Status::NotSupported( return Status::NotSupported(
"GetThreadList is not supported in the current running environment."); "GetThreadList is not supported in the current running environment.");
} }
void ThreadStatusImpl::NewColumnFamilyInfo( void ThreadStatusUpdater::NewColumnFamilyInfo(
const void* db_key, const std::string& db_name, const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name) { const void* cf_key, const std::string& cf_name) {
} }
void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
} }
void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
} }
ThreadStatusImpl thread_local_status;
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
} // namespace rocksdb } // namespace rocksdb

View File

@ -3,8 +3,7 @@
// LICENSE file in the root directory of this source tree. An additional grant // LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
// //
// The implementation of ThreadStatus. It is implemented via combination // The implementation of ThreadStatus.
// of macros and thread-local variables.
// //
// Note that we make get and set access to ThreadStatusData lockless. // Note that we make get and set access to ThreadStatusData lockless.
// As a result, ThreadStatusData as a whole is not atomic. However, // As a result, ThreadStatusData as a whole is not atomic. However,
@ -43,10 +42,7 @@ namespace rocksdb {
class ColumnFamilyHandle; class ColumnFamilyHandle;
// The mutable version of ThreadStatus. It has a static set maintaining // The structure that keeps constant information about a column family.
// the set of current registered threades.
//
// Note that it is suggested to call the above macros.
struct ConstantColumnFamilyInfo { struct ConstantColumnFamilyInfo {
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
public: public:
@ -61,6 +57,7 @@ struct ConstantColumnFamilyInfo {
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
}; };
// The structure that describes an event.
struct ThreadEventInfo { struct ThreadEventInfo {
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
public: public:
@ -84,13 +81,22 @@ struct ThreadStatusData {
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
}; };
class ThreadStatusImpl { // The class that stores and updates the status of the current thread
// using a thread-local ThreadStatusData.
//
// In most of the case, you should use ThreadStatusUtil to update
// the status of the current thread instead of using ThreadSatusUpdater
// directly.
//
// @see ThreadStatusUtil
class ThreadStatusUpdater {
public: public:
ThreadStatusImpl() {} ThreadStatusUpdater() {}
// Releases all ThreadStatusData of all active threads. // Releases all ThreadStatusData of all active threads.
~ThreadStatusImpl(); virtual ~ThreadStatusUpdater() {}
// Unregister the current thread.
void UnregisterThread(); void UnregisterThread();
// Set the thread type of the current thread. // Set the thread type of the current thread.
@ -104,29 +110,30 @@ class ThreadStatusImpl {
// its thread-local pointer of ThreadEventInfo to the correct entry. // its thread-local pointer of ThreadEventInfo to the correct entry.
void SetEventInfoPtr(const ThreadEventInfo* event_info); void SetEventInfoPtr(const ThreadEventInfo* event_info);
// Obtain the status of all active registered threads.
Status GetThreadList( Status GetThreadList(
std::vector<ThreadStatus>* thread_list) const; std::vector<ThreadStatus>* thread_list);
// Create an entry in the global ColumnFamilyInfo table for the // Create an entry in the global ColumnFamilyInfo table for the
// specified column family. This function should be called only // specified column family. This function should be called only
// when the current thread does not hold db_mutex. // when the current thread does not hold db_mutex.
static void NewColumnFamilyInfo( void NewColumnFamilyInfo(
const void* db_key, const std::string& db_name, const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name); const void* cf_key, const std::string& cf_name);
// Erase all ConstantColumnFamilyInfo that is associated with the // Erase all ConstantColumnFamilyInfo that is associated with the
// specified db instance. This function should be called only when // specified db instance. This function should be called only when
// the current thread does not hold db_mutex. // the current thread does not hold db_mutex.
static void EraseDatabaseInfo(const void* db_key); void EraseDatabaseInfo(const void* db_key);
// Erase the ConstantColumnFamilyInfo that is associated with the // Erase the ConstantColumnFamilyInfo that is associated with the
// specified ColumnFamilyData. This function should be called only // specified ColumnFamilyData. This function should be called only
// when the current thread does not hold db_mutex. // when the current thread does not hold db_mutex.
static void EraseColumnFamilyInfo(const void* cf_key); void EraseColumnFamilyInfo(const void* cf_key);
// Verifies whether the input ColumnFamilyHandles matches // Verifies whether the input ColumnFamilyHandles matches
// the information stored in the current cf_info_map. // the information stored in the current cf_info_map.
static void TEST_VerifyColumnFamilyInfoMap( void TEST_VerifyColumnFamilyInfoMap(
const std::vector<ColumnFamilyHandle*>& handles, const std::vector<ColumnFamilyHandle*>& handles,
bool check_exist); bool check_exist);
@ -141,27 +148,25 @@ class ThreadStatusImpl {
ThreadStatusData* InitAndGet(); ThreadStatusData* InitAndGet();
// The mutex that protects cf_info_map and db_key_map. // The mutex that protects cf_info_map and db_key_map.
static std::mutex thread_list_mutex_; std::mutex thread_list_mutex_;
// The current status data of all active threads. // The current status data of all active threads.
static std::unordered_set<ThreadStatusData*> thread_data_set_; std::unordered_set<ThreadStatusData*> thread_data_set_;
// A global map that keeps the column family information. It is stored // A global map that keeps the column family information. It is stored
// globally instead of inside DB is to avoid the situation where DB is // globally instead of inside DB is to avoid the situation where DB is
// closing while GetThreadList function already get the pointer to its // closing while GetThreadList function already get the pointer to its
// CopnstantColumnFamilyInfo. // CopnstantColumnFamilyInfo.
static std::unordered_map< std::unordered_map<
const void*, std::unique_ptr<ConstantColumnFamilyInfo>> cf_info_map_; const void*, std::unique_ptr<ConstantColumnFamilyInfo>> cf_info_map_;
// A db_key to cf_key map that allows erasing elements in cf_info_map // A db_key to cf_key map that allows erasing elements in cf_info_map
// associated to the same db_key faster. // associated to the same db_key faster.
static std::unordered_map< std::unordered_map<
const void*, std::unordered_set<const void*>> db_key_map_; const void*, std::unordered_set<const void*>> db_key_map_;
#else #else
static ThreadStatusData* thread_status_data_; static ThreadStatusData* thread_status_data_;
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
}; };
extern ThreadStatusImpl thread_local_status;
} // namespace rocksdb } // namespace rocksdb

View File

@ -5,12 +5,12 @@
#include <mutex> #include <mutex>
#include "util/thread_status_impl.h" #include "util/thread_status_updater.h"
#include "db/column_family.h" #include "db/column_family.h"
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
namespace rocksdb { namespace rocksdb {
void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap( void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap(
const std::vector<ColumnFamilyHandle*>& handles, const std::vector<ColumnFamilyHandle*>& handles,
bool check_exist) { bool check_exist) {
std::unique_lock<std::mutex> lock(thread_list_mutex_); std::unique_lock<std::mutex> lock(thread_list_mutex_);

102
util/thread_status_util.cc Normal file
View File

@ -0,0 +1,102 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/env.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS
__thread ThreadStatusUpdater*
ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
__thread bool ThreadStatusUtil::thread_updater_initialized_ = false;
void ThreadStatusUtil::SetThreadType(
const Env* env, ThreadStatus::ThreadType thread_type) {
if (!MaybeInitThreadLocalUpdater(env)) {
return;
}
assert(thread_updater_local_cache_);
thread_updater_local_cache_->SetThreadType(thread_type);
}
void ThreadStatusUtil::UnregisterThread() {
thread_updater_initialized_ = false;
if (thread_updater_local_cache_ != nullptr) {
thread_updater_local_cache_->UnregisterThread();
thread_updater_local_cache_ = nullptr;
}
}
void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) {
return;
}
assert(thread_updater_local_cache_);
thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd);
}
void ThreadStatusUtil::NewColumnFamilyInfo(
const DB* db, const ColumnFamilyData* cfd) {
if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) {
return;
}
assert(thread_updater_local_cache_);
if (thread_updater_local_cache_) {
thread_updater_local_cache_->NewColumnFamilyInfo(
db, db->GetName(), cfd, cfd->GetName());
}
}
void ThreadStatusUtil::EraseColumnFamilyInfo(
const ColumnFamilyData* cfd) {
if (thread_updater_local_cache_ == nullptr) {
return;
}
thread_updater_local_cache_->EraseColumnFamilyInfo(cfd);
}
void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) {
if (thread_updater_local_cache_ == nullptr) {
return;
}
thread_updater_local_cache_->EraseDatabaseInfo(db);
}
bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
if (!thread_updater_initialized_ && env != nullptr) {
thread_updater_initialized_ = true;
thread_updater_local_cache_ = env->GetThreadStatusUpdater();
}
return (thread_updater_local_cache_ != nullptr);
}
#else
ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
bool ThreadStatusUtil::thread_updater_initialized_ = false;
bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
return false;
}
void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
}
void ThreadStatusUtil::NewColumnFamilyInfo(
const DB* db, const ColumnFamilyData* cfd) {
}
void ThreadStatusUtil::EraseColumnFamilyInfo(
const ColumnFamilyData* cfd) {
}
void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) {
}
#endif // ROCKSDB_USING_THREAD_STATUS
} // namespace rocksdb

93
util/thread_status_util.h Normal file
View File

@ -0,0 +1,93 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include "db/column_family.h"
#include "rocksdb/env.h"
#include "rocksdb/thread_status.h"
#include "util/thread_status_updater.h"
namespace rocksdb {
// The static utility class for updating thread-local status.
//
// The thread-local status is updated via the thread-local cached
// pointer thread_updater_local_cache_. During each function call,
// when ThreadStatusUtil finds thread_updater_local_cache_ is
// left uninitialized (determined by thread_updater_initialized_),
// it will tries to initialize it using the return value of
// Env::GetThreadStatusUpdater(). When thread_updater_local_cache_
// is initialized by a non-null pointer, each function call will
// then update the status of the current thread. Otherwise,
// all function calls to ThreadStatusUtil will be no-op.
class ThreadStatusUtil {
public:
// Set the thread type of the current thread.
static void SetThreadType(
const Env* env, ThreadStatus::ThreadType thread_type);
// Unregister the current thread.
static void UnregisterThread();
// Create an entry in the global ColumnFamilyInfo table for the
// specified column family. This function should be called only
// when the current thread does not hold db_mutex.
static void NewColumnFamilyInfo(
const DB* db, const ColumnFamilyData* cfd);
// Erase the ConstantColumnFamilyInfo that is associated with the
// specified ColumnFamilyData. This function should be called only
// when the current thread does not hold db_mutex.
static void EraseColumnFamilyInfo(const ColumnFamilyData* cfd);
// Erase all ConstantColumnFamilyInfo that is associated with the
// specified db instance. This function should be called only when
// the current thread does not hold db_mutex.
static void EraseDatabaseInfo(const DB* db);
// Update the thread status to indicate the current thread is doing
// something related to the specified column family.
static void SetColumnFamily(const ColumnFamilyData* cfd);
protected:
// Initialize the thread-local ThreadStatusUpdater when it finds
// the cached value is nullptr. Returns true if it has cached
// a non-null pointer.
static bool MaybeInitThreadLocalUpdater(const Env* env);
#if ROCKSDB_USING_THREAD_STATUS
// A boolean flag indicating whether thread_updater_local_cache_
// is initialized. It is set to true when an Env uses any
// ThreadStatusUtil functions using the current thread other
// than UnregisterThread(). It will be set to false when
// UnregisterThread() is called.
//
// When this variable is set to true, thread_updater_local_cache_
// will not be updated until this variable is again set to false
// in UnregisterThread().
static __thread bool thread_updater_initialized_;
// The thread-local cached ThreadStatusUpdater that caches the
// thread_status_updater_ of the first Env that uses any ThreadStatusUtil
// function other than UnregisterThread(). This variable will
// be cleared when UnregisterThread() is called.
//
// When this variable is set to a non-null pointer, then the status
// of the current thread will be updated when a function of
// ThreadStatusUtil is called. Otherwise, all functions of
// ThreadStatusUtil will be no-op.
//
// When thread_updater_initialized_ is set to true, this variable
// will not be updated until this thread_updater_initialized_ is
// again set to false in UnregisterThread().
static __thread ThreadStatusUpdater* thread_updater_local_cache_;
#else
static bool thread_updater_initialized_;
static ThreadStatusUpdater* thread_updater_local_cache_;
#endif
};
} // namespace rocksdb

View File

@ -8,7 +8,6 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "table/get_context.h" #include "table/get_context.h"
#include "util/thread_status_impl.h"
namespace rocksdb { namespace rocksdb {