Add enable_thread_tracking to DBOptions
Summary: Add enable_thread_tracking to DBOptions to allow tracking thread status related to the DB. Default is off. Test Plan: export ROCKSDB_TESTS=ThreadList ./db_test Reviewers: ljin, sdong, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D29289
This commit is contained in:
parent
9e285d4238
commit
4b63fcbff3
@ -3725,17 +3725,23 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
|
|||||||
#if ROCKSDB_USING_THREAD_STATUS
|
#if ROCKSDB_USING_THREAD_STATUS
|
||||||
void DBImpl::NewThreadStatusCfInfo(
|
void DBImpl::NewThreadStatusCfInfo(
|
||||||
ColumnFamilyData* cfd) const {
|
ColumnFamilyData* cfd) const {
|
||||||
ThreadStatusImpl::NewColumnFamilyInfo(
|
if (db_options_.enable_thread_tracking) {
|
||||||
this, GetName(), cfd, cfd->GetName());
|
ThreadStatusImpl::NewColumnFamilyInfo(
|
||||||
|
this, GetName(), cfd, cfd->GetName());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBImpl::EraseThreadStatusCfInfo(
|
void DBImpl::EraseThreadStatusCfInfo(
|
||||||
ColumnFamilyData* cfd) const {
|
ColumnFamilyData* cfd) const {
|
||||||
ThreadStatusImpl::EraseColumnFamilyInfo(cfd);
|
if (db_options_.enable_thread_tracking) {
|
||||||
|
ThreadStatusImpl::EraseColumnFamilyInfo(cfd);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBImpl::EraseThreadStatusDbInfo() const {
|
void DBImpl::EraseThreadStatusDbInfo() const {
|
||||||
ThreadStatusImpl::EraseDatabaseInfo(this);
|
if (db_options_.enable_thread_tracking) {
|
||||||
|
ThreadStatusImpl::EraseDatabaseInfo(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
|
Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
|
||||||
|
@ -8986,6 +8986,8 @@ TEST(DBTest, DynamicMemtableOptions) {
|
|||||||
TEST(DBTest, GetThreadList) {
|
TEST(DBTest, GetThreadList) {
|
||||||
Options options;
|
Options options;
|
||||||
options.env = env_;
|
options.env = env_;
|
||||||
|
options.enable_thread_tracking = true;
|
||||||
|
TryReopen(options);
|
||||||
|
|
||||||
std::vector<ThreadStatus> thread_list;
|
std::vector<ThreadStatus> thread_list;
|
||||||
Status s = GetThreadList(&thread_list);
|
Status s = GetThreadList(&thread_list);
|
||||||
@ -9025,14 +9027,24 @@ TEST(DBTest, GetThreadList) {
|
|||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
// repeat the test with multiple column families
|
// repeat the test with multiple column families
|
||||||
CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
|
CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
|
||||||
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_);
|
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
db_->DropColumnFamily(handles_[2]);
|
db_->DropColumnFamily(handles_[2]);
|
||||||
handles_.erase(handles_.begin() + 2);
|
handles_.erase(handles_.begin() + 2);
|
||||||
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_);
|
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
|
||||||
Close();
|
Close();
|
||||||
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_);
|
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(DBTest, DisableThreadList) {
|
||||||
|
Options options;
|
||||||
|
options.env = env_;
|
||||||
|
options.enable_thread_tracking = false;
|
||||||
|
TryReopen(options);
|
||||||
|
CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
|
||||||
|
// Verify non of the column family info exists
|
||||||
|
ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, false);
|
||||||
}
|
}
|
||||||
#endif // ROCKSDB_USING_THREAD_STATUS
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
|
||||||
|
@ -884,6 +884,12 @@ struct DBOptions {
|
|||||||
// When rate limiter is enabled, it automatically enables bytes_per_sync
|
// When rate limiter is enabled, it automatically enables bytes_per_sync
|
||||||
// to 1MB.
|
// to 1MB.
|
||||||
uint64_t bytes_per_sync;
|
uint64_t bytes_per_sync;
|
||||||
|
|
||||||
|
// If true, then the status of the threads involved in this DB will
|
||||||
|
// be tracked and available via GetThreadList() API.
|
||||||
|
//
|
||||||
|
// Default: false
|
||||||
|
bool enable_thread_tracking;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Options to control the behavior of a database (passed to DB::Open)
|
// Options to control the behavior of a database (passed to DB::Open)
|
||||||
|
@ -232,7 +232,8 @@ DBOptions::DBOptions()
|
|||||||
advise_random_on_open(true),
|
advise_random_on_open(true),
|
||||||
access_hint_on_compaction_start(NORMAL),
|
access_hint_on_compaction_start(NORMAL),
|
||||||
use_adaptive_mutex(false),
|
use_adaptive_mutex(false),
|
||||||
bytes_per_sync(0) {}
|
bytes_per_sync(0),
|
||||||
|
enable_thread_tracking(false) {}
|
||||||
|
|
||||||
DBOptions::DBOptions(const Options& options)
|
DBOptions::DBOptions(const Options& options)
|
||||||
: create_if_missing(options.create_if_missing),
|
: create_if_missing(options.create_if_missing),
|
||||||
@ -274,7 +275,8 @@ DBOptions::DBOptions(const Options& options)
|
|||||||
advise_random_on_open(options.advise_random_on_open),
|
advise_random_on_open(options.advise_random_on_open),
|
||||||
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
|
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
|
||||||
use_adaptive_mutex(options.use_adaptive_mutex),
|
use_adaptive_mutex(options.use_adaptive_mutex),
|
||||||
bytes_per_sync(options.bytes_per_sync) {}
|
bytes_per_sync(options.bytes_per_sync),
|
||||||
|
enable_thread_tracking(options.enable_thread_tracking) {}
|
||||||
|
|
||||||
static const char* const access_hints[] = {
|
static const char* const access_hints[] = {
|
||||||
"NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
|
"NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
|
||||||
@ -342,6 +344,8 @@ void DBOptions::Dump(Logger* log) const {
|
|||||||
rate_limiter.get());
|
rate_limiter.get());
|
||||||
Log(log, " Options.bytes_per_sync: %" PRIu64,
|
Log(log, " Options.bytes_per_sync: %" PRIu64,
|
||||||
bytes_per_sync);
|
bytes_per_sync);
|
||||||
|
Log(log, " enable_thread_tracking: %d",
|
||||||
|
enable_thread_tracking);
|
||||||
} // DBOptions::Dump
|
} // DBOptions::Dump
|
||||||
|
|
||||||
void ColumnFamilyOptions::Dump(Logger* log) const {
|
void ColumnFamilyOptions::Dump(Logger* log) const {
|
||||||
|
@ -127,7 +127,8 @@ class ThreadStatusImpl {
|
|||||||
// Verifies whether the input ColumnFamilyHandles matches
|
// Verifies whether the input ColumnFamilyHandles matches
|
||||||
// the information stored in the current cf_info_map.
|
// the information stored in the current cf_info_map.
|
||||||
static void TEST_VerifyColumnFamilyInfoMap(
|
static void TEST_VerifyColumnFamilyInfoMap(
|
||||||
const std::vector<ColumnFamilyHandle*>& handles);
|
const std::vector<ColumnFamilyHandle*>& handles,
|
||||||
|
bool check_exist);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// The thread-local variable for storing thread status.
|
// The thread-local variable for storing thread status.
|
||||||
|
@ -11,15 +11,22 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(
|
void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(
|
||||||
const std::vector<ColumnFamilyHandle*>& handles) {
|
const std::vector<ColumnFamilyHandle*>& handles,
|
||||||
|
bool check_exist) {
|
||||||
std::unique_lock<std::mutex> lock(thread_list_mutex_);
|
std::unique_lock<std::mutex> lock(thread_list_mutex_);
|
||||||
assert(cf_info_map_.size() == handles.size());
|
if (check_exist) {
|
||||||
|
assert(cf_info_map_.size() == handles.size());
|
||||||
|
}
|
||||||
for (auto* handle : handles) {
|
for (auto* handle : handles) {
|
||||||
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
|
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
|
||||||
auto iter __attribute__((unused)) = cf_info_map_.find(cfd);
|
auto iter __attribute__((unused)) = cf_info_map_.find(cfd);
|
||||||
assert(iter != cf_info_map_.end());
|
if (check_exist) {
|
||||||
assert(iter->second);
|
assert(iter != cf_info_map_.end());
|
||||||
assert(iter->second->cf_name == cfd->GetName());
|
assert(iter->second);
|
||||||
|
assert(iter->second->cf_name == cfd->GetName());
|
||||||
|
} else {
|
||||||
|
assert(iter == cf_info_map_.end());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
Loading…
Reference in New Issue
Block a user