diff --git a/db/db_impl.cc b/db/db_impl.cc index fe55a42b9..10aa6599e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -285,7 +285,6 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { } DBImpl::~DBImpl() { - EraseThreadStatusDbInfo(); mutex_.Lock(); if (!shutting_down_.load(std::memory_order_acquire) && flush_on_destroy_) { @@ -316,6 +315,7 @@ DBImpl::~DBImpl() { while (bg_compaction_scheduled_ || bg_flush_scheduled_) { bg_cv_.Wait(); } + EraseThreadStatusDbInfo(); flush_scheduler_.Clear(); while (!flush_queue_.empty()) { @@ -1310,7 +1310,7 @@ void DBImpl::NotifyOnFlushCompleted( // go to L0 in the future. info.file_path = MakeTableFileName(db_options_.db_paths[0].path, file_number); - info.thread_id = ThreadStatusUtil::GetThreadID(); + info.thread_id = env_->GetThreadID(); info.job_id = job_id; info.triggered_writes_slowdown = triggered_writes_slowdown; info.triggered_writes_stop = triggered_writes_stop; @@ -1621,7 +1621,7 @@ void DBImpl::NotifyOnCompactionCompleted( CompactionJobInfo info; info.cf_name = cfd->GetName(); info.status = st; - info.thread_id = ThreadStatusUtil::GetThreadID(); + info.thread_id = env_->GetThreadID(); info.job_id = job_id; info.base_input_level = c->start_level(); info.output_level = c->output_level(); diff --git a/db/listener_test.cc b/db/listener_test.cc index 79db2235f..a5acabbaf 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -156,6 +156,8 @@ class TestCompactionListener : public EventListener { compacted_dbs_.push_back(db); ASSERT_GT(ci.input_files.size(), 0U); ASSERT_GT(ci.output_files.size(), 0U); + ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id); + ASSERT_GT(ci.thread_id, 0U); } std::vector compacted_dbs_; @@ -177,7 +179,9 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) { options.max_bytes_for_level_base = options.target_file_size_base * 2; options.max_bytes_for_level_multiplier = 2; options.compression = kNoCompression; +#if ROCKSDB_USING_THREAD_STATUS options.enable_thread_tracking = true; +#endif // ROCKSDB_USING_THREAD_STATUS options.level0_file_num_compaction_trigger = kNumL0Files; TestCompactionListener* listener = new TestCompactionListener(); @@ -211,6 +215,11 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) { // This simple Listener can only handle one flush at a time. class TestFlushListener : public EventListener { public: + explicit TestFlushListener(Env* env) : + slowdown_count(0), + stop_count(0), + db_closed(false), + env_(env) {} void OnTableFileCreated( const TableFileCreationInfo& info) override { // remember the info for later checking the FlushJobInfo. @@ -224,6 +233,25 @@ class TestFlushListener : public EventListener { ASSERT_GT(info.table_properties.raw_value_size, 0U); ASSERT_GT(info.table_properties.num_data_blocks, 0U); ASSERT_GT(info.table_properties.num_entries, 0U); + +#if ROCKSDB_USING_THREAD_STATUS + // Verify the id of the current thread that created this table + // file matches the id of any active flush or compaction thread. + uint64_t thread_id = env_->GetThreadID(); + std::vector thread_list; + ASSERT_OK(env_->GetThreadList(&thread_list)); + bool found_match = false; + for (auto thread_status : thread_list) { + if (thread_status.operation_type == ThreadStatus::OP_FLUSH || + thread_status.operation_type == ThreadStatus::OP_COMPACTION) { + if (thread_id == thread_status.thread_id) { + found_match = true; + break; + } + } + } + ASSERT_TRUE(found_match); +#endif // ROCKSDB_USING_THREAD_STATUS } void OnFlushCompleted( @@ -241,19 +269,29 @@ class TestFlushListener : public EventListener { ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name); ASSERT_EQ(prev_fc_info_.job_id, info.job_id); ASSERT_EQ(prev_fc_info_.file_path, info.file_path); + ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id); + ASSERT_GT(info.thread_id, 0U); } std::vector flushed_column_family_names_; std::vector flushed_dbs_; int slowdown_count; int stop_count; + bool db_closing; + std::atomic_bool db_closed; TableFileCreationInfo prev_fc_info_; + + protected: + Env* env_; }; TEST_F(EventListenerTest, OnSingleDBFlushTest) { Options options; options.write_buffer_size = 100000; - TestFlushListener* listener = new TestFlushListener(); +#if ROCKSDB_USING_THREAD_STATUS + options.enable_thread_tracking = true; +#endif // ROCKSDB_USING_THREAD_STATUS + TestFlushListener* listener = new TestFlushListener(options.env); options.listeners.emplace_back(listener); std::vector cf_names = { "pikachu", "ilya", "muromec", "dobrynia", @@ -284,7 +322,10 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) { TEST_F(EventListenerTest, MultiCF) { Options options; options.write_buffer_size = 100000; - TestFlushListener* listener = new TestFlushListener(); +#if ROCKSDB_USING_THREAD_STATUS + options.enable_thread_tracking = true; +#endif // ROCKSDB_USING_THREAD_STATUS + TestFlushListener* listener = new TestFlushListener(options.env); options.listeners.emplace_back(listener); std::vector cf_names = { "pikachu", "ilya", "muromec", "dobrynia", @@ -312,18 +353,21 @@ TEST_F(EventListenerTest, MultiCF) { } TEST_F(EventListenerTest, MultiDBMultiListeners) { + Options options; +#if ROCKSDB_USING_THREAD_STATUS + options.enable_thread_tracking = true; +#endif // ROCKSDB_USING_THREAD_STATUS std::vector listeners; const int kNumDBs = 5; const int kNumListeners = 10; for (int i = 0; i < kNumListeners; ++i) { - listeners.emplace_back(new TestFlushListener()); + listeners.emplace_back(new TestFlushListener(options.env)); } std::vector cf_names = { "pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}; - Options options; options.create_if_missing = true; for (int i = 0; i < kNumListeners; ++i) { options.listeners.emplace_back(listeners[i]); @@ -374,6 +418,7 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) { } } + for (auto handles : vec_handles) { for (auto h : handles) { delete h; @@ -389,7 +434,10 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) { TEST_F(EventListenerTest, DisableBGCompaction) { Options options; - TestFlushListener* listener = new TestFlushListener(); +#if ROCKSDB_USING_THREAD_STATUS + options.enable_thread_tracking = true; +#endif // ROCKSDB_USING_THREAD_STATUS + TestFlushListener* listener = new TestFlushListener(options.env); const int kSlowdownTrigger = 5; const int kStopTrigger = 10; options.level0_slowdown_writes_trigger = kSlowdownTrigger; @@ -409,6 +457,7 @@ TEST_F(EventListenerTest, DisableBGCompaction) { // keep writing until writes are forced to stop. for (int i = 0; static_cast(cf_meta.file_count) < kStopTrigger; ++i) { Put(1, ToString(i), std::string(100000, 'x'), wopts); + db_->Flush(FlushOptions()); db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); } ASSERT_GE(listener->slowdown_count, kStopTrigger - kSlowdownTrigger); diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index cc94d52af..bd9118ebf 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -164,6 +164,10 @@ class HdfsEnv : public Env { return (uint64_t)pthread_self(); } + virtual uint64_t GetThreadID() const override { + return HdfsEnv::gettid(); + } + private: std::string fsname_; // string of the form "hdfs://hostname:port/" hdfsFS fileSys_; // a single FileSystem object for all files @@ -360,6 +364,10 @@ class HdfsEnv : public Env { virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { } virtual std::string TimeToString(uint64_t number) override { return ""; } + + virtual uint64_t GetThreadID() const override { + return 0; + } }; } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index ceb598f80..f185f2b7f 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -17,12 +17,12 @@ #ifndef STORAGE_ROCKSDB_INCLUDE_ENV_H_ #define STORAGE_ROCKSDB_INCLUDE_ENV_H_ -#include -#include -#include -#include -#include #include +#include +#include +#include +#include +#include #include "rocksdb/status.h" #include "rocksdb/thread_status.h" @@ -320,6 +320,9 @@ class Env { return thread_status_updater_; } + // Returns the ID of the current thread. + virtual uint64_t GetThreadID() const; + protected: // The pointer to an internal structure that will update the // status of each thread. @@ -876,6 +879,10 @@ class EnvWrapper : public Env { return target_->GetThreadStatusUpdater(); } + uint64_t GetThreadID() const override { + return target_->GetThreadID(); + } + private: Env* target_; }; diff --git a/util/env.cc b/util/env.cc index 0695b551a..e044024de 100644 --- a/util/env.cc +++ b/util/env.cc @@ -9,7 +9,9 @@ #include "rocksdb/env.h" +#include #include + #include "rocksdb/options.h" #include "util/arena.h" #include "util/autovector.h" @@ -19,6 +21,11 @@ namespace rocksdb { Env::~Env() { } +uint64_t Env::GetThreadID() const { + std::hash hasher; + return hasher(std::this_thread::get_id()); +} + SequentialFile::~SequentialFile() { } diff --git a/util/env_posix.cc b/util/env_posix.cc index 1be1d68b8..b3c63c8e8 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1435,6 +1435,10 @@ class PosixEnv : public Env { return gettid(tid); } + virtual uint64_t GetThreadID() const { + return gettid(pthread_self()); + } + virtual Status NewLogger(const std::string& fname, shared_ptr* result) override { FILE* f; diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc index 3127e491e..2dc15b429 100644 --- a/util/thread_status_updater.cc +++ b/util/thread_status_updater.cc @@ -15,11 +15,6 @@ namespace rocksdb { __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; -uint64_t ThreadStatusUpdater::GetThreadID() { - auto* data = InitAndGet(); - return data->thread_id; -} - void ThreadStatusUpdater::UnregisterThread() { if (thread_status_data_ != nullptr) { std::lock_guard lck(thread_list_mutex_); @@ -29,6 +24,11 @@ void ThreadStatusUpdater::UnregisterThread() { } } +void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) { + auto* data = InitAndGet(); + data->thread_id.store(thread_id, std::memory_order_relaxed); +} + void ThreadStatusUpdater::SetThreadType( ThreadStatus::ThreadType ttype) { auto* data = InitAndGet(); @@ -173,6 +173,8 @@ Status ThreadStatusUpdater::GetThreadList( std::lock_guard lck(thread_list_mutex_); for (auto* thread_data : thread_data_set_) { assert(thread_data); + auto thread_id = thread_data->thread_id.load( + std::memory_order_relaxed); auto thread_type = thread_data->thread_type.load( std::memory_order_relaxed); // Since any change to cf_info_map requires thread_list_mutex, @@ -181,7 +183,6 @@ Status ThreadStatusUpdater::GetThreadList( auto cf_key = thread_data->cf_key.load( std::memory_order_relaxed); auto iter = cf_info_map_.find(cf_key); - assert(cf_key == 0 || iter != cf_info_map_.end()); auto* cf_info = iter != cf_info_map_.end() ? iter->second.get() : nullptr; const std::string* db_name = nullptr; @@ -211,7 +212,7 @@ Status ThreadStatusUpdater::GetThreadList( } } thread_list->emplace_back( - thread_data->thread_id, thread_type, + thread_id, thread_type, db_name ? *db_name : "", cf_name ? *cf_name : "", op_type, op_elapsed_micros, op_stage, op_props, @@ -224,8 +225,6 @@ Status ThreadStatusUpdater::GetThreadList( ThreadStatusData* ThreadStatusUpdater::InitAndGet() { if (UNLIKELY(thread_status_data_ == nullptr)) { thread_status_data_ = new ThreadStatusData(); - thread_status_data_->thread_id = reinterpret_cast( - thread_status_data_); std::lock_guard lck(thread_list_mutex_); thread_data_set_.insert(thread_status_data_); } @@ -297,8 +296,7 @@ void ThreadStatusUpdater::UnregisterThread() { void ThreadStatusUpdater::ResetThreadStatus() { } -uint64_t ThreadStatusUpdater::GetThreadID() { - return 0; +void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) { } void ThreadStatusUpdater::SetThreadType( diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h index 6f7c4e384..5e7c2b894 100644 --- a/util/thread_status_updater.h +++ b/util/thread_status_updater.h @@ -64,7 +64,8 @@ struct ConstantColumnFamilyInfo { // status of a thread using a set of atomic pointers. struct ThreadStatusData { #if ROCKSDB_USING_THREAD_STATUS - explicit ThreadStatusData() : thread_id(0), enable_tracking(false) { + explicit ThreadStatusData() : enable_tracking(false) { + thread_id.store(0); thread_type.store(ThreadStatus::USER); cf_key.store(nullptr); operation_type.store(ThreadStatus::OP_UNKNOWN); @@ -72,8 +73,6 @@ struct ThreadStatusData { state_type.store(ThreadStatus::STATE_UNKNOWN); } - uint64_t thread_id; - // A flag to indicate whether the thread tracking is enabled // in the current thread. This value will be updated based on whether // the associated Options::enable_thread_tracking is set to true @@ -83,6 +82,7 @@ struct ThreadStatusData { // will be no-op. bool enable_tracking; + std::atomic thread_id; std::atomic thread_type; std::atomic cf_key; std::atomic operation_type; @@ -115,7 +115,8 @@ class ThreadStatusUpdater { // ColumnFamilyInfoKey, ThreadOperation, and ThreadState. void ResetThreadStatus(); - uint64_t GetThreadID(); + // Set the id of the current thread. + void SetThreadID(uint64_t thread_id); // Set the thread type of the current thread. void SetThreadType(ThreadStatus::ThreadType ttype); diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc index 7907669a7..116950e13 100644 --- a/util/thread_status_util.cc +++ b/util/thread_status_util.cc @@ -21,6 +21,7 @@ void ThreadStatusUtil::SetThreadType( return; } assert(thread_updater_local_cache_); + thread_updater_local_cache_->SetThreadID(env->GetThreadID()); thread_updater_local_cache_->SetThreadType(thread_type); } @@ -32,16 +33,6 @@ void ThreadStatusUtil::UnregisterThread() { } } -uint64_t ThreadStatusUtil::GetThreadID() { - if (thread_updater_local_cache_ == nullptr) { - // thread_updater_local_cache_ must be set in SetColumnFamily - // or other ThreadStatusUtil functions. - return 0; - } - return thread_updater_local_cache_->GetThreadID(); -} - - void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) { return; @@ -180,10 +171,6 @@ bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) { return false; } -uint64_t ThreadStatusUtil::GetThreadID() { - return 0; -} - void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { } diff --git a/util/thread_status_util.h b/util/thread_status_util.h index 2e52461a0..ba0238d58 100644 --- a/util/thread_status_util.h +++ b/util/thread_status_util.h @@ -27,8 +27,6 @@ class ColumnFamilyData; // all function calls to ThreadStatusUtil will be no-op. class ThreadStatusUtil { public: - static uint64_t GetThreadID(); - // Set the thread type of the current thread. static void SetThreadType( const Env* env, ThreadStatus::ThreadType thread_type);