diff --git a/HISTORY.md b/HISTORY.md index 6f5d4d9d5..a7713c24c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,6 +12,7 @@ * Added new option -- verify_checksums_in_compaction * Chagned Options.prefix_extractor from raw pointer to shared_ptr (take ownership) Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly) +* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools ### New Features * If we find one truncated record at the end of the MANIFEST or WAL files, diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 17d8fcb2b..303cd81cf 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -110,6 +110,11 @@ class HdfsEnv : public Env { virtual void WaitForJoin() { posixEnv->WaitForJoin(); } + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const + override { + return posixEnv->GetThreadPoolQueueLen(pri); + } + virtual Status GetTestDirectory(std::string* path) { return posixEnv->GetTestDirectory(path); } @@ -292,6 +297,10 @@ class HdfsEnv : public Env { virtual void WaitForJoin() {} + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return 0; + } + virtual Status GetTestDirectory(std::string* path) {return notsup;} virtual uint64_t NowMicros() {return 0;} diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index c96a659fe..16eb16440 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -210,6 +210,11 @@ class Env { // Wait for all threads started by StartThread to terminate. virtual void WaitForJoin() = 0; + // Get thread pool queue length for specific thrad pool. + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return 0; + } + // *path is set to a temporary directory that can be used for testing. It may // or many not have just been created. The directory may or may not differ // between runs of the same process, but subsequent calls will return the @@ -702,6 +707,9 @@ class EnvWrapper : public Env { return target_->StartThread(f, a); } void WaitForJoin() { return target_->WaitForJoin(); } + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return target_->GetThreadPoolQueueLen(pri); + } virtual Status GetTestDirectory(std::string* path) { return target_->GetTestDirectory(path); } diff --git a/util/env_posix.cc b/util/env_posix.cc index e019d6af0..89d8df68d 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1206,6 +1206,8 @@ class PosixEnv : public Env { virtual void WaitForJoin(); + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + virtual Status GetTestDirectory(std::string* result) { const char* env = getenv("TEST_TMPDIR"); if (env && env[0] != '\0') { @@ -1370,12 +1372,12 @@ class PosixEnv : public Env { class ThreadPool { public: - - ThreadPool() : - total_threads_limit_(1), - bgthreads_(0), - queue_(), - exit_all_threads_(false) { + ThreadPool() + : total_threads_limit_(1), + bgthreads_(0), + queue_(), + queue_len_(0), + exit_all_threads_(false) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); } @@ -1405,6 +1407,7 @@ class PosixEnv : public Env { void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); + queue_len_.store(queue_.size(), std::memory_order_relaxed); PthreadCall("unlock", pthread_mutex_unlock(&mu_)); (*function)(arg); @@ -1459,6 +1462,7 @@ class PosixEnv : public Env { queue_.push_back(BGItem()); queue_.back().function = function; queue_.back().arg = arg; + queue_len_.store(queue_.size(), std::memory_order_relaxed); // always wake up at least one waiting thread. PthreadCall("signal", pthread_cond_signal(&bgsignal_)); @@ -1466,6 +1470,10 @@ class PosixEnv : public Env { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } + unsigned int GetQueueLen() const { + return queue_len_.load(std::memory_order_relaxed); + } + private: // Entry per Schedule() call struct BGItem { void* arg; void (*function)(void*); }; @@ -1476,6 +1484,7 @@ class PosixEnv : public Env { int total_threads_limit_; std::vector bgthreads_; BGQueue queue_; + std::atomic_uint queue_len_; // Queue length. Used for stats reporting bool exit_all_threads_; }; @@ -1498,6 +1507,11 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) { thread_pools_[pri].Schedule(function, arg); } +unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { + assert(pri >= Priority::LOW && pri <= Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); +} + namespace { struct StartThreadState { void (*user_function)(void*); diff --git a/util/env_test.cc b/util/env_test.cc index a442e3a5c..e17027a39 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -172,17 +172,30 @@ TEST(EnvPosixTest, TwoPools) { env_->SetBackgroundThreads(kLowPoolSize); env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + // schedule same number of jobs in each pool for (int i = 0; i < kJobs; i++) { env_->Schedule(&CB::Run, &low_pool_job); env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); } + // Wait a short while for the jobs to be dispatched. + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(kJobs - kLowPoolSize, env_->GetThreadPoolQueueLen()); + ASSERT_EQ(kJobs - kLowPoolSize, + env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(kJobs - kHighPoolSize, + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); // wait for all jobs to finish while (low_pool_job.NumFinished() < kJobs || high_pool_job.NumFinished() < kJobs) { env_->SleepForMicroseconds(kDelayMicros); } + + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); } bool IsSingleVarint(const std::string& s) {