rocksdb/util/thread_status_util.cc
Yueh-Hsuan Chiang fe5c6321cb Allow EventListener::OnCompactionCompleted to return CompactionJobStats.
Summary:
Allow EventListener::OnCompactionCompleted to return CompactionJobStats,
which contains useful information about a compaction.

Example CompactionJobStats returned by OnCompactionCompleted():
    smallest_output_key_prefix 05000000
    largest_output_key_prefix 06990000
    elapsed_time 42419
    num_input_records 300
    num_input_files 3
    num_input_files_at_output_level 2
    num_output_records 200
    num_output_files 1
    actual_bytes_input 167200
    actual_bytes_output 110688
    total_input_raw_key_bytes 5400
    total_input_raw_value_bytes 300000
    num_records_replaced 100
    is_manual_compaction 1

Test Plan: Developed a mega test in db_test which covers 20 variables in CompactionJobStats.

Reviewers: rven, igor, anthony, sdong

Reviewed By: sdong

Subscribers: tnovak, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38463
2015-06-02 17:07:16 -07:00

228 lines
6.6 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/env.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS
__thread ThreadStatusUpdater*
ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
__thread bool ThreadStatusUtil::thread_updater_initialized_ = false;
void ThreadStatusUtil::SetThreadType(
const Env* env, ThreadStatus::ThreadType thread_type) {
if (!MaybeInitThreadLocalUpdater(env)) {
return;
}
assert(thread_updater_local_cache_);
thread_updater_local_cache_->SetThreadType(thread_type);
}
void ThreadStatusUtil::UnregisterThread() {
thread_updater_initialized_ = false;
if (thread_updater_local_cache_ != nullptr) {
thread_updater_local_cache_->UnregisterThread();
thread_updater_local_cache_ = nullptr;
}
}
uint64_t ThreadStatusUtil::GetThreadID() {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return 0;
}
return thread_updater_local_cache_->GetThreadID();
}
void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) {
return;
}
assert(thread_updater_local_cache_);
if (cfd != nullptr && cfd->options()->enable_thread_tracking) {
thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd);
} else {
// When cfd == nullptr or enable_thread_tracking == false, we set
// ColumnFamilyInfoKey to nullptr, which makes SetThreadOperation
// and SetThreadState become no-op.
thread_updater_local_cache_->SetColumnFamilyInfoKey(nullptr);
}
}
void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return;
}
if (op != ThreadStatus::OP_UNKNOWN) {
uint64_t current_time = Env::Default()->NowMicros();
thread_updater_local_cache_->SetOperationStartTime(current_time);
} else {
// TDOO(yhchiang): we could report the time when we set operation to
// OP_UNKNOWN once the whole instrumentation has been done.
thread_updater_local_cache_->SetOperationStartTime(0);
}
thread_updater_local_cache_->SetThreadOperation(op);
}
ThreadStatus::OperationStage ThreadStatusUtil::SetThreadOperationStage(
ThreadStatus::OperationStage stage) {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return ThreadStatus::STAGE_UNKNOWN;
}
return thread_updater_local_cache_->SetThreadOperationStage(stage);
}
void ThreadStatusUtil::SetThreadOperationProperty(
int code, uint64_t value) {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return;
}
thread_updater_local_cache_->SetThreadOperationProperty(
code, value);
}
void ThreadStatusUtil::IncreaseThreadOperationProperty(
int code, uint64_t delta) {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return;
}
thread_updater_local_cache_->IncreaseThreadOperationProperty(
code, delta);
}
void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return;
}
thread_updater_local_cache_->SetThreadState(state);
}
void ThreadStatusUtil::ResetThreadStatus() {
if (thread_updater_local_cache_ == nullptr) {
return;
}
thread_updater_local_cache_->ResetThreadStatus();
}
void ThreadStatusUtil::NewColumnFamilyInfo(
const DB* db, const ColumnFamilyData* cfd) {
if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) {
return;
}
assert(thread_updater_local_cache_);
if (thread_updater_local_cache_) {
thread_updater_local_cache_->NewColumnFamilyInfo(
db, db->GetName(), cfd, cfd->GetName());
}
}
void ThreadStatusUtil::EraseColumnFamilyInfo(
const ColumnFamilyData* cfd) {
if (thread_updater_local_cache_ == nullptr) {
return;
}
thread_updater_local_cache_->EraseColumnFamilyInfo(cfd);
}
void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) {
if (thread_updater_local_cache_ == nullptr) {
return;
}
thread_updater_local_cache_->EraseDatabaseInfo(db);
}
bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
if (!thread_updater_initialized_ && env != nullptr) {
thread_updater_initialized_ = true;
thread_updater_local_cache_ = env->GetThreadStatusUpdater();
}
return (thread_updater_local_cache_ != nullptr);
}
AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater(
ThreadStatus::OperationStage stage) {
prev_stage_ = ThreadStatusUtil::SetThreadOperationStage(stage);
}
AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() {
ThreadStatusUtil::SetThreadOperationStage(prev_stage_);
}
#else
ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
bool ThreadStatusUtil::thread_updater_initialized_ = false;
bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
return false;
}
uint64_t ThreadStatusUtil::GetThreadID() {
return 0;
}
void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
}
void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) {
}
void ThreadStatusUtil::SetThreadOperationProperty(
int code, uint64_t value) {
}
void ThreadStatusUtil::IncreaseThreadOperationProperty(
int code, uint64_t delta) {
}
void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) {
}
void ThreadStatusUtil::NewColumnFamilyInfo(
const DB* db, const ColumnFamilyData* cfd) {
}
void ThreadStatusUtil::EraseColumnFamilyInfo(
const ColumnFamilyData* cfd) {
}
void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) {
}
void ThreadStatusUtil::ResetThreadStatus() {
}
AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater(
ThreadStatus::OperationStage stage) {
}
AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() {
}
#endif // ROCKSDB_USING_THREAD_STATUS
} // namespace rocksdb