rocksdb/util/thread_list_test.cc
Yueh-Hsuan Chiang d0c5f28a5c Introduce GetThreadList API
Summary:
Add GetThreadList API, which allows developer to track the
status of each process.  Currently, calling GetThreadList will
only get the list of background threads in RocksDB with their
thread-id and thread-type (priority) set.  Will add more support
on this in the later diffs.

ThreadStatus currently has the following properties:

  // An unique ID for the thread.
  const uint64_t thread_id;

  // The type of the thread, it could be ROCKSDB_HIGH_PRIORITY,
  // ROCKSDB_LOW_PRIORITY, and USER_THREAD
  const ThreadType thread_type;

  // The name of the DB instance where the thread is currently
  // involved with.  It would be set to empty string if the thread
  // does not involve in any DB operation.
  const std::string db_name;

  // The name of the column family where the thread is currently
  // It would be set to empty string if the thread does not involve
  // in any column family.
  const std::string cf_name;

  // The event that the current thread is involved.
  // It would be set to empty string if the information about event
  // is not currently available.

Test Plan:
./thread_list_test
export ROCKSDB_TESTS=GetThreadList
./db_test

Reviewers: rven, igor, sdong, ljin

Reviewed By: ljin

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D25047
2014-11-20 10:49:32 -08:00

157 lines
4.4 KiB
C++

// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <mutex>
#include <condition_variable>
#include "util/thread_status_impl.h"
#include "util/testharness.h"
#include "rocksdb/db.h"
#if ROCKSDB_USING_THREAD_STATUS
namespace rocksdb {
class SleepingBackgroundTask {
public:
SleepingBackgroundTask(const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name)
: db_key_(db_key), db_name_(db_name),
cf_key_(cf_key), cf_name_(cf_name),
should_sleep_(true), sleeping_count_(0) {
ThreadStatusImpl::NewColumnFamilyInfo(
db_key_, db_name_, cf_key_, cf_name_);
}
~SleepingBackgroundTask() {
ThreadStatusImpl::EraseDatabaseInfo(db_key_);
}
void DoSleep() {
thread_local_status.SetColumnFamilyInfoKey(cf_key_);
std::unique_lock<std::mutex> l(mutex_);
sleeping_count_++;
while (should_sleep_) {
bg_cv_.wait(l);
}
sleeping_count_--;
bg_cv_.notify_all();
thread_local_status.SetColumnFamilyInfoKey(0);
}
void WakeUp() {
std::unique_lock<std::mutex> l(mutex_);
should_sleep_ = false;
bg_cv_.notify_all();
}
void WaitUntilDone() {
std::unique_lock<std::mutex> l(mutex_);
while (sleeping_count_ > 0) {
bg_cv_.wait(l);
}
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
const void* db_key_;
const std::string db_name_;
const void* cf_key_;
const std::string cf_name_;
std::mutex mutex_;
std::condition_variable bg_cv_;
bool should_sleep_;
std::atomic<int> sleeping_count_;
};
class ThreadListTest {
public:
ThreadListTest() {
}
};
TEST(ThreadListTest, SimpleColumnFamilyInfoTest) {
Env* env = Env::Default();
const int kHighPriorityThreads = 3;
const int kLowPriorityThreads = 5;
const int kSleepingHighPriThreads = kHighPriorityThreads - 1;
const int kSleepingLowPriThreads = kLowPriorityThreads / 3;
env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
SleepingBackgroundTask sleeping_task(
reinterpret_cast<void*>(1234), "sleeping",
reinterpret_cast<void*>(5678), "pikachu");
for (int test = 0; test < kSleepingHighPriThreads; ++test) {
env->Schedule(&SleepingBackgroundTask::DoSleepTask,
&sleeping_task, Env::Priority::HIGH);
}
for (int test = 0; test < kSleepingLowPriThreads; ++test) {
env->Schedule(&SleepingBackgroundTask::DoSleepTask,
&sleeping_task, Env::Priority::LOW);
}
// make sure everything is scheduled.
env->SleepForMicroseconds(10000);
std::vector<ThreadStatus> thread_list;
// Verify the number of sleeping threads in each pool.
GetThreadList(&thread_list);
int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0};
for (auto thread_status : thread_list) {
if (thread_status.cf_name == "pikachu" &&
thread_status.db_name == "sleeping") {
sleeping_count[thread_status.thread_type]++;
}
}
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY],
kSleepingHighPriThreads);
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY],
kSleepingLowPriThreads);
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0);
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
// Verify none of the threads are sleeping
GetThreadList(&thread_list);
for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) {
sleeping_count[i] = 0;
}
for (auto thread_status : thread_list) {
if (thread_status.cf_name == "pikachu" &&
thread_status.db_name == "sleeping") {
sleeping_count[thread_status.thread_type]++;
}
}
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], 0);
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], 0);
ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0);
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}
#else
int main(int argc, char** argv) {
return 0;
}
#endif // ROCKSDB_USING_THREAD_STATUS