Introduce GetThreadList API
Summary: Add GetThreadList API, which allows developer to track the status of each process. Currently, calling GetThreadList will only get the list of background threads in RocksDB with their thread-id and thread-type (priority) set. Will add more support on this in the later diffs. ThreadStatus currently has the following properties: // An unique ID for the thread. const uint64_t thread_id; // The type of the thread, it could be ROCKSDB_HIGH_PRIORITY, // ROCKSDB_LOW_PRIORITY, and USER_THREAD const ThreadType thread_type; // The name of the DB instance where the thread is currently // involved with. It would be set to empty string if the thread // does not involve in any DB operation. const std::string db_name; // The name of the column family where the thread is currently // It would be set to empty string if the thread does not involve // in any column family. const std::string cf_name; // The event that the current thread is involved. // It would be set to empty string if the information about event // is not currently available. Test Plan: ./thread_list_test export ROCKSDB_TESTS=GetThreadList ./db_test Reviewers: rven, igor, sdong, ljin Reviewed By: ljin Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D25047
This commit is contained in:
parent
1fd1aecb39
commit
d0c5f28a5c
@ -1,5 +1,8 @@
|
|||||||
# Rocksdb Change Log
|
# Rocksdb Change Log
|
||||||
|
|
||||||
|
### Unreleased Features
|
||||||
|
* Add rocksdb::GetThreadList(), which returns the current status of all rocksdb-related threads.
|
||||||
|
|
||||||
## 3.8.0 (11/14/2014)
|
## 3.8.0 (11/14/2014)
|
||||||
|
|
||||||
### Public API changes
|
### Public API changes
|
||||||
|
6
Makefile
6
Makefile
@ -150,7 +150,8 @@ TESTS = \
|
|||||||
flush_job_test \
|
flush_job_test \
|
||||||
wal_manager_test \
|
wal_manager_test \
|
||||||
listener_test \
|
listener_test \
|
||||||
compaction_job_test
|
compaction_job_test \
|
||||||
|
thread_list_test
|
||||||
|
|
||||||
TOOLS = \
|
TOOLS = \
|
||||||
sst_dump \
|
sst_dump \
|
||||||
@ -509,6 +510,9 @@ cuckoo_table_db_test: db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
|||||||
listener_test: db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
listener_test: db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
$(CXX) db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
|
$(CXX) db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
|
||||||
|
|
||||||
|
thread_list_test: util/thread_list_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
|
$(CXX) util/thread_list_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
|
||||||
|
|
||||||
compactor_test: utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
compactor_test: utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
$(CXX) utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
|
$(CXX) utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
|
||||||
|
|
||||||
|
112
db/db_impl.cc
112
db/db_impl.cc
@ -75,6 +75,7 @@
|
|||||||
#include "util/iostats_context_imp.h"
|
#include "util/iostats_context_imp.h"
|
||||||
#include "util/stop_watch.h"
|
#include "util/stop_watch.h"
|
||||||
#include "util/sync_point.h"
|
#include "util/sync_point.h"
|
||||||
|
#include "util/thread_status_impl.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
@ -241,6 +242,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
|||||||
}
|
}
|
||||||
|
|
||||||
DBImpl::~DBImpl() {
|
DBImpl::~DBImpl() {
|
||||||
|
EraseThreadStatusDbInfo();
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
|
|
||||||
if (flush_on_destroy_) {
|
if (flush_on_destroy_) {
|
||||||
@ -2453,40 +2455,50 @@ std::vector<Status> DBImpl::MultiGet(
|
|||||||
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
||||||
const std::string& column_family_name,
|
const std::string& column_family_name,
|
||||||
ColumnFamilyHandle** handle) {
|
ColumnFamilyHandle** handle) {
|
||||||
|
Status s;
|
||||||
*handle = nullptr;
|
*handle = nullptr;
|
||||||
MutexLock l(&mutex_);
|
{
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
|
||||||
if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
|
if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
|
||||||
nullptr) {
|
nullptr) {
|
||||||
return Status::InvalidArgument("Column family already exists");
|
return Status::InvalidArgument("Column family already exists");
|
||||||
|
}
|
||||||
|
VersionEdit edit;
|
||||||
|
edit.AddColumnFamily(column_family_name);
|
||||||
|
uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
|
||||||
|
edit.SetColumnFamily(new_id);
|
||||||
|
edit.SetLogNumber(logfile_number_);
|
||||||
|
edit.SetComparatorName(cf_options.comparator->Name());
|
||||||
|
|
||||||
|
// LogAndApply will both write the creation in MANIFEST and create
|
||||||
|
// ColumnFamilyData object
|
||||||
|
Options opt(db_options_, cf_options);
|
||||||
|
s = versions_->LogAndApply(nullptr,
|
||||||
|
MutableCFOptions(opt, ImmutableCFOptions(opt)),
|
||||||
|
&edit, &mutex_, db_directory_.get(), false, &cf_options);
|
||||||
|
if (s.ok()) {
|
||||||
|
single_column_family_mode_ = false;
|
||||||
|
auto* cfd =
|
||||||
|
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
|
||||||
|
assert(cfd != nullptr);
|
||||||
|
delete InstallSuperVersion(
|
||||||
|
cfd, nullptr, *cfd->GetLatestMutableCFOptions());
|
||||||
|
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
|
||||||
|
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||||
|
"Created column family [%s] (ID %u)",
|
||||||
|
column_family_name.c_str(), (unsigned)cfd->GetID());
|
||||||
|
} else {
|
||||||
|
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
|
||||||
|
"Creating column family [%s] FAILED -- %s",
|
||||||
|
column_family_name.c_str(), s.ToString().c_str());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
VersionEdit edit;
|
|
||||||
edit.AddColumnFamily(column_family_name);
|
|
||||||
uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
|
|
||||||
edit.SetColumnFamily(new_id);
|
|
||||||
edit.SetLogNumber(logfile_number_);
|
|
||||||
edit.SetComparatorName(cf_options.comparator->Name());
|
|
||||||
|
|
||||||
// LogAndApply will both write the creation in MANIFEST and create
|
// this is outside the mutex
|
||||||
// ColumnFamilyData object
|
|
||||||
Options opt(db_options_, cf_options);
|
|
||||||
Status s = versions_->LogAndApply(nullptr,
|
|
||||||
MutableCFOptions(opt, ImmutableCFOptions(opt)),
|
|
||||||
&edit, &mutex_, db_directory_.get(), false, &cf_options);
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
single_column_family_mode_ = false;
|
NewThreadStatusCfInfo(
|
||||||
auto cfd =
|
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
|
||||||
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
|
|
||||||
assert(cfd != nullptr);
|
|
||||||
delete InstallSuperVersion(cfd, nullptr, *cfd->GetLatestMutableCFOptions());
|
|
||||||
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
|
|
||||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
|
||||||
"Created column family [%s] (ID %u)",
|
|
||||||
column_family_name.c_str(), (unsigned)cfd->GetID());
|
|
||||||
} else {
|
|
||||||
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
|
|
||||||
"Creating column family [%s] FAILED -- %s",
|
|
||||||
column_family_name.c_str(), s.ToString().c_str());
|
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
@ -2520,6 +2532,10 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
|
// Note that here we erase the associated cf_info of the to-be-dropped
|
||||||
|
// cfd before its ref-count goes to zero to avoid having to erase cf_info
|
||||||
|
// later inside db_mutex.
|
||||||
|
EraseThreadStatusCfInfo(cfd);
|
||||||
assert(cfd->IsDropped());
|
assert(cfd->IsDropped());
|
||||||
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
|
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
|
||||||
max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
|
max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
|
||||||
@ -3602,8 +3618,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
// their Listeners. To address this, we should have NotifyOnDatabaseOpen()
|
// their Listeners. To address this, we should have NotifyOnDatabaseOpen()
|
||||||
// here which passes the created ColumnFamilyHandle to the Listeners
|
// here which passes the created ColumnFamilyHandle to the Listeners
|
||||||
// as the first event after DB::Open().
|
// as the first event after DB::Open().
|
||||||
|
for (auto* h : *handles) {
|
||||||
|
impl->NewThreadStatusCfInfo(
|
||||||
|
reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
for (auto h : *handles) {
|
for (auto* h : *handles) {
|
||||||
delete h;
|
delete h;
|
||||||
}
|
}
|
||||||
handles->clear();
|
handles->clear();
|
||||||
@ -3702,6 +3722,38 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
void DBImpl::NewThreadStatusCfInfo(
|
||||||
|
ColumnFamilyData* cfd) const {
|
||||||
|
ThreadStatusImpl::NewColumnFamilyInfo(
|
||||||
|
this, GetName(), cfd, cfd->GetName());
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBImpl::EraseThreadStatusCfInfo(
|
||||||
|
ColumnFamilyData* cfd) const {
|
||||||
|
ThreadStatusImpl::EraseColumnFamilyInfo(cfd);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBImpl::EraseThreadStatusDbInfo() const {
|
||||||
|
ThreadStatusImpl::EraseDatabaseInfo(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
|
||||||
|
return thread_local_status.GetThreadList(thread_list);
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
void DBImpl::NewThreadStatusCfInfo(
|
||||||
|
ColumnFamilyData* cfd) const {
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBImpl::EraseThreadStatusCfInfo(
|
||||||
|
ColumnFamilyData* cfd) const {
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBImpl::EraseThreadStatusDbInfo() const {
|
||||||
|
}
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
|
||||||
//
|
//
|
||||||
// A global method that can dump out the build version
|
// A global method that can dump out the build version
|
||||||
void DumpRocksDBBuildVersion(Logger * log) {
|
void DumpRocksDBBuildVersion(Logger * log) {
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
#include "util/stop_watch.h"
|
#include "util/stop_watch.h"
|
||||||
#include "util/thread_local.h"
|
#include "util/thread_local.h"
|
||||||
#include "util/scoped_arena_iterator.h"
|
#include "util/scoped_arena_iterator.h"
|
||||||
|
#include "util/hash.h"
|
||||||
#include "db/internal_stats.h"
|
#include "db/internal_stats.h"
|
||||||
#include "db/write_controller.h"
|
#include "db/write_controller.h"
|
||||||
#include "db/flush_scheduler.h"
|
#include "db/flush_scheduler.h"
|
||||||
@ -264,6 +265,12 @@ class DBImpl : public DB {
|
|||||||
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number,
|
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number,
|
||||||
const MutableCFOptions& mutable_cf_options);
|
const MutableCFOptions& mutable_cf_options);
|
||||||
|
|
||||||
|
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
|
||||||
|
|
||||||
|
void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
|
||||||
|
|
||||||
|
void EraseThreadStatusDbInfo() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class DB;
|
friend class DB;
|
||||||
friend class InternalStats;
|
friend class InternalStats;
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
#include "db/merge_context.h"
|
#include "db/merge_context.h"
|
||||||
#include "db/db_iter.h"
|
#include "db/db_iter.h"
|
||||||
#include "util/perf_context_imp.h"
|
#include "util/perf_context_imp.h"
|
||||||
|
#include "util/thread_status_impl.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
@ -152,6 +153,10 @@ Status DB::OpenForReadOnly(
|
|||||||
impl->mutex_.Unlock();
|
impl->mutex_.Unlock();
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
*dbptr = impl;
|
*dbptr = impl;
|
||||||
|
for (auto* h : *handles) {
|
||||||
|
impl->NewThreadStatusCfInfo(
|
||||||
|
reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
for (auto h : *handles) {
|
for (auto h : *handles) {
|
||||||
delete h;
|
delete h;
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
#include "rocksdb/table.h"
|
#include "rocksdb/table.h"
|
||||||
#include "rocksdb/options.h"
|
#include "rocksdb/options.h"
|
||||||
#include "rocksdb/table_properties.h"
|
#include "rocksdb/table_properties.h"
|
||||||
|
#include "rocksdb/thread_status.h"
|
||||||
#include "rocksdb/utilities/write_batch_with_index.h"
|
#include "rocksdb/utilities/write_batch_with_index.h"
|
||||||
#include "table/block_based_table_factory.h"
|
#include "table/block_based_table_factory.h"
|
||||||
#include "table/plain_table_factory.h"
|
#include "table/plain_table_factory.h"
|
||||||
@ -48,6 +49,7 @@
|
|||||||
#include "util/sync_point.h"
|
#include "util/sync_point.h"
|
||||||
#include "util/testutil.h"
|
#include "util/testutil.h"
|
||||||
#include "util/mock_env.h"
|
#include "util/mock_env.h"
|
||||||
|
#include "util/thread_status_impl.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
@ -8981,6 +8983,60 @@ TEST(DBTest, DynamicMemtableOptions) {
|
|||||||
sleeping_task_low3.WaitUntilDone();
|
sleeping_task_low3.WaitUntilDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
TEST(DBTest, GetThreadList) {
|
||||||
|
Options options;
|
||||||
|
options.env = env_;
|
||||||
|
|
||||||
|
std::vector<ThreadStatus> thread_list;
|
||||||
|
Status s = GetThreadList(&thread_list);
|
||||||
|
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
// repeat the test with differet number of high / low priority threads
|
||||||
|
const int kTestCount = 3;
|
||||||
|
const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
|
||||||
|
const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
|
||||||
|
for (int test = 0; test < kTestCount; ++test) {
|
||||||
|
// Change the number of threads in high / low priority pool.
|
||||||
|
env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
|
||||||
|
env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
|
||||||
|
// Wait to ensure the all threads has been registered
|
||||||
|
env_->SleepForMicroseconds(100000);
|
||||||
|
s = GetThreadList(&thread_list);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL];
|
||||||
|
memset(thread_type_counts, 0, sizeof(thread_type_counts));
|
||||||
|
for (auto thread : thread_list) {
|
||||||
|
ASSERT_LT(thread.thread_type, ThreadStatus::ThreadType::TOTAL);
|
||||||
|
thread_type_counts[thread.thread_type]++;
|
||||||
|
}
|
||||||
|
// Verify the total number of threades
|
||||||
|
ASSERT_EQ(
|
||||||
|
thread_list.size(),
|
||||||
|
kHighPriCounts[test] + kLowPriCounts[test]);
|
||||||
|
// Verify the number of high-priority threads
|
||||||
|
ASSERT_EQ(
|
||||||
|
thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY],
|
||||||
|
kHighPriCounts[test]);
|
||||||
|
// Verify the number of low-priority threads
|
||||||
|
ASSERT_EQ(
|
||||||
|
thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY],
|
||||||
|
kLowPriCounts[test]);
|
||||||
|
}
|
||||||
|
if (i == 0) {
|
||||||
|
// repeat the test with multiple column families
|
||||||
|
CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
|
||||||
|
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
db_->DropColumnFamily(handles_[2]);
|
||||||
|
handles_.erase(handles_.begin() + 2);
|
||||||
|
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_);
|
||||||
|
Close();
|
||||||
|
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_);
|
||||||
|
}
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
|
||||||
TEST(DBTest, DynamicCompactionOptions) {
|
TEST(DBTest, DynamicCompactionOptions) {
|
||||||
// minimum write buffer size is enforced at 64KB
|
// minimum write buffer size is enforced at 64KB
|
||||||
const uint64_t k32KB = 1 << 15;
|
const uint64_t k32KB = 1 << 15;
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
#include "rocksdb/types.h"
|
#include "rocksdb/types.h"
|
||||||
#include "rocksdb/transaction_log.h"
|
#include "rocksdb/transaction_log.h"
|
||||||
#include "rocksdb/listener.h"
|
#include "rocksdb/listener.h"
|
||||||
|
#include "rocksdb/thread_status.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
@ -547,6 +548,12 @@ Status DestroyDB(const std::string& name, const Options& options);
|
|||||||
Status RepairDB(const std::string& dbname, const Options& options);
|
Status RepairDB(const std::string& dbname, const Options& options);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
// Obtain the status of all rocksdb-related threads.
|
||||||
|
Status GetThreadList(std::vector<ThreadStatus>* thread_list);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
#endif // STORAGE_ROCKSDB_INCLUDE_DB_H_
|
#endif // STORAGE_ROCKSDB_INCLUDE_DB_H_
|
||||||
|
66
include/rocksdb/thread_status.h
Normal file
66
include/rocksdb/thread_status.h
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cstddef>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
#ifndef ROCKSDB_USING_THREAD_STATUS
|
||||||
|
#define ROCKSDB_USING_THREAD_STATUS \
|
||||||
|
!defined(ROCKSDB_LITE) && \
|
||||||
|
!defined(NROCKSDB_THREAD_STATUS) && \
|
||||||
|
!defined(OS_MACOSX)
|
||||||
|
#endif
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
// A structure that describes the current status of a thread.
|
||||||
|
// The status of active threads can be fetched using
|
||||||
|
// rocksdb::GetThreadList().
|
||||||
|
struct ThreadStatus {
|
||||||
|
enum ThreadType {
|
||||||
|
ROCKSDB_HIGH_PRIORITY = 0x0,
|
||||||
|
ROCKSDB_LOW_PRIORITY = 0x1,
|
||||||
|
USER_THREAD = 0x2,
|
||||||
|
TOTAL = 0x3
|
||||||
|
};
|
||||||
|
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
ThreadStatus(const uint64_t _id,
|
||||||
|
const ThreadType _thread_type,
|
||||||
|
const std::string& _db_name,
|
||||||
|
const std::string& _cf_name,
|
||||||
|
const std::string& _event) :
|
||||||
|
thread_id(_id), thread_type(_thread_type),
|
||||||
|
db_name(_db_name),
|
||||||
|
cf_name(_cf_name),
|
||||||
|
event(_event) {}
|
||||||
|
|
||||||
|
// An unique ID for the thread.
|
||||||
|
const uint64_t thread_id;
|
||||||
|
|
||||||
|
// The type of the thread, it could be ROCKSDB_HIGH_PRIORITY,
|
||||||
|
// ROCKSDB_LOW_PRIORITY, and USER_THREAD
|
||||||
|
const ThreadType thread_type;
|
||||||
|
|
||||||
|
// The name of the DB instance where the thread is currently
|
||||||
|
// involved with. It would be set to empty string if the thread
|
||||||
|
// does not involve in any DB operation.
|
||||||
|
const std::string db_name;
|
||||||
|
|
||||||
|
// The name of the column family where the thread is currently
|
||||||
|
// It would be set to empty string if the thread does not involve
|
||||||
|
// in any column family.
|
||||||
|
const std::string cf_name;
|
||||||
|
|
||||||
|
// The event that the current thread is involved.
|
||||||
|
// It would be set to empty string if the information about event
|
||||||
|
// is not currently available.
|
||||||
|
const std::string event;
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace rocksdb
|
@ -42,6 +42,7 @@
|
|||||||
#include "util/random.h"
|
#include "util/random.h"
|
||||||
#include "util/iostats_context_imp.h"
|
#include "util/iostats_context_imp.h"
|
||||||
#include "util/rate_limiter.h"
|
#include "util/rate_limiter.h"
|
||||||
|
#include "util/thread_status_impl.h"
|
||||||
|
|
||||||
// Get nano time for mach systems
|
// Get nano time for mach systems
|
||||||
#ifdef __MACH__
|
#ifdef __MACH__
|
||||||
@ -75,6 +76,10 @@ int rocksdb_kill_odds = 0;
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
extern ThreadStatusImpl thread_local_status;
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
// A wrapper for fadvise, if the platform doesn't support fadvise,
|
// A wrapper for fadvise, if the platform doesn't support fadvise,
|
||||||
@ -1570,6 +1575,17 @@ class PosixEnv : public Env {
|
|||||||
return static_cast<int>(thread_id) >= total_threads_limit_;
|
return static_cast<int>(thread_id) >= total_threads_limit_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return the thread priority.
|
||||||
|
// This would allow its member-thread to know its priority.
|
||||||
|
Env::Priority GetThreadPriority() {
|
||||||
|
return priority_;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the thread priority.
|
||||||
|
void SetThreadPriority(Env::Priority priority) {
|
||||||
|
priority_ = priority;
|
||||||
|
}
|
||||||
|
|
||||||
void BGThread(size_t thread_id) {
|
void BGThread(size_t thread_id) {
|
||||||
bool low_io_priority = false;
|
bool low_io_priority = false;
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -1651,8 +1667,14 @@ class PosixEnv : public Env {
|
|||||||
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
|
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
|
||||||
size_t thread_id = meta->thread_id_;
|
size_t thread_id = meta->thread_id_;
|
||||||
ThreadPool* tp = meta->thread_pool_;
|
ThreadPool* tp = meta->thread_pool_;
|
||||||
|
// for thread-status
|
||||||
|
thread_local_status.SetThreadType(
|
||||||
|
(tp->GetThreadPriority() == Env::Priority::HIGH ?
|
||||||
|
ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY :
|
||||||
|
ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY));
|
||||||
delete meta;
|
delete meta;
|
||||||
tp->BGThread(thread_id);
|
tp->BGThread(thread_id);
|
||||||
|
thread_local_status.UnregisterThread();
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1753,6 +1775,7 @@ class PosixEnv : public Env {
|
|||||||
std::atomic_uint queue_len_; // Queue length. Used for stats reporting
|
std::atomic_uint queue_len_; // Queue length. Used for stats reporting
|
||||||
bool exit_all_threads_;
|
bool exit_all_threads_;
|
||||||
bool low_io_priority_;
|
bool low_io_priority_;
|
||||||
|
Env::Priority priority_;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::vector<ThreadPool> thread_pools_;
|
std::vector<ThreadPool> thread_pools_;
|
||||||
@ -1767,6 +1790,10 @@ PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
|
|||||||
page_size_(getpagesize()),
|
page_size_(getpagesize()),
|
||||||
thread_pools_(Priority::TOTAL) {
|
thread_pools_(Priority::TOTAL) {
|
||||||
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
|
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
|
||||||
|
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
|
||||||
|
thread_pools_[pool_id].SetThreadPriority(
|
||||||
|
static_cast<Env::Priority>(pool_id));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) {
|
void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) {
|
||||||
|
@ -24,4 +24,5 @@ inline uint32_t BloomHash(const Slice& key) {
|
|||||||
inline uint32_t GetSliceHash(const Slice& s) {
|
inline uint32_t GetSliceHash(const Slice& s) {
|
||||||
return Hash(s.data(), s.size(), 397);
|
return Hash(s.data(), s.size(), 397);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
} // namespace rocksdb
|
||||||
|
156
util/thread_list_test.cc
Normal file
156
util/thread_list_test.cc
Normal file
@ -0,0 +1,156 @@
|
|||||||
|
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
|
||||||
|
#include "util/thread_status_impl.h"
|
||||||
|
#include "util/testharness.h"
|
||||||
|
#include "rocksdb/db.h"
|
||||||
|
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
class SleepingBackgroundTask {
|
||||||
|
public:
|
||||||
|
SleepingBackgroundTask(const void* db_key, const std::string& db_name,
|
||||||
|
const void* cf_key, const std::string& cf_name)
|
||||||
|
: db_key_(db_key), db_name_(db_name),
|
||||||
|
cf_key_(cf_key), cf_name_(cf_name),
|
||||||
|
should_sleep_(true), sleeping_count_(0) {
|
||||||
|
ThreadStatusImpl::NewColumnFamilyInfo(
|
||||||
|
db_key_, db_name_, cf_key_, cf_name_);
|
||||||
|
}
|
||||||
|
|
||||||
|
~SleepingBackgroundTask() {
|
||||||
|
ThreadStatusImpl::EraseDatabaseInfo(db_key_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DoSleep() {
|
||||||
|
thread_local_status.SetColumnFamilyInfoKey(cf_key_);
|
||||||
|
std::unique_lock<std::mutex> l(mutex_);
|
||||||
|
sleeping_count_++;
|
||||||
|
while (should_sleep_) {
|
||||||
|
bg_cv_.wait(l);
|
||||||
|
}
|
||||||
|
sleeping_count_--;
|
||||||
|
bg_cv_.notify_all();
|
||||||
|
thread_local_status.SetColumnFamilyInfoKey(0);
|
||||||
|
}
|
||||||
|
void WakeUp() {
|
||||||
|
std::unique_lock<std::mutex> l(mutex_);
|
||||||
|
should_sleep_ = false;
|
||||||
|
bg_cv_.notify_all();
|
||||||
|
}
|
||||||
|
void WaitUntilDone() {
|
||||||
|
std::unique_lock<std::mutex> l(mutex_);
|
||||||
|
while (sleeping_count_ > 0) {
|
||||||
|
bg_cv_.wait(l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void DoSleepTask(void* arg) {
|
||||||
|
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
const void* db_key_;
|
||||||
|
const std::string db_name_;
|
||||||
|
const void* cf_key_;
|
||||||
|
const std::string cf_name_;
|
||||||
|
std::mutex mutex_;
|
||||||
|
std::condition_variable bg_cv_;
|
||||||
|
bool should_sleep_;
|
||||||
|
std::atomic<int> sleeping_count_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class ThreadListTest {
|
||||||
|
public:
|
||||||
|
ThreadListTest() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST(ThreadListTest, SimpleColumnFamilyInfoTest) {
|
||||||
|
Env* env = Env::Default();
|
||||||
|
const int kHighPriorityThreads = 3;
|
||||||
|
const int kLowPriorityThreads = 5;
|
||||||
|
const int kSleepingHighPriThreads = kHighPriorityThreads - 1;
|
||||||
|
const int kSleepingLowPriThreads = kLowPriorityThreads / 3;
|
||||||
|
env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
|
||||||
|
env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
|
||||||
|
|
||||||
|
SleepingBackgroundTask sleeping_task(
|
||||||
|
reinterpret_cast<void*>(1234), "sleeping",
|
||||||
|
reinterpret_cast<void*>(5678), "pikachu");
|
||||||
|
|
||||||
|
for (int test = 0; test < kSleepingHighPriThreads; ++test) {
|
||||||
|
env->Schedule(&SleepingBackgroundTask::DoSleepTask,
|
||||||
|
&sleeping_task, Env::Priority::HIGH);
|
||||||
|
}
|
||||||
|
for (int test = 0; test < kSleepingLowPriThreads; ++test) {
|
||||||
|
env->Schedule(&SleepingBackgroundTask::DoSleepTask,
|
||||||
|
&sleeping_task, Env::Priority::LOW);
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure everything is scheduled.
|
||||||
|
env->SleepForMicroseconds(10000);
|
||||||
|
|
||||||
|
std::vector<ThreadStatus> thread_list;
|
||||||
|
|
||||||
|
// Verify the number of sleeping threads in each pool.
|
||||||
|
GetThreadList(&thread_list);
|
||||||
|
int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0};
|
||||||
|
for (auto thread_status : thread_list) {
|
||||||
|
if (thread_status.cf_name == "pikachu" &&
|
||||||
|
thread_status.db_name == "sleeping") {
|
||||||
|
sleeping_count[thread_status.thread_type]++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT_EQ(
|
||||||
|
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY],
|
||||||
|
kSleepingHighPriThreads);
|
||||||
|
ASSERT_EQ(
|
||||||
|
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY],
|
||||||
|
kSleepingLowPriThreads);
|
||||||
|
ASSERT_EQ(
|
||||||
|
sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0);
|
||||||
|
|
||||||
|
sleeping_task.WakeUp();
|
||||||
|
sleeping_task.WaitUntilDone();
|
||||||
|
|
||||||
|
// Verify none of the threads are sleeping
|
||||||
|
GetThreadList(&thread_list);
|
||||||
|
for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) {
|
||||||
|
sleeping_count[i] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto thread_status : thread_list) {
|
||||||
|
if (thread_status.cf_name == "pikachu" &&
|
||||||
|
thread_status.db_name == "sleeping") {
|
||||||
|
sleeping_count[thread_status.thread_type]++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT_EQ(
|
||||||
|
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], 0);
|
||||||
|
ASSERT_EQ(
|
||||||
|
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], 0);
|
||||||
|
ASSERT_EQ(
|
||||||
|
sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
return rocksdb::test::RunAllTests();
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
198
util/thread_status_impl.cc
Normal file
198
util/thread_status_impl.cc
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
|
||||||
|
#include "port/likely.h"
|
||||||
|
#include "util/mutexlock.h"
|
||||||
|
#include "util/thread_status_impl.h"
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
ThreadStatusImpl thread_local_status;
|
||||||
|
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
__thread ThreadStatusData* ThreadStatusImpl::thread_status_data_ = nullptr;
|
||||||
|
std::mutex ThreadStatusImpl::thread_list_mutex_;
|
||||||
|
std::unordered_set<ThreadStatusData*> ThreadStatusImpl::thread_data_set_;
|
||||||
|
std::unordered_map<const void*, ConstantColumnFamilyInfo*>
|
||||||
|
ThreadStatusImpl::cf_info_map_;
|
||||||
|
std::unordered_map<const void*, std::unordered_set<const void*>>
|
||||||
|
ThreadStatusImpl::db_key_map_;
|
||||||
|
|
||||||
|
ThreadStatusImpl::~ThreadStatusImpl() {
|
||||||
|
std::lock_guard<std::mutex> lck(thread_list_mutex_);
|
||||||
|
for (auto* thread_data : thread_data_set_) {
|
||||||
|
assert(thread_data->thread_type == ThreadStatus::ThreadType::USER_THREAD);
|
||||||
|
delete thread_data;
|
||||||
|
}
|
||||||
|
assert(thread_data_set_.size() == 0);
|
||||||
|
thread_data_set_.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::UnregisterThread() {
|
||||||
|
if (thread_status_data_ != nullptr) {
|
||||||
|
std::lock_guard<std::mutex> lck(thread_list_mutex_);
|
||||||
|
thread_data_set_.erase(thread_status_data_);
|
||||||
|
delete thread_status_data_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::SetThreadType(
|
||||||
|
ThreadStatus::ThreadType ttype) {
|
||||||
|
auto* data = InitAndGet();
|
||||||
|
data->thread_type.store(ttype, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::SetColumnFamilyInfoKey(
|
||||||
|
const void* cf_key) {
|
||||||
|
auto* data = InitAndGet();
|
||||||
|
data->cf_key.store(cf_key, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::SetEventInfoPtr(
|
||||||
|
const ThreadEventInfo* event_info) {
|
||||||
|
auto* data = InitAndGet();
|
||||||
|
data->event_info.store(event_info, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
Status ThreadStatusImpl::GetThreadList(
|
||||||
|
std::vector<ThreadStatus>* thread_list) const {
|
||||||
|
thread_list->clear();
|
||||||
|
std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lck(thread_list_mutex_);
|
||||||
|
for (auto* thread_data : thread_data_set_) {
|
||||||
|
assert(thread_data);
|
||||||
|
auto thread_type = thread_data->thread_type.load(
|
||||||
|
std::memory_order_relaxed);
|
||||||
|
auto cf_key = thread_data->cf_key.load(
|
||||||
|
std::memory_order_relaxed);
|
||||||
|
auto iter = cf_info_map_.find(
|
||||||
|
thread_data->cf_key.load(std::memory_order_relaxed));
|
||||||
|
assert(cf_key == 0 || iter != cf_info_map_.end());
|
||||||
|
auto* cf_info = iter != cf_info_map_.end() ?
|
||||||
|
iter->second : nullptr;
|
||||||
|
auto* event_info = thread_data->event_info.load(
|
||||||
|
std::memory_order_relaxed);
|
||||||
|
const std::string* db_name = nullptr;
|
||||||
|
const std::string* cf_name = nullptr;
|
||||||
|
const std::string* event_name = nullptr;
|
||||||
|
if (cf_info != nullptr) {
|
||||||
|
db_name = &cf_info->db_name;
|
||||||
|
cf_name = &cf_info->cf_name;
|
||||||
|
// display lower-level info only when higher-level info is available.
|
||||||
|
if (event_info != nullptr) {
|
||||||
|
event_name = &event_info->event_name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
thread_list->emplace_back(
|
||||||
|
thread_data->thread_id, thread_type,
|
||||||
|
db_name ? *db_name : "",
|
||||||
|
cf_name ? *cf_name : "",
|
||||||
|
event_name ? *event_name : "");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
ThreadStatusData* ThreadStatusImpl::InitAndGet() {
|
||||||
|
if (UNLIKELY(thread_status_data_ == nullptr)) {
|
||||||
|
thread_status_data_ = new ThreadStatusData();
|
||||||
|
thread_status_data_->thread_id = reinterpret_cast<uint64_t>(
|
||||||
|
thread_status_data_);
|
||||||
|
std::lock_guard<std::mutex> lck(thread_list_mutex_);
|
||||||
|
thread_data_set_.insert(thread_status_data_);
|
||||||
|
}
|
||||||
|
return thread_status_data_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::NewColumnFamilyInfo(
|
||||||
|
const void* db_key, const std::string& db_name,
|
||||||
|
const void* cf_key, const std::string& cf_name) {
|
||||||
|
std::lock_guard<std::mutex> lck(thread_list_mutex_);
|
||||||
|
|
||||||
|
cf_info_map_[cf_key] = new ConstantColumnFamilyInfo(db_key, db_name, cf_name);
|
||||||
|
db_key_map_[db_key].insert(cf_key);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) {
|
||||||
|
std::lock_guard<std::mutex> lck(thread_list_mutex_);
|
||||||
|
auto cf_pair = cf_info_map_.find(cf_key);
|
||||||
|
assert(cf_pair != cf_info_map_.end());
|
||||||
|
|
||||||
|
auto* cf_info = cf_pair->second;
|
||||||
|
assert(cf_info);
|
||||||
|
|
||||||
|
// Remove its entry from db_key_map_ by the following steps:
|
||||||
|
// 1. Obtain the entry in db_key_map_ whose set contains cf_key
|
||||||
|
// 2. Remove it from the set.
|
||||||
|
auto db_pair = db_key_map_.find(cf_info->db_key);
|
||||||
|
assert(db_pair != db_key_map_.end());
|
||||||
|
int result __attribute__((unused)) = db_pair->second.erase(cf_key);
|
||||||
|
assert(result);
|
||||||
|
|
||||||
|
delete cf_info;
|
||||||
|
result = cf_info_map_.erase(cf_key);
|
||||||
|
assert(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) {
|
||||||
|
std::lock_guard<std::mutex> lck(thread_list_mutex_);
|
||||||
|
auto db_pair = db_key_map_.find(db_key);
|
||||||
|
if (UNLIKELY(db_pair == db_key_map_.end())) {
|
||||||
|
// In some occasional cases such as DB::Open fails, we won't
|
||||||
|
// register ColumnFamilyInfo for a db.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int result __attribute__((unused)) = 0;
|
||||||
|
for (auto cf_key : db_pair->second) {
|
||||||
|
auto cf_pair = cf_info_map_.find(cf_key);
|
||||||
|
assert(cf_pair != cf_info_map_.end());
|
||||||
|
result = cf_info_map_.erase(cf_key);
|
||||||
|
delete cf_pair->second;
|
||||||
|
assert(result);
|
||||||
|
}
|
||||||
|
db_key_map_.erase(db_key);
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
ThreadStatusImpl::~ThreadStatusImpl() {
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::UnregisterThread() {
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::SetThreadType(
|
||||||
|
ThreadStatus::ThreadType ttype) {
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::SetColumnFamilyInfoKey(
|
||||||
|
const void* cf_key) {
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::SetEventInfoPtr(
|
||||||
|
const ThreadEventInfo* event_info) {
|
||||||
|
}
|
||||||
|
|
||||||
|
Status ThreadStatusImpl::GetThreadList(
|
||||||
|
std::vector<ThreadStatus>* thread_list) const {
|
||||||
|
return Status::NotSupported(
|
||||||
|
"GetThreadList is not supported in the current running environment.");
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::NewColumnFamilyInfo(
|
||||||
|
const void* db_key, const std::string& db_name,
|
||||||
|
const void* cf_key, const std::string& cf_name) {
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) {
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) {
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
} // namespace rocksdb
|
164
util/thread_status_impl.h
Normal file
164
util/thread_status_impl.h
Normal file
@ -0,0 +1,164 @@
|
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
// The implementation of ThreadStatus. It is implemented via combination
|
||||||
|
// of macros and thread-local variables.
|
||||||
|
//
|
||||||
|
// Note that we make get and set access to ThreadStatusData lockless.
|
||||||
|
// As a result, ThreadStatusData as a whole is not atomic. However,
|
||||||
|
// we guarantee consistent ThreadStatusData all the time whenever
|
||||||
|
// user call GetThreadList(). This consistency guarantee is done
|
||||||
|
// by having the following constraint in the internal implementation
|
||||||
|
// of set and get order:
|
||||||
|
//
|
||||||
|
// 1. When reset any information in ThreadStatusData, always start from
|
||||||
|
// clearing up the lower-level information first.
|
||||||
|
// 2. When setting any information in ThreadStatusData, always start from
|
||||||
|
// setting the higher-level information.
|
||||||
|
// 3. When returning ThreadStatusData to the user, fields are fetched from
|
||||||
|
// higher-level to lower-level. In addition, where there's a nullptr
|
||||||
|
// in one field, then all fields that has lower-level than that field
|
||||||
|
// should be ignored.
|
||||||
|
//
|
||||||
|
// The high to low level information would be:
|
||||||
|
// thread_id > thread_type > db > cf > event > event_count > event_details
|
||||||
|
//
|
||||||
|
// This means user might not always get full information, but whenever
|
||||||
|
// returned by the GetThreadList() is guaranteed to be consistent.
|
||||||
|
#pragma once
|
||||||
|
#include <unordered_set>
|
||||||
|
#include <atomic>
|
||||||
|
#include <string>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <mutex>
|
||||||
|
#include <list>
|
||||||
|
#include <vector>
|
||||||
|
#include "rocksdb/status.h"
|
||||||
|
#include "rocksdb/thread_status.h"
|
||||||
|
#include "port/port_posix.h"
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
class ColumnFamilyHandle;
|
||||||
|
|
||||||
|
// The mutable version of ThreadStatus. It has a static set maintaining
|
||||||
|
// the set of current registered threades.
|
||||||
|
//
|
||||||
|
// Note that it is suggested to call the above macros.
|
||||||
|
struct ConstantColumnFamilyInfo {
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
public:
|
||||||
|
ConstantColumnFamilyInfo(
|
||||||
|
const void* _db_key,
|
||||||
|
const std::string& _db_name,
|
||||||
|
const std::string& _cf_name) :
|
||||||
|
db_key(_db_key), db_name(_db_name), cf_name(_cf_name) {}
|
||||||
|
const void* db_key;
|
||||||
|
const std::string db_name;
|
||||||
|
const std::string cf_name;
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ThreadEventInfo {
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
public:
|
||||||
|
const std::string event_name;
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
};
|
||||||
|
|
||||||
|
// the internal data-structure that is used to reflect the current
|
||||||
|
// status of a thread using a set of atomic pointers.
|
||||||
|
struct ThreadStatusData {
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
explicit ThreadStatusData() : thread_id(0) {
|
||||||
|
thread_type.store(ThreadStatus::ThreadType::USER_THREAD);
|
||||||
|
cf_key.store(0);
|
||||||
|
event_info.store(nullptr);
|
||||||
|
}
|
||||||
|
uint64_t thread_id;
|
||||||
|
std::atomic<ThreadStatus::ThreadType> thread_type;
|
||||||
|
std::atomic<const void*> cf_key;
|
||||||
|
std::atomic<const ThreadEventInfo*> event_info;
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
};
|
||||||
|
|
||||||
|
class ThreadStatusImpl {
|
||||||
|
public:
|
||||||
|
ThreadStatusImpl() {}
|
||||||
|
|
||||||
|
// Releases all ThreadStatusData of all active threads.
|
||||||
|
~ThreadStatusImpl();
|
||||||
|
|
||||||
|
void UnregisterThread();
|
||||||
|
|
||||||
|
// Set the thread type of the current thread.
|
||||||
|
void SetThreadType(ThreadStatus::ThreadType ttype);
|
||||||
|
|
||||||
|
// Update the column-family info of the current thread by setting
|
||||||
|
// its thread-local pointer of ThreadEventInfo to the correct entry.
|
||||||
|
void SetColumnFamilyInfoKey(const void* cf_key);
|
||||||
|
|
||||||
|
// Update the event info of the current thread by setting
|
||||||
|
// its thread-local pointer of ThreadEventInfo to the correct entry.
|
||||||
|
void SetEventInfoPtr(const ThreadEventInfo* event_info);
|
||||||
|
|
||||||
|
Status GetThreadList(
|
||||||
|
std::vector<ThreadStatus>* thread_list) const;
|
||||||
|
|
||||||
|
// Create an entry in the global ColumnFamilyInfo table for the
|
||||||
|
// specified column family. This function should be called only
|
||||||
|
// when the current thread does not hold db_mutex.
|
||||||
|
static void NewColumnFamilyInfo(
|
||||||
|
const void* db_key, const std::string& db_name,
|
||||||
|
const void* cf_key, const std::string& cf_name);
|
||||||
|
|
||||||
|
// Erase all ConstantColumnFamilyInfo that is associated with the
|
||||||
|
// specified db instance. This function should be called only when
|
||||||
|
// the current thread does not hold db_mutex.
|
||||||
|
static void EraseDatabaseInfo(const void* db_key);
|
||||||
|
|
||||||
|
// Erase the ConstantColumnFamilyInfo that is associated with the
|
||||||
|
// specified ColumnFamilyData. This function should be called only
|
||||||
|
// when the current thread does not hold db_mutex.
|
||||||
|
static void EraseColumnFamilyInfo(const void* cf_key);
|
||||||
|
|
||||||
|
// Verifies whether the input ColumnFamilyHandles matches
|
||||||
|
// the information stored in the current cf_info_map.
|
||||||
|
static void TEST_VerifyColumnFamilyInfoMap(
|
||||||
|
const std::vector<ColumnFamilyHandle*>& handles);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
// The thread-local variable for storing thread status.
|
||||||
|
static __thread ThreadStatusData* thread_status_data_;
|
||||||
|
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
|
||||||
|
// Obtain the pointer to the thread status data. It also performs
|
||||||
|
// initialization when necessary.
|
||||||
|
ThreadStatusData* InitAndGet();
|
||||||
|
|
||||||
|
// The mutex that protects cf_info_map and db_key_map.
|
||||||
|
static std::mutex thread_list_mutex_;
|
||||||
|
|
||||||
|
// The current status data of all active threads.
|
||||||
|
static std::unordered_set<ThreadStatusData*> thread_data_set_;
|
||||||
|
|
||||||
|
// A global map that keeps the column family information. It is stored
|
||||||
|
// globally instead of inside DB is to avoid the situation where DB is
|
||||||
|
// closing while GetThreadList function already get the pointer to its
|
||||||
|
// CopnstantColumnFamilyInfo.
|
||||||
|
static std::unordered_map<
|
||||||
|
const void*, ConstantColumnFamilyInfo*> cf_info_map_;
|
||||||
|
|
||||||
|
// A db_key to cf_key map that allows erasing elements in cf_info_map
|
||||||
|
// associated to the same db_key faster.
|
||||||
|
static std::unordered_map<
|
||||||
|
const void*, std::unordered_set<const void*>> db_key_map_;
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
extern ThreadStatusImpl thread_local_status;
|
||||||
|
} // namespace rocksdb
|
26
util/thread_status_impl_debug.cc
Normal file
26
util/thread_status_impl_debug.cc
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
#include "util/thread_status_impl.h"
|
||||||
|
#include "db/column_family.h"
|
||||||
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(
|
||||||
|
const std::vector<ColumnFamilyHandle*>& handles) {
|
||||||
|
std::unique_lock<std::mutex> lock(thread_list_mutex_);
|
||||||
|
assert(cf_info_map_.size() == handles.size());
|
||||||
|
for (auto* handle : handles) {
|
||||||
|
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
|
||||||
|
auto iter = cf_info_map_.find(cfd);
|
||||||
|
assert(iter != cf_info_map_.end());
|
||||||
|
assert(iter->second);
|
||||||
|
assert(iter->second->cf_name == cfd->GetName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // namespace rocksdb
|
||||||
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
@ -8,6 +8,7 @@
|
|||||||
#include "db/db_impl.h"
|
#include "db/db_impl.h"
|
||||||
#include "db/version_set.h"
|
#include "db/version_set.h"
|
||||||
#include "table/get_context.h"
|
#include "table/get_context.h"
|
||||||
|
#include "util/thread_status_impl.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
@ -102,6 +103,7 @@ Status CompactedDBImpl::Init(const Options& options) {
|
|||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
NewThreadStatusCfInfo(cfd_);
|
||||||
version_ = cfd_->GetSuperVersion()->current;
|
version_ = cfd_->GetSuperVersion()->current;
|
||||||
user_comparator_ = cfd_->user_comparator();
|
user_comparator_ = cfd_->user_comparator();
|
||||||
auto* vstorage = version_->storage_info();
|
auto* vstorage = version_->storage_info();
|
||||||
|
Loading…
Reference in New Issue
Block a user