diff --git a/db/compaction_job.cc b/db/compaction_job.cc index f3b4834be..00c7b52f9 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -275,9 +275,7 @@ Status CompactionJob::Run() { ColumnFamilyData* cfd = compact_->compaction->column_family_data(); ThreadStatusUtil::SetColumnFamily(cfd); ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); -#ifndef NDEBUG - ThreadStatusUtil::TEST_OperationDelay(ThreadStatus::OP_COMPACTION); -#endif + TEST_SYNC_POINT("CompactionJob::Run:Start"); const uint64_t start_micros = env_->NowMicros(); std::unique_ptr input( @@ -467,6 +465,7 @@ Status CompactionJob::Run() { RecordCompactionIOStats(); LogFlush(db_options_.info_log); + TEST_SYNC_POINT("CompactionJob::Run:End"); ThreadStatusUtil::ResetThreadStatus(); return status; } diff --git a/db/db_impl.cc b/db/db_impl.cc index f51d53403..da304e505 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2216,9 +2216,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, // TODO(yhchiang): add op details for showing trivial-move. ThreadStatusUtil::SetColumnFamily(c->column_family_data()); ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); -#ifndef NDEBUG - ThreadStatusUtil::TEST_OperationDelay(ThreadStatus::OP_COMPACTION); -#endif // Move file to next level assert(c->num_input_files(0) == 1); diff --git a/db/db_test.cc b/db/db_test.cc index 04d81d564..b71b859eb 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -9653,6 +9653,21 @@ TEST(DBTest, DynamicMemtableOptions) { } #if ROCKSDB_USING_THREAD_STATUS +namespace { +void VerifyOperationCount(Env* env, ThreadStatus::OperationType op_type, + int expected_count) { + int op_count = 0; + std::vector thread_list; + ASSERT_OK(env->GetThreadList(&thread_list)); + for (auto thread : thread_list) { + if (thread.operation_type == op_type) { + op_count++; + } + } + ASSERT_EQ(op_count, expected_count); +} +} // namespace + TEST(DBTest, GetThreadStatus) { Options options; options.env = env_; @@ -9723,6 +9738,38 @@ TEST(DBTest, DisableThreadStatus) { handles_, false); } +TEST(DBTest, ThreadStatusFlush) { + Options options; + options.env = env_; + options.write_buffer_size = 100000; // Small write buffer + options.enable_thread_tracking = true; + options = CurrentOptions(options); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"FlushJob::Run:Start", "DBTest::ThreadStatusFlush:1"}, + {"DBTest::ThreadStatusFlush:2", "FlushJob::Run:End"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + CreateAndReopenWithCF({"pikachu"}, options); + VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0); + + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_EQ("v1", Get(1, "foo")); + VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0); + + Put(1, "k1", std::string(100000, 'x')); // Fill memtable + VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0); + Put(1, "k2", std::string(100000, 'y')); // Trigger flush + // wait for flush to be scheduled + env_->SleepForMicroseconds(250000); + TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1"); + VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 1); + TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2"); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST(DBTest, ThreadStatusSingleCompaction) { const int kTestKeySize = 16; const int kTestValueSize = 984; @@ -9741,14 +9788,15 @@ TEST(DBTest, ThreadStatusSingleCompaction) { options.enable_thread_tracking = true; const int kNumL0Files = 4; options.level0_file_num_compaction_trigger = kNumL0Files; + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"CompactionJob::Run:Start", "DBTest::ThreadStatusSingleCompaction:1"}, + {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run:End"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + for (int tests = 0; tests < 2; ++tests) { TryReopen(options); - // Each compaction will run at least 2 seconds, which allows - // the test to capture the status of compaction with fewer - // false alarm. - const int kCompactionDelayMicro = 2000000; - ThreadStatusUtil::TEST_SetOperationDelay( - ThreadStatus::OP_COMPACTION, kCompactionDelayMicro); Random rnd(301); for (int key = kEntriesPerBuffer * kNumL0Files; key >= 0; --key) { @@ -9756,33 +9804,22 @@ TEST(DBTest, ThreadStatusSingleCompaction) { } // wait for compaction to be scheduled - env_->SleepForMicroseconds(500000); - - // check how many threads are doing compaction using GetThreadList - std::vector thread_list; - Status s = env_->GetThreadList(&thread_list); - ASSERT_OK(s); - int compaction_count = 0; - for (auto thread : thread_list) { - if (thread.operation_type == ThreadStatus::OP_COMPACTION) { - compaction_count++; - } - } + env_->SleepForMicroseconds(250000); + TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:1"); if (options.enable_thread_tracking) { // expecting one single L0 to L1 compaction - ASSERT_EQ(compaction_count, 1); + VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 1); } else { // If thread tracking is not enabled, compaction count should be 0. - ASSERT_EQ(compaction_count, 0); + VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0); } - - ThreadStatusUtil::TEST_SetOperationDelay( - ThreadStatus::OP_COMPACTION, 0); + TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2"); // repeat the test with disabling thread tracking. options.enable_thread_tracking = false; } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } TEST(DBTest, ThreadStatusMultipleCompaction) { @@ -9812,53 +9849,58 @@ TEST(DBTest, ThreadStatusMultipleCompaction) { options.max_bytes_for_level_multiplier = 2; options.max_background_compactions = kLowPriCount; - for (int tests = 0; tests < 2; ++tests) { - TryReopen(options); - Random rnd(301); + TryReopen(options); + Random rnd(301); - int max_compaction_count = 0; - std::vector thread_list; - const int kCompactionDelayMicro = 20000; - ThreadStatusUtil::TEST_SetOperationDelay( - ThreadStatus::OP_COMPACTION, kCompactionDelayMicro); + std::vector thread_list; + // Delay both flush and compaction + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"FlushJob::Run:Start", + "CompactionJob::Run:Start"}, + {"CompactionJob::Run:Start", + "DBTest::ThreadStatusMultipleCompaction:GetThreadList"}, + {"DBTest::ThreadStatusMultipleCompaction:VerifyStatus", + "CompactionJob::Run:End"}, + {"CompactionJob::Run:End", + "FlushJob::Run:End"}}); - // Make rocksdb busy - int key = 0; - for (int file = 0; file < 64 * kNumL0Files; ++file) { - for (int k = 0; k < kEntriesPerBuffer; ++k) { - ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize))); - } - - // check how many threads are doing compaction using GetThreadList - int compaction_count = 0; - Status s = env_->GetThreadList(&thread_list); - for (auto thread : thread_list) { - if (thread.operation_type == ThreadStatus::OP_COMPACTION) { - compaction_count++; - } - } - - // Record the max number of compactions at a time. - if (max_compaction_count < compaction_count) { - max_compaction_count = compaction_count; - } + // Make rocksdb busy + int key = 0; + int max_operation_count[ThreadStatus::NUM_OP_TYPES] = {0}; + for (int file = 0; file < 64 * kNumL0Files; ++file) { + for (int k = 0; k < kEntriesPerBuffer; ++k) { + ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize))); } - if (options.enable_thread_tracking) { - // Expect rocksdb to at least utilize 60% of the compaction threads. - ASSERT_GE(1.0 * max_compaction_count, - 0.6 * options.max_background_compactions); - } else { - // If thread tracking is not enabled, compaction count should be 0. - ASSERT_EQ(max_compaction_count, 0); + // check how many threads are doing compaction using GetThreadList + int operation_count[ThreadStatus::NUM_OP_TYPES] = {0}; + TEST_SYNC_POINT( + "DBTest::ThreadStatusMultipleCompaction:GetThreadList"); + Status s = env_->GetThreadList(&thread_list); + TEST_SYNC_POINT( + "DBTest::ThreadStatusMultipleCompaction:VerifyStatus"); + for (auto thread : thread_list) { + operation_count[thread.operation_type]++; } - // repeat the test with disabling thread tracking. - options.enable_thread_tracking = false; + // Record the max number of compactions at a time. + for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) { + if (max_operation_count[i] < operation_count[i]) { + max_operation_count[i] = operation_count[i]; + } + } + // Speed up the test + if (max_operation_count[ThreadStatus::OP_FLUSH] > 1 && + max_operation_count[ThreadStatus::OP_COMPACTION] > + 0.6 * options.max_background_compactions) { + break; + } } - ThreadStatusUtil::TEST_SetOperationDelay( - ThreadStatus::OP_COMPACTION, 0); + ASSERT_GE(max_operation_count[ThreadStatus::OP_FLUSH], 1); + // Expect rocksdb to at least utilize 60% of the compaction threads. + ASSERT_GE(1.0 * max_operation_count[ThreadStatus::OP_COMPACTION], + 0.6 * options.max_background_compactions); } #endif // ROCKSDB_USING_THREAD_STATUS diff --git a/db/flush_job.cc b/db/flush_job.cc index fcacbf3bd..0f8dc58de 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -48,6 +48,7 @@ #include "util/iostats_context_imp.h" #include "util/stop_watch.h" #include "util/sync_point.h" +#include "util/thread_status_util.h" namespace rocksdb { @@ -88,6 +89,11 @@ Status FlushJob::Run(uint64_t* file_number) { return Status::OK(); } + // Update the thread status to indicate flush. + ThreadStatusUtil::SetColumnFamily(cfd_); + ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH); + TEST_SYNC_POINT("FlushJob::Run:Start"); + // entries mems are (implicitly) sorted in ascending order by their created // time. We will use the first memtable's `edit` to keep the meta info for // this flush. @@ -120,6 +126,9 @@ Status FlushJob::Run(uint64_t* file_number) { if (s.ok() && file_number != nullptr) { *file_number = fn; } + + TEST_SYNC_POINT("FlushJob::Run:End"); + ThreadStatusUtil::ResetThreadStatus(); return s; } diff --git a/util/thread_status_util.h b/util/thread_status_util.h index 8428d492c..0e9d49db8 100644 --- a/util/thread_status_util.h +++ b/util/thread_status_util.h @@ -60,16 +60,6 @@ class ThreadStatusUtil { static void ResetThreadStatus(); -#ifndef NDEBUG - static void TEST_SetOperationDelay( - const ThreadStatus::OperationType operation, int micro); - static void TEST_OperationDelay( - const ThreadStatus::OperationType operation); - static void TEST_SetStateDelay( - const ThreadStatus::StateType state, int micro); - static void TEST_StateDelay(const ThreadStatus::StateType state); -#endif - protected: // Initialize the thread-local ThreadStatusUpdater when it finds // the cached value is nullptr. Returns true if it has cached diff --git a/util/thread_status_util_debug.cc b/util/thread_status_util_debug.cc deleted file mode 100644 index 5a86af26a..000000000 --- a/util/thread_status_util_debug.cc +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include - -#include "rocksdb/env.h" -#include "util/thread_status_updater.h" -#include "util/thread_status_util.h" - -namespace rocksdb { - -#ifndef NDEBUG -// the delay for debugging purpose. -static std::atomic operations_delay[ThreadStatus::NUM_OP_TYPES]; -static std::atomic states_delay[ThreadStatus::NUM_STATE_TYPES]; - -void ThreadStatusUtil::TEST_SetStateDelay( - const ThreadStatus::StateType state, int micro) { - states_delay[state].store(micro, std::memory_order_relaxed); -} - -void ThreadStatusUtil::TEST_StateDelay( - const ThreadStatus::StateType state) { - Env::Default()->SleepForMicroseconds( - states_delay[state].load(std::memory_order_relaxed)); -} - -void ThreadStatusUtil::TEST_SetOperationDelay( - const ThreadStatus::OperationType operation, int micro) { - operations_delay[operation].store(micro, std::memory_order_relaxed); -} - -void ThreadStatusUtil::TEST_OperationDelay( - const ThreadStatus::OperationType operation) { - Env::Default()->SleepForMicroseconds( - operations_delay[operation].load(std::memory_order_relaxed)); -} -#endif // !NDEBUG - -} // namespace rocksdb