Remove ThreadStatus API from 3.9

Summary: Remove ThreadStatus API from 3.9

Test Plan:
make dbg -j32
make OPT=-DROCKSDB_LITE shared_lib -j32

Reviewers: sdong, igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D30087
This commit is contained in:
Yueh-Hsuan Chiang 2014-12-10 15:50:13 -08:00
parent 046ba7d47c
commit 0d0d9bd3dc
14 changed files with 2 additions and 777 deletions

View File

@ -2,11 +2,6 @@
### 3.9.0 (12/8/2014)
### New Features
* Add rocksdb::GetThreadList(), which in the future will return the current status of all
rocksdb-related threads. We will have more code instruments in the following RocksDB
releases.
### Public API changes
* New API to create a checkpoint added. Given a directory name, creates a new
database which is an image of the existing database.

View File

@ -164,8 +164,7 @@ TESTS = \
flush_job_test \
wal_manager_test \
listener_test \
compaction_job_test \
thread_list_test
compaction_job_test
SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)

View File

@ -77,7 +77,6 @@
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/string_util.h"
#include "util/thread_status_impl.h"
namespace rocksdb {
@ -246,7 +245,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
}
DBImpl::~DBImpl() {
EraseThreadStatusDbInfo();
mutex_.Lock();
if (flush_on_destroy_) {
@ -2505,11 +2503,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
}
} // MutexLock l(&mutex_)
// this is outside the mutex
if (s.ok()) {
NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
}
return s;
}
@ -2545,7 +2538,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
// Note that here we erase the associated cf_info of the to-be-dropped
// cfd before its ref-count goes to zero to avoid having to erase cf_info
// later inside db_mutex.
EraseThreadStatusCfInfo(cfd);
assert(cfd->IsDropped());
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
@ -3574,7 +3566,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (cfd != nullptr) {
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
impl->NewThreadStatusCfInfo(cfd);
} else {
if (db_options.create_missing_column_families) {
// missing column family, create it
@ -3739,44 +3730,6 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
return result;
}
#if ROCKSDB_USING_THREAD_STATUS
void DBImpl::NewThreadStatusCfInfo(
ColumnFamilyData* cfd) const {
if (db_options_.enable_thread_tracking) {
ThreadStatusImpl::NewColumnFamilyInfo(
this, GetName(), cfd, cfd->GetName());
}
}
void DBImpl::EraseThreadStatusCfInfo(
ColumnFamilyData* cfd) const {
if (db_options_.enable_thread_tracking) {
ThreadStatusImpl::EraseColumnFamilyInfo(cfd);
}
}
void DBImpl::EraseThreadStatusDbInfo() const {
if (db_options_.enable_thread_tracking) {
ThreadStatusImpl::EraseDatabaseInfo(this);
}
}
Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
return thread_local_status.GetThreadList(thread_list);
}
#else
void DBImpl::NewThreadStatusCfInfo(
ColumnFamilyData* cfd) const {
}
void DBImpl::EraseThreadStatusCfInfo(
ColumnFamilyData* cfd) const {
}
void DBImpl::EraseThreadStatusDbInfo() const {
}
#endif // ROCKSDB_USING_THREAD_STATUS
//
// A global method that can dump out the build version
void DumpRocksDBBuildVersion(Logger * log) {

View File

@ -267,12 +267,6 @@ class DBImpl : public DB {
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number,
const MutableCFOptions& mutable_cf_options);
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
void EraseThreadStatusDbInfo() const;
private:
friend class DB;
friend class InternalStats;

View File

@ -10,7 +10,6 @@
#include "db/merge_context.h"
#include "db/db_iter.h"
#include "util/perf_context_imp.h"
#include "util/thread_status_impl.h"
namespace rocksdb {
@ -152,13 +151,7 @@ Status DB::OpenForReadOnly(
}
}
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;
for (auto* h : *handles) {
impl->NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
}
} else {
if (!s.ok()) {
for (auto h : *handles) {
delete h;
}

View File

@ -33,7 +33,6 @@
#include "rocksdb/table.h"
#include "rocksdb/options.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/utilities/checkpoint.h"
#include "table/block_based_table_factory.h"
@ -51,7 +50,6 @@
#include "util/testutil.h"
#include "util/mock_env.h"
#include "util/string_util.h"
#include "util/thread_status_impl.h"
namespace rocksdb {
@ -9104,73 +9102,6 @@ TEST(DBTest, DynamicMemtableOptions) {
sleeping_task_low3.WaitUntilDone();
}
#if ROCKSDB_USING_THREAD_STATUS
TEST(DBTest, GetThreadList) {
Options options;
options.env = env_;
options.enable_thread_tracking = true;
TryReopen(options);
std::vector<ThreadStatus> thread_list;
Status s = GetThreadList(&thread_list);
for (int i = 0; i < 2; ++i) {
// repeat the test with differet number of high / low priority threads
const int kTestCount = 3;
const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
for (int test = 0; test < kTestCount; ++test) {
// Change the number of threads in high / low priority pool.
env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
// Wait to ensure the all threads has been registered
env_->SleepForMicroseconds(100000);
s = GetThreadList(&thread_list);
ASSERT_OK(s);
unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL];
memset(thread_type_counts, 0, sizeof(thread_type_counts));
for (auto thread : thread_list) {
ASSERT_LT(thread.thread_type, ThreadStatus::ThreadType::TOTAL);
thread_type_counts[thread.thread_type]++;
}
// Verify the total number of threades
ASSERT_EQ(
thread_list.size(),
kHighPriCounts[test] + kLowPriCounts[test]);
// Verify the number of high-priority threads
ASSERT_EQ(
thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY],
kHighPriCounts[test]);
// Verify the number of low-priority threads
ASSERT_EQ(
thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY],
kLowPriCounts[test]);
}
if (i == 0) {
// repeat the test with multiple column families
CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
}
}
db_->DropColumnFamily(handles_[2]);
delete handles_[2];
handles_.erase(handles_.begin() + 2);
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
Close();
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
}
TEST(DBTest, DisableThreadList) {
Options options;
options.env = env_;
options.enable_thread_tracking = false;
TryReopen(options);
CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
// Verify non of the column family info exists
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, false);
}
#endif // ROCKSDB_USING_THREAD_STATUS
TEST(DBTest, DynamicCompactionOptions) {
// minimum write buffer size is enforced at 64KB
const uint64_t k32KB = 1 << 15;

View File

@ -22,7 +22,6 @@
#include "rocksdb/types.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/listener.h"
#include "rocksdb/thread_status.h"
namespace rocksdb {
@ -548,11 +547,6 @@ Status DestroyDB(const std::string& name, const Options& options);
Status RepairDB(const std::string& dbname, const Options& options);
#endif
#if ROCKSDB_USING_THREAD_STATUS
// Obtain the status of all rocksdb-related threads.
Status GetThreadList(std::vector<ThreadStatus>* thread_list);
#endif
} // namespace rocksdb

View File

@ -1,67 +0,0 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <cstddef>
#include <string>
#ifndef ROCKSDB_USING_THREAD_STATUS
#define ROCKSDB_USING_THREAD_STATUS \
!defined(ROCKSDB_LITE) && \
!defined(NROCKSDB_THREAD_STATUS) && \
!defined(OS_MACOSX) && \
!defined(IOS_CROSS_COMPILE)
#endif
namespace rocksdb {
// A structure that describes the current status of a thread.
// The status of active threads can be fetched using
// rocksdb::GetThreadList().
struct ThreadStatus {
enum ThreadType {
ROCKSDB_HIGH_PRIORITY = 0x0,
ROCKSDB_LOW_PRIORITY = 0x1,
USER_THREAD = 0x2,
TOTAL = 0x3
};
#if ROCKSDB_USING_THREAD_STATUS
ThreadStatus(const uint64_t _id,
const ThreadType _thread_type,
const std::string& _db_name,
const std::string& _cf_name,
const std::string& _event) :
thread_id(_id), thread_type(_thread_type),
db_name(_db_name),
cf_name(_cf_name),
event(_event) {}
// An unique ID for the thread.
const uint64_t thread_id;
// The type of the thread, it could be ROCKSDB_HIGH_PRIORITY,
// ROCKSDB_LOW_PRIORITY, and USER_THREAD
const ThreadType thread_type;
// The name of the DB instance where the thread is currently
// involved with. It would be set to empty string if the thread
// does not involve in any DB operation.
const std::string db_name;
// The name of the column family where the thread is currently
// It would be set to empty string if the thread does not involve
// in any column family.
const std::string cf_name;
// The event that the current thread is involved.
// It would be set to empty string if the information about event
// is not currently available.
const std::string event;
#endif // ROCKSDB_USING_THREAD_STATUS
};
} // namespace rocksdb

View File

@ -42,7 +42,6 @@
#include "util/random.h"
#include "util/iostats_context_imp.h"
#include "util/rate_limiter.h"
#include "util/thread_status_impl.h"
// Get nano time for mach systems
#ifdef __MACH__
@ -76,10 +75,6 @@ int rocksdb_kill_odds = 0;
namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS
extern ThreadStatusImpl thread_local_status;
#endif
namespace {
// A wrapper for fadvise, if the platform doesn't support fadvise,
@ -1667,18 +1662,8 @@ class PosixEnv : public Env {
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
size_t thread_id = meta->thread_id_;
ThreadPool* tp = meta->thread_pool_;
#if ROCKSDB_USING_THREAD_STATUS
// for thread-status
thread_local_status.SetThreadType(
(tp->GetThreadPriority() == Env::Priority::HIGH ?
ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY :
ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY));
#endif
delete meta;
tp->BGThread(thread_id);
#if ROCKSDB_USING_THREAD_STATUS
thread_local_status.UnregisterThread();
#endif
return nullptr;
}

View File

@ -1,156 +0,0 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <mutex>
#include <condition_variable>
#include "util/thread_status_impl.h"
#include "util/testharness.h"
#include "rocksdb/db.h"
#if ROCKSDB_USING_THREAD_STATUS
namespace rocksdb {
class SleepingBackgroundTask {
public:
SleepingBackgroundTask(const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name)
: db_key_(db_key), db_name_(db_name),
cf_key_(cf_key), cf_name_(cf_name),
should_sleep_(true), sleeping_count_(0) {
ThreadStatusImpl::NewColumnFamilyInfo(
db_key_, db_name_, cf_key_, cf_name_);
}
~SleepingBackgroundTask() {
ThreadStatusImpl::EraseDatabaseInfo(db_key_);
}
void DoSleep() {
thread_local_status.SetColumnFamilyInfoKey(cf_key_);
std::unique_lock<std::mutex> l(mutex_);
sleeping_count_++;
while (should_sleep_) {
bg_cv_.wait(l);
}
sleeping_count_--;
bg_cv_.notify_all();
thread_local_status.SetColumnFamilyInfoKey(0);
}
void WakeUp() {
std::unique_lock<std::mutex> l(mutex_);
should_sleep_ = false;
bg_cv_.notify_all();
}
void WaitUntilDone() {
std::unique_lock<std::mutex> l(mutex_);
while (sleeping_count_ > 0) {
bg_cv_.wait(l);
}
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
const void* db_key_;
const std::string db_name_;
const void* cf_key_;
const std::string cf_name_;
std::mutex mutex_;
std::condition_variable bg_cv_;
bool should_sleep_;
std::atomic<int> sleeping_count_;
};
class ThreadListTest {
public:
ThreadListTest() {
}
};
TEST(ThreadListTest, SimpleColumnFamilyInfoTest) {
Env* env = Env::Default();
const int kHighPriorityThreads = 3;
const int kLowPriorityThreads = 5;
const int kSleepingHighPriThreads = kHighPriorityThreads - 1;
const int kSleepingLowPriThreads = kLowPriorityThreads / 3;
env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
SleepingBackgroundTask sleeping_task(
reinterpret_cast<void*>(1234), "sleeping",
reinterpret_cast<void*>(5678), "pikachu");
for (int test = 0; test < kSleepingHighPriThreads; ++test) {
env->Schedule(&SleepingBackgroundTask::DoSleepTask,
&sleeping_task, Env::Priority::HIGH);
}
for (int test = 0; test < kSleepingLowPriThreads; ++test) {
env->Schedule(&SleepingBackgroundTask::DoSleepTask,
&sleeping_task, Env::Priority::LOW);
}
// make sure everything is scheduled.
env->SleepForMicroseconds(10000);
std::vector<ThreadStatus> thread_list;
// Verify the number of sleeping threads in each pool.
GetThreadList(&thread_list);
int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0};
for (auto thread_status : thread_list) {
if (thread_status.cf_name == "pikachu" &&
thread_status.db_name == "sleeping") {
sleeping_count[thread_status.thread_type]++;
}
}
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY],
kSleepingHighPriThreads);
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY],
kSleepingLowPriThreads);
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0);
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
// Verify none of the threads are sleeping
GetThreadList(&thread_list);
for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) {
sleeping_count[i] = 0;
}
for (auto thread_status : thread_list) {
if (thread_status.cf_name == "pikachu" &&
thread_status.db_name == "sleeping") {
sleeping_count[thread_status.thread_type]++;
}
}
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], 0);
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], 0);
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0);
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}
#else
int main(int argc, char** argv) {
return 0;
}
#endif // ROCKSDB_USING_THREAD_STATUS

View File

@ -1,194 +0,0 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "port/likely.h"
#include "util/mutexlock.h"
#include "util/thread_status_impl.h"
namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS
__thread ThreadStatusData* ThreadStatusImpl::thread_status_data_ = nullptr;
std::mutex ThreadStatusImpl::thread_list_mutex_;
std::unordered_set<ThreadStatusData*> ThreadStatusImpl::thread_data_set_;
std::unordered_map<const void*, std::unique_ptr<ConstantColumnFamilyInfo>>
ThreadStatusImpl::cf_info_map_;
std::unordered_map<const void*, std::unordered_set<const void*>>
ThreadStatusImpl::db_key_map_;
ThreadStatusImpl thread_local_status;
ThreadStatusImpl::~ThreadStatusImpl() {
assert(thread_data_set_.size() == 0);
}
void ThreadStatusImpl::UnregisterThread() {
if (thread_status_data_ != nullptr) {
std::lock_guard<std::mutex> lck(thread_list_mutex_);
thread_data_set_.erase(thread_status_data_);
delete thread_status_data_;
thread_status_data_ = nullptr;
}
}
void ThreadStatusImpl::SetThreadType(
ThreadStatus::ThreadType ttype) {
auto* data = InitAndGet();
data->thread_type.store(ttype, std::memory_order_relaxed);
}
void ThreadStatusImpl::SetColumnFamilyInfoKey(
const void* cf_key) {
auto* data = InitAndGet();
data->cf_key.store(cf_key, std::memory_order_relaxed);
}
void ThreadStatusImpl::SetEventInfoPtr(
const ThreadEventInfo* event_info) {
auto* data = InitAndGet();
data->event_info.store(event_info, std::memory_order_relaxed);
}
Status ThreadStatusImpl::GetThreadList(
std::vector<ThreadStatus>* thread_list) const {
thread_list->clear();
std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
std::lock_guard<std::mutex> lck(thread_list_mutex_);
for (auto* thread_data : thread_data_set_) {
assert(thread_data);
auto thread_type = thread_data->thread_type.load(
std::memory_order_relaxed);
auto cf_key = thread_data->cf_key.load(
std::memory_order_relaxed);
auto iter = cf_info_map_.find(cf_key);
assert(cf_key == 0 || iter != cf_info_map_.end());
auto* cf_info = iter != cf_info_map_.end() ?
iter->second.get() : nullptr;
auto* event_info = thread_data->event_info.load(
std::memory_order_relaxed);
const std::string* db_name = nullptr;
const std::string* cf_name = nullptr;
const std::string* event_name = nullptr;
if (cf_info != nullptr) {
db_name = &cf_info->db_name;
cf_name = &cf_info->cf_name;
// display lower-level info only when higher-level info is available.
if (event_info != nullptr) {
event_name = &event_info->event_name;
}
}
thread_list->emplace_back(
thread_data->thread_id, thread_type,
db_name ? *db_name : "",
cf_name ? *cf_name : "",
event_name ? *event_name : "");
}
return Status::OK();
}
ThreadStatusData* ThreadStatusImpl::InitAndGet() {
if (UNLIKELY(thread_status_data_ == nullptr)) {
thread_status_data_ = new ThreadStatusData();
thread_status_data_->thread_id = reinterpret_cast<uint64_t>(
thread_status_data_);
std::lock_guard<std::mutex> lck(thread_list_mutex_);
thread_data_set_.insert(thread_status_data_);
}
return thread_status_data_;
}
void ThreadStatusImpl::NewColumnFamilyInfo(
const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name) {
std::lock_guard<std::mutex> lck(thread_list_mutex_);
cf_info_map_[cf_key].reset(
new ConstantColumnFamilyInfo(db_key, db_name, cf_name));
db_key_map_[db_key].insert(cf_key);
}
void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) {
std::lock_guard<std::mutex> lck(thread_list_mutex_);
auto cf_pair = cf_info_map_.find(cf_key);
assert(cf_pair != cf_info_map_.end());
auto* cf_info = cf_pair->second.get();
assert(cf_info);
// Remove its entry from db_key_map_ by the following steps:
// 1. Obtain the entry in db_key_map_ whose set contains cf_key
// 2. Remove it from the set.
auto db_pair = db_key_map_.find(cf_info->db_key);
assert(db_pair != db_key_map_.end());
size_t result __attribute__((unused)) = db_pair->second.erase(cf_key);
assert(result);
cf_pair->second.reset();
result = cf_info_map_.erase(cf_key);
assert(result);
}
void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) {
std::lock_guard<std::mutex> lck(thread_list_mutex_);
auto db_pair = db_key_map_.find(db_key);
if (UNLIKELY(db_pair == db_key_map_.end())) {
// In some occasional cases such as DB::Open fails, we won't
// register ColumnFamilyInfo for a db.
return;
}
size_t result __attribute__((unused)) = 0;
for (auto cf_key : db_pair->second) {
auto cf_pair = cf_info_map_.find(cf_key);
assert(cf_pair != cf_info_map_.end());
cf_pair->second.reset();
result = cf_info_map_.erase(cf_key);
assert(result);
}
db_key_map_.erase(db_key);
}
#else
ThreadStatusImpl::~ThreadStatusImpl() {
}
void ThreadStatusImpl::UnregisterThread() {
}
void ThreadStatusImpl::SetThreadType(
ThreadStatus::ThreadType ttype) {
}
void ThreadStatusImpl::SetColumnFamilyInfoKey(
const void* cf_key) {
}
void ThreadStatusImpl::SetEventInfoPtr(
const ThreadEventInfo* event_info) {
}
Status ThreadStatusImpl::GetThreadList(
std::vector<ThreadStatus>* thread_list) const {
return Status::NotSupported(
"GetThreadList is not supported in the current running environment.");
}
void ThreadStatusImpl::NewColumnFamilyInfo(
const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name) {
}
void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) {
}
void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) {
}
ThreadStatusImpl thread_local_status;
#endif // ROCKSDB_USING_THREAD_STATUS
} // namespace rocksdb

View File

@ -1,167 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// The implementation of ThreadStatus. It is implemented via combination
// of macros and thread-local variables.
//
// Note that we make get and set access to ThreadStatusData lockless.
// As a result, ThreadStatusData as a whole is not atomic. However,
// we guarantee consistent ThreadStatusData all the time whenever
// user call GetThreadList(). This consistency guarantee is done
// by having the following constraint in the internal implementation
// of set and get order:
//
// 1. When reset any information in ThreadStatusData, always start from
// clearing up the lower-level information first.
// 2. When setting any information in ThreadStatusData, always start from
// setting the higher-level information.
// 3. When returning ThreadStatusData to the user, fields are fetched from
// higher-level to lower-level. In addition, where there's a nullptr
// in one field, then all fields that has lower-level than that field
// should be ignored.
//
// The high to low level information would be:
// thread_id > thread_type > db > cf > event > event_count > event_details
//
// This means user might not always get full information, but whenever
// returned by the GetThreadList() is guaranteed to be consistent.
#pragma once
#include <unordered_set>
#include <atomic>
#include <string>
#include <unordered_map>
#include <mutex>
#include <list>
#include <vector>
#include "rocksdb/status.h"
#include "rocksdb/thread_status.h"
#include "port/port_posix.h"
namespace rocksdb {
class ColumnFamilyHandle;
// The mutable version of ThreadStatus. It has a static set maintaining
// the set of current registered threades.
//
// Note that it is suggested to call the above macros.
struct ConstantColumnFamilyInfo {
#if ROCKSDB_USING_THREAD_STATUS
public:
ConstantColumnFamilyInfo(
const void* _db_key,
const std::string& _db_name,
const std::string& _cf_name) :
db_key(_db_key), db_name(_db_name), cf_name(_cf_name) {}
const void* db_key;
const std::string db_name;
const std::string cf_name;
#endif // ROCKSDB_USING_THREAD_STATUS
};
struct ThreadEventInfo {
#if ROCKSDB_USING_THREAD_STATUS
public:
const std::string event_name;
#endif // ROCKSDB_USING_THREAD_STATUS
};
// the internal data-structure that is used to reflect the current
// status of a thread using a set of atomic pointers.
struct ThreadStatusData {
#if ROCKSDB_USING_THREAD_STATUS
explicit ThreadStatusData() : thread_id(0) {
thread_type.store(ThreadStatus::ThreadType::USER_THREAD);
cf_key.store(0);
event_info.store(nullptr);
}
uint64_t thread_id;
std::atomic<ThreadStatus::ThreadType> thread_type;
std::atomic<const void*> cf_key;
std::atomic<const ThreadEventInfo*> event_info;
#endif // ROCKSDB_USING_THREAD_STATUS
};
class ThreadStatusImpl {
public:
ThreadStatusImpl() {}
// Releases all ThreadStatusData of all active threads.
~ThreadStatusImpl();
void UnregisterThread();
// Set the thread type of the current thread.
void SetThreadType(ThreadStatus::ThreadType ttype);
// Update the column-family info of the current thread by setting
// its thread-local pointer of ThreadEventInfo to the correct entry.
void SetColumnFamilyInfoKey(const void* cf_key);
// Update the event info of the current thread by setting
// its thread-local pointer of ThreadEventInfo to the correct entry.
void SetEventInfoPtr(const ThreadEventInfo* event_info);
Status GetThreadList(
std::vector<ThreadStatus>* thread_list) const;
// Create an entry in the global ColumnFamilyInfo table for the
// specified column family. This function should be called only
// when the current thread does not hold db_mutex.
static void NewColumnFamilyInfo(
const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name);
// Erase all ConstantColumnFamilyInfo that is associated with the
// specified db instance. This function should be called only when
// the current thread does not hold db_mutex.
static void EraseDatabaseInfo(const void* db_key);
// Erase the ConstantColumnFamilyInfo that is associated with the
// specified ColumnFamilyData. This function should be called only
// when the current thread does not hold db_mutex.
static void EraseColumnFamilyInfo(const void* cf_key);
// Verifies whether the input ColumnFamilyHandles matches
// the information stored in the current cf_info_map.
static void TEST_VerifyColumnFamilyInfoMap(
const std::vector<ColumnFamilyHandle*>& handles,
bool check_exist);
protected:
#if ROCKSDB_USING_THREAD_STATUS
// The thread-local variable for storing thread status.
static __thread ThreadStatusData* thread_status_data_;
// Obtain the pointer to the thread status data. It also performs
// initialization when necessary.
ThreadStatusData* InitAndGet();
// The mutex that protects cf_info_map and db_key_map.
static std::mutex thread_list_mutex_;
// The current status data of all active threads.
static std::unordered_set<ThreadStatusData*> thread_data_set_;
// A global map that keeps the column family information. It is stored
// globally instead of inside DB is to avoid the situation where DB is
// closing while GetThreadList function already get the pointer to its
// CopnstantColumnFamilyInfo.
static std::unordered_map<
const void*, std::unique_ptr<ConstantColumnFamilyInfo>> cf_info_map_;
// A db_key to cf_key map that allows erasing elements in cf_info_map
// associated to the same db_key faster.
static std::unordered_map<
const void*, std::unordered_set<const void*>> db_key_map_;
#else
static ThreadStatusData* thread_status_data_;
#endif // ROCKSDB_USING_THREAD_STATUS
};
extern ThreadStatusImpl thread_local_status;
} // namespace rocksdb

View File

@ -1,33 +0,0 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <mutex>
#include "util/thread_status_impl.h"
#include "db/column_family.h"
#if ROCKSDB_USING_THREAD_STATUS
namespace rocksdb {
void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(
const std::vector<ColumnFamilyHandle*>& handles,
bool check_exist) {
std::unique_lock<std::mutex> lock(thread_list_mutex_);
if (check_exist) {
assert(cf_info_map_.size() == handles.size());
}
for (auto* handle : handles) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
auto iter __attribute__((unused)) = cf_info_map_.find(cfd);
if (check_exist) {
assert(iter != cf_info_map_.end());
assert(iter->second);
assert(iter->second->cf_name == cfd->GetName());
} else {
assert(iter == cf_info_map_.end());
}
}
}
} // namespace rocksdb
#endif // ROCKSDB_USING_THREAD_STATUS

View File

@ -8,7 +8,6 @@
#include "db/db_impl.h"
#include "db/version_set.h"
#include "table/get_context.h"
#include "util/thread_status_impl.h"
namespace rocksdb {
@ -103,7 +102,6 @@ Status CompactedDBImpl::Init(const Options& options) {
if (!s.ok()) {
return s;
}
NewThreadStatusCfInfo(cfd_);
version_ = cfd_->GetSuperVersion()->current;
user_comparator_ = cfd_->user_comparator();
auto* vstorage = version_->storage_info();