Allow GetThreadList() to report operation stage.

Summary: Allow GetThreadList() to report operation stage.

Test Plan:
  ./thread_list_test
  ./db_bench --benchmarks=fillrandom --num=100000 --threads=40 \
    --max_background_compactions=10 --max_background_flushes=3 \
    --thread_status_per_interval=1000 --key_size=16 --value_size=1000 \
    --num_column_families=10

  export ROCKSDB_TESTS=ThreadStatus
  ./db_test

Sample output
          ThreadID ThreadType                    cfName    Operation        OP_StartTime    ElapsedTime                                         Stage        State
   140116265861184    Low Pri
   140116270055488    Low Pri
   140116274249792   High Pri column_family_name_000005        Flush 2015/03/10-14:58:11           0 us                    FlushJob::WriteLevel0Table
   140116400078912    Low Pri column_family_name_000004   Compaction 2015/03/10-14:58:11           0 us     CompactionJob::FinishCompactionOutputFile
   140116358135872    Low Pri column_family_name_000006   Compaction 2015/03/10-14:58:10           1 us     CompactionJob::FinishCompactionOutputFile
   140116341358656    Low Pri
   140116295221312   High Pri                   default        Flush 2015/03/10-14:58:11           0 us                    FlushJob::WriteLevel0Table
   140116324581440    Low Pri column_family_name_000009   Compaction 2015/03/10-14:58:11           0 us      CompactionJob::ProcessKeyValueCompaction
   140116278444096    Low Pri
   140116299415616    Low Pri column_family_name_000008   Compaction 2015/03/10-14:58:11           0 us     CompactionJob::FinishCompactionOutputFile
   140116291027008   High Pri column_family_name_000001        Flush 2015/03/10-14:58:11           0 us                    FlushJob::WriteLevel0Table
   140116286832704    Low Pri column_family_name_000002   Compaction 2015/03/10-14:58:11           0 us     CompactionJob::FinishCompactionOutputFile
   140116282638400    Low Pri

Reviewers: rven, igor, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D34683
This commit is contained in:
Yueh-Hsuan Chiang 2015-03-13 10:45:40 -07:00
parent 2623b2cf6d
commit c594b0e89d
15 changed files with 207 additions and 24 deletions

View File

@ -225,9 +225,22 @@ CompactionJob::CompactionJob(
snapshots_(snapshots), snapshots_(snapshots),
is_snapshot_supported_(is_snapshot_supported), is_snapshot_supported_(is_snapshot_supported),
table_cache_(std::move(table_cache)), table_cache_(std::move(table_cache)),
yield_callback_(std::move(yield_callback)) {} yield_callback_(std::move(yield_callback)) {
ThreadStatusUtil::SetColumnFamily(
compact_->compaction->column_family_data());
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
TEST_SYNC_POINT("CompactionJob::CompationJob()");
}
CompactionJob::~CompactionJob() {
assert(compact_ == nullptr);
TEST_SYNC_POINT("CompactionJob::~CompactionJob()");
ThreadStatusUtil::ResetThreadStatus();
}
void CompactionJob::Prepare() { void CompactionJob::Prepare() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PREPARE);
compact_->CleanupBatchBuffer(); compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer(); compact_->CleanupMergedBuffer();
@ -275,11 +288,10 @@ void CompactionJob::Prepare() {
} }
Status CompactionJob::Run() { Status CompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
log_buffer_->FlushBufferToLog(); log_buffer_->FlushBufferToLog();
ColumnFamilyData* cfd = compact_->compaction->column_family_data(); ColumnFamilyData* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
TEST_SYNC_POINT("CompactionJob::Run:Start");
const uint64_t start_micros = env_->NowMicros(); const uint64_t start_micros = env_->NowMicros();
std::unique_ptr<Iterator> input( std::unique_ptr<Iterator> input(
@ -469,12 +481,12 @@ Status CompactionJob::Run() {
RecordCompactionIOStats(); RecordCompactionIOStats();
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run:End");
ThreadStatusUtil::ResetThreadStatus();
return status; return status;
} }
void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) { void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_INSTALL);
db_mutex->AssertHeld(); db_mutex->AssertHeld();
ColumnFamilyData* cfd = compact_->compaction->column_family_data(); ColumnFamilyData* cfd = compact_->compaction->column_family_data();
cfd->internal_stats()->AddCompactionStats( cfd->internal_stats()->AddCompactionStats(
@ -511,6 +523,8 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
Iterator* input, Iterator* input,
bool is_compaction_v2) { bool is_compaction_v2) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
size_t combined_idx = 0; size_t combined_idx = 0;
Status status; Status status;
std::string compaction_filter_value; std::string compaction_filter_value;
@ -849,6 +863,8 @@ void CompactionJob::CallCompactionFilterV2(
if (compact_ == nullptr || compaction_filter_v2 == nullptr) { if (compact_ == nullptr || compaction_filter_v2 == nullptr) {
return; return;
} }
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_FILTER_V2);
// Assemble slice vectors for user keys and existing values. // Assemble slice vectors for user keys and existing values.
// We also keep track of our parsed internal key structs because // We also keep track of our parsed internal key structs because
@ -907,6 +923,8 @@ void CompactionJob::CallCompactionFilterV2(
} }
Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
assert(compact_ != nullptr); assert(compact_ != nullptr);
assert(compact_->outfile); assert(compact_->outfile);
assert(compact_->builder != nullptr); assert(compact_->builder != nullptr);

View File

@ -62,7 +62,7 @@ class CompactionJob {
bool is_snapshot_supported, std::shared_ptr<Cache> table_cache, bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback); std::function<uint64_t()> yield_callback);
~CompactionJob() { assert(compact_ == nullptr); } ~CompactionJob();
// no copy/move // no copy/move
CompactionJob(CompactionJob&& job) = delete; CompactionJob(CompactionJob&& job) = delete;

View File

@ -926,9 +926,9 @@ class Stats {
std::vector<ThreadStatus> thread_list; std::vector<ThreadStatus> thread_list;
FLAGS_env->GetThreadList(&thread_list); FLAGS_env->GetThreadList(&thread_list);
fprintf(stderr, "\n%18s %10s %25s %12s %20s %13s %12s\n", fprintf(stderr, "\n%18s %10s %25s %12s %20s %13s %45s %12s\n",
"ThreadID", "ThreadType", "cfName", "Operation", "ThreadID", "ThreadType", "cfName", "Operation",
"OP_StartTime ", "ElapsedTime", "State"); "OP_StartTime ", "ElapsedTime", "Stage", "State");
int64_t current_time = 0; int64_t current_time = 0;
Env::Default()->GetCurrentTime(&current_time); Env::Default()->GetCurrentTime(&current_time);
@ -941,13 +941,14 @@ class Stats {
} else { } else {
elapsed_time[0] = 0; elapsed_time[0] = 0;
} }
fprintf(stderr, "%18" PRIu64 " %10s %25s %12s %20s %13s %12s\n", fprintf(stderr, "%18" PRIu64 " %10s %25s %12s %20s %13s %45s %12s\n",
ts.thread_id, ts.thread_id,
ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(), ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(),
ts.cf_name.c_str(), ts.cf_name.c_str(),
ThreadStatus::GetOperationName(ts.operation_type).c_str(), ThreadStatus::GetOperationName(ts.operation_type).c_str(),
ThreadStatus::TimeToString(ts.op_start_time).c_str(), ThreadStatus::TimeToString(ts.op_start_time).c_str(),
elapsed_time, elapsed_time,
ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(),
ThreadStatus::GetStateName(ts.state_type).c_str()); ThreadStatus::GetStateName(ts.state_type).c_str());
} }
} }

View File

@ -10107,8 +10107,8 @@ TEST(DBTest, ThreadStatusFlush) {
options = CurrentOptions(options); options = CurrentOptions(options);
rocksdb::SyncPoint::GetInstance()->LoadDependency({ rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"FlushJob::Run:Start", "DBTest::ThreadStatusFlush:1"}, {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"},
{"DBTest::ThreadStatusFlush:2", "FlushJob::Run:End"}, {"DBTest::ThreadStatusFlush:2", "FlushJob::~FlushJob()"},
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -10151,8 +10151,10 @@ TEST(DBTest, ThreadStatusSingleCompaction) {
options.level0_file_num_compaction_trigger = kNumL0Files; options.level0_file_num_compaction_trigger = kNumL0Files;
rocksdb::SyncPoint::GetInstance()->LoadDependency({ rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"CompactionJob::Run:Start", "DBTest::ThreadStatusSingleCompaction:1"}, {"CompactionJob::CompationJob()",
{"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run:End"}, "DBTest::ThreadStatusSingleCompaction:1"},
{"DBTest::ThreadStatusSingleCompaction:2",
"CompactionJob::Run:~CompactionJob()"},
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -10175,6 +10177,7 @@ TEST(DBTest, ThreadStatusSingleCompaction) {
// If thread tracking is not enabled, compaction count should be 0. // If thread tracking is not enabled, compaction count should be 0.
VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0); VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0);
} }
// TODO(yhchiang): adding assert to verify each compaction stage.
TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2"); TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2");
// repeat the test with disabling thread tracking. // repeat the test with disabling thread tracking.

View File

@ -79,9 +79,21 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
output_file_directory_(output_file_directory), output_file_directory_(output_file_directory),
output_compression_(output_compression), output_compression_(output_compression),
stats_(stats), stats_(stats),
event_logger_(event_logger) {} event_logger_(event_logger) {
// Update the thread status to indicate flush.
ThreadStatusUtil::SetColumnFamily(cfd_);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
TEST_SYNC_POINT("FlushJob::FlushJob()");
}
FlushJob::~FlushJob() {
TEST_SYNC_POINT("FlushJob::~FlushJob()");
ThreadStatusUtil::ResetThreadStatus();
}
Status FlushJob::Run(uint64_t* file_number) { Status FlushJob::Run(uint64_t* file_number) {
AutoThreadOperationStageUpdater stage_run(
ThreadStatus::STAGE_FLUSH_RUN);
// Save the contents of the earliest memtable as a new Table // Save the contents of the earliest memtable as a new Table
uint64_t fn; uint64_t fn;
autovector<MemTable*> mems; autovector<MemTable*> mems;
@ -92,10 +104,6 @@ Status FlushJob::Run(uint64_t* file_number) {
return Status::OK(); return Status::OK();
} }
// Update the thread status to indicate flush.
ThreadStatusUtil::SetColumnFamily(cfd_);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
TEST_SYNC_POINT("FlushJob::Run:Start");
// entries mems are (implicitly) sorted in ascending order by their created // entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for // time. We will use the first memtable's `edit` to keep the meta info for
@ -130,13 +138,13 @@ Status FlushJob::Run(uint64_t* file_number) {
*file_number = fn; *file_number = fn;
} }
TEST_SYNC_POINT("FlushJob::Run:End");
ThreadStatusUtil::ResetThreadStatus();
return s; return s;
} }
Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems, Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber) { VersionEdit* edit, uint64_t* filenumber) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);
db_mutex_->AssertHeld(); db_mutex_->AssertHeld();
const uint64_t start_micros = db_options_.env->NowMicros(); const uint64_t start_micros = db_options_.env->NowMicros();
FileMetaData meta; FileMetaData meta;

View File

@ -61,7 +61,8 @@ class FlushJob {
LogBuffer* log_buffer, Directory* db_directory, LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory, CompressionType output_compression, Directory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger); Statistics* stats, EventLogger* event_logger);
~FlushJob() {}
~FlushJob();
Status Run(uint64_t* file_number = nullptr); Status Run(uint64_t* file_number = nullptr);

View File

@ -19,6 +19,7 @@
#include "table/merger.h" #include "table/merger.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/log_buffer.h" #include "util/log_buffer.h"
#include "util/thread_status_util.h"
namespace rocksdb { namespace rocksdb {
@ -127,6 +128,8 @@ bool MemTableList::IsFlushPending() const {
// Returns the memtables that need to be flushed. // Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) { void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
const auto& memlist = current_->memlist_; const auto& memlist = current_->memlist_;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it; MemTable* m = *it;
@ -145,6 +148,8 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems, void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
uint64_t file_number) { uint64_t file_number) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
assert(!mems.empty()); assert(!mems.empty());
// If the flush was not successful, then just reset state. // If the flush was not successful, then just reset state.
@ -167,6 +172,8 @@ Status MemTableList::InstallMemtableFlushResults(
const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu, const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu,
uint64_t file_number, autovector<MemTable*>* to_delete, uint64_t file_number, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer) { Directory* db_directory, LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld(); mu->AssertHeld();
// flush was sucessful // flush was sucessful

View File

@ -48,6 +48,22 @@ struct ThreadStatus {
NUM_OP_TYPES NUM_OP_TYPES
}; };
enum OperationStage : int {
STAGE_UNKNOWN = 0,
STAGE_FLUSH_RUN,
STAGE_FLUSH_WRITE_L0,
STAGE_COMPACTION_PREPARE,
STAGE_COMPACTION_RUN,
STAGE_COMPACTION_PROCESS_KV,
STAGE_COMPACTION_FILTER_V2,
STAGE_COMPACTION_INSTALL,
STAGE_COMPACTION_SYNC_FILE,
STAGE_PICK_MEMTABLES_TO_FLUSH,
STAGE_MEMTABLE_ROLLBACK,
STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS,
NUM_OP_STAGES
};
// The type used to refer to a thread state. // The type used to refer to a thread state.
// A state describes lower-level action of a thread // A state describes lower-level action of a thread
// such as reading / writing a file or waiting for a mutex. // such as reading / writing a file or waiting for a mutex.
@ -63,12 +79,14 @@ struct ThreadStatus {
const std::string& _cf_name, const std::string& _cf_name,
const OperationType _operation_type, const OperationType _operation_type,
const int64_t _op_start_time, const int64_t _op_start_time,
const OperationStage _operation_stage,
const StateType _state_type) : const StateType _state_type) :
thread_id(_id), thread_type(_thread_type), thread_id(_id), thread_type(_thread_type),
db_name(_db_name), db_name(_db_name),
cf_name(_cf_name), cf_name(_cf_name),
operation_type(_operation_type), operation_type(_operation_type),
op_start_time(_op_start_time), op_start_time(_op_start_time),
operation_stage(_operation_stage),
state_type(_state_type) {} state_type(_state_type) {}
// An unique ID for the thread. // An unique ID for the thread.
@ -95,6 +113,10 @@ struct ThreadStatus {
// Epoch, 1970-01-01 00:00:00 (UTC). // Epoch, 1970-01-01 00:00:00 (UTC).
const int64_t op_start_time; const int64_t op_start_time;
// An integer showing the current stage where the thread is involved
// in the current operation.
const OperationStage operation_stage;
// The state (lower-level action) that the current thread is involved. // The state (lower-level action) that the current thread is involved.
const StateType state_type; const StateType state_type;
@ -108,6 +130,10 @@ struct ThreadStatus {
static const std::string TimeToString(int64_t op_start_time); static const std::string TimeToString(int64_t op_start_time);
// Obtain a human-readable string describing the specified operation stage.
static const std::string& GetOperationStageName(
OperationStage stage);
// Obtain the name of a state given its type. // Obtain the name of a state given its type.
static const std::string& GetStateName(StateType state_type); static const std::string& GetStateName(StateType state_type);
}; };

View File

@ -94,7 +94,7 @@ class ThreadListTest {
} }
}; };
TEST(ThreadListTest, EventTables) { TEST(ThreadListTest, GlobalTables) {
// verify the global tables for operations and states are properly indexed. // verify the global tables for operations and states are properly indexed.
for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) { for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) {
ASSERT_EQ(global_operation_table[type].type, type); ASSERT_EQ(global_operation_table[type].type, type);
@ -109,6 +109,13 @@ TEST(ThreadListTest, EventTables) {
ThreadStatus::GetStateName( ThreadStatus::GetStateName(
ThreadStatus::StateType(type))); ThreadStatus::StateType(type)));
} }
for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) {
ASSERT_EQ(global_op_stage_table[stage].stage, stage);
ASSERT_EQ(global_op_stage_table[stage].name,
ThreadStatus::GetOperationStageName(
ThreadStatus::OperationStage(stage)));
}
} }
TEST(ThreadListTest, SimpleColumnFamilyInfoTest) { TEST(ThreadListTest, SimpleColumnFamilyInfoTest) {

View File

@ -41,6 +41,40 @@ static OperationInfo global_operation_table[] = {
{ThreadStatus::OP_FLUSH, "Flush"} {ThreadStatus::OP_FLUSH, "Flush"}
}; };
struct OperationStageInfo {
const ThreadStatus::OperationStage stage;
const std::string name;
};
// A table maintains the mapping from stage type to stage string.
// Note that the string must be changed accordingly when the
// associated function name changed.
static OperationStageInfo global_op_stage_table[] = {
{ThreadStatus::STAGE_UNKNOWN, ""},
{ThreadStatus::STAGE_FLUSH_RUN,
"FlushJob::Run"},
{ThreadStatus::STAGE_FLUSH_WRITE_L0,
"FlushJob::WriteLevel0Table"},
{ThreadStatus::STAGE_COMPACTION_PREPARE,
"CompactionJob::Prepare"},
{ThreadStatus::STAGE_COMPACTION_RUN,
"CompactionJob::Run"},
{ThreadStatus::STAGE_COMPACTION_PROCESS_KV,
"CompactionJob::ProcessKeyValueCompaction"},
{ThreadStatus::STAGE_COMPACTION_FILTER_V2,
"CompactionJob::CallCompactionFilterV2"},
{ThreadStatus::STAGE_COMPACTION_INSTALL,
"CompactionJob::Install"},
{ThreadStatus::STAGE_COMPACTION_SYNC_FILE,
"CompactionJob::FinishCompactionOutputFile"},
{ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH,
"MemTableList::PickMemtablesToFlush"},
{ThreadStatus::STAGE_MEMTABLE_ROLLBACK,
"MemTableList::RollbackMemtableFlush"},
{ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS,
"MemTableList::InstallMemtableFlushResults"},
};
// The structure that describes a state. // The structure that describes a state.
struct StateInfo { struct StateInfo {
const ThreadStatus::StateType type; const ThreadStatus::StateType type;

View File

@ -23,6 +23,11 @@ const std::string& ThreadStatus::GetOperationName(
return global_operation_table[op_type].name; return global_operation_table[op_type].name;
} }
const std::string& ThreadStatus::GetOperationStageName(
ThreadStatus::OperationStage stage) {
return global_op_stage_table[stage].name;
}
const std::string& ThreadStatus::GetStateName( const std::string& ThreadStatus::GetStateName(
ThreadStatus::StateType state_type) { ThreadStatus::StateType state_type) {
return global_state_table[state_type].name; return global_state_table[state_type].name;
@ -50,6 +55,12 @@ const std::string& ThreadStatus::GetOperationName(
return dummy_str; return dummy_str;
} }
const std::string& ThreadStatus::GetOperationStageName(
ThreadStatus::OperationStage stage) {
static std::string dummy_str = "";
return dummy_str;
}
const std::string& ThreadStatus::GetStateName( const std::string& ThreadStatus::GetStateName(
ThreadStatus::StateType state_type) { ThreadStatus::StateType state_type) {
static std::string dummy_str = ""; static std::string dummy_str = "";

View File

@ -60,6 +60,8 @@ void ThreadStatusUpdater::SetThreadOperation(
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
std::memory_order_relaxed);
data->operation_type.store(type, std::memory_order_relaxed); data->operation_type.store(type, std::memory_order_relaxed);
} }
@ -78,10 +80,23 @@ void ThreadStatusUpdater::ClearThreadOperation() {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
std::memory_order_relaxed);
data->operation_type.store( data->operation_type.store(
ThreadStatus::OP_UNKNOWN, std::memory_order_relaxed); ThreadStatus::OP_UNKNOWN, std::memory_order_relaxed);
} }
ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
ThreadStatus::OperationStage stage) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return ThreadStatus::STAGE_UNKNOWN;
}
return data->operation_stage.exchange(
stage, std::memory_order_relaxed);
}
void ThreadStatusUpdater::SetThreadState( void ThreadStatusUpdater::SetThreadState(
const ThreadStatus::StateType type) { const ThreadStatus::StateType type) {
auto* data = InitAndGet(); auto* data = InitAndGet();
@ -124,6 +139,7 @@ Status ThreadStatusUpdater::GetThreadList(
const std::string* db_name = nullptr; const std::string* db_name = nullptr;
const std::string* cf_name = nullptr; const std::string* cf_name = nullptr;
ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN; ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN; ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
int64_t op_start_time = 0; int64_t op_start_time = 0;
if (cf_info != nullptr) { if (cf_info != nullptr) {
@ -135,6 +151,8 @@ Status ThreadStatusUpdater::GetThreadList(
if (op_type != ThreadStatus::OP_UNKNOWN) { if (op_type != ThreadStatus::OP_UNKNOWN) {
op_start_time = thread_data->op_start_time.load( op_start_time = thread_data->op_start_time.load(
std::memory_order_relaxed); std::memory_order_relaxed);
op_stage = thread_data->operation_stage.load(
std::memory_order_relaxed);
state_type = thread_data->state_type.load( state_type = thread_data->state_type.load(
std::memory_order_relaxed); std::memory_order_relaxed);
} }
@ -143,7 +161,7 @@ Status ThreadStatusUpdater::GetThreadList(
thread_data->thread_id, thread_type, thread_data->thread_id, thread_type,
db_name ? *db_name : "", db_name ? *db_name : "",
cf_name ? *cf_name : "", cf_name ? *cf_name : "",
op_type, op_start_time, state_type); op_type, op_start_time, op_stage, state_type);
} }
return Status::OK(); return Status::OK();

View File

@ -87,6 +87,7 @@ struct ThreadStatusData {
std::atomic<const void*> cf_key; std::atomic<const void*> cf_key;
std::atomic<ThreadStatus::OperationType> operation_type; std::atomic<ThreadStatus::OperationType> operation_type;
std::atomic<int64_t> op_start_time; std::atomic<int64_t> op_start_time;
std::atomic<ThreadStatus::OperationStage> operation_stage;
std::atomic<ThreadStatus::StateType> state_type; std::atomic<ThreadStatus::StateType> state_type;
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
}; };
@ -126,8 +127,13 @@ class ThreadStatusUpdater {
// Update the thread operation of the current thread. // Update the thread operation of the current thread.
void SetThreadOperation(const ThreadStatus::OperationType type); void SetThreadOperation(const ThreadStatus::OperationType type);
// Set the start time of an operation.
void SetOperationStartTime(const int64_t start_time); void SetOperationStartTime(const int64_t start_time);
// Update the thread operation stage of the current thread.
ThreadStatus::OperationStage SetThreadOperationStage(
const ThreadStatus::OperationStage stage);
// Clear thread operation of the current thread. // Clear thread operation of the current thread.
void ClearThreadOperation(); void ClearThreadOperation();

View File

@ -66,6 +66,17 @@ void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) {
thread_updater_local_cache_->SetThreadOperation(op); thread_updater_local_cache_->SetThreadOperation(op);
} }
ThreadStatus::OperationStage ThreadStatusUtil::SetThreadOperationStage(
ThreadStatus::OperationStage stage) {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return ThreadStatus::STAGE_UNKNOWN;
}
return thread_updater_local_cache_->SetThreadOperationStage(stage);
}
void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) { void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) {
if (thread_updater_local_cache_ == nullptr) { if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily // thread_updater_local_cache_ must be set in SetColumnFamily
@ -118,6 +129,15 @@ bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
return (thread_updater_local_cache_ != nullptr); return (thread_updater_local_cache_ != nullptr);
} }
AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater(
ThreadStatus::OperationStage stage) {
prev_stage_ = ThreadStatusUtil::SetThreadOperationStage(stage);
}
AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() {
ThreadStatusUtil::SetThreadOperationStage(prev_stage_);
}
#else #else
ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr; ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
@ -150,6 +170,13 @@ void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) {
void ThreadStatusUtil::ResetThreadStatus() { void ThreadStatusUtil::ResetThreadStatus() {
} }
AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater(
ThreadStatus::OperationStage stage) {
}
AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() {
}
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
} // namespace rocksdb } // namespace rocksdb

View File

@ -56,6 +56,9 @@ class ThreadStatusUtil {
static void SetThreadOperation(ThreadStatus::OperationType type); static void SetThreadOperation(ThreadStatus::OperationType type);
static ThreadStatus::OperationStage SetThreadOperationStage(
ThreadStatus::OperationStage stage);
static void SetThreadState(ThreadStatus::StateType type); static void SetThreadState(ThreadStatus::StateType type);
static void ResetThreadStatus(); static void ResetThreadStatus();
@ -104,4 +107,17 @@ class ThreadStatusUtil {
#endif #endif
}; };
// A helper class for updating thread state. It will set the
// thread state according to the input parameter in its constructor
// and set the thread state to the previous state in its destructor.
class AutoThreadOperationStageUpdater {
public:
explicit AutoThreadOperationStageUpdater(
ThreadStatus::OperationStage stage);
~AutoThreadOperationStageUpdater();
private:
ThreadStatus::OperationStage prev_stage_;
};
} // namespace rocksdb } // namespace rocksdb