From 91da17d38340f289c7246459d684614e1f0c6151 Mon Sep 17 00:00:00 2001
From: bowang
Date: Thu, 12 May 2022 22:02:21 -0700
Subject: [PATCH] add unit test for FlushJob write rate limiter priority.

---
 db/compaction/compaction_job.cc |  2 +-
 db/flush_job.cc                 |  1 -
 db/flush_job.h                  |  2 +
 db/flush_job_test.cc            | 76 ++++++++++++++++++++++++++++++---
 4 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 48a668f30..812275df7 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -2491,7 +2491,7 @@ Env::IOPriority CompactionJob::GetRateLimiterPriority(
     return Env::IO_LOW;
   } else {
-    if (write_controller->NeedsDelay()|| (write_controller->IsStopped()) {
+    if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
       return Env::IO_USER;
     } else if (write_controller->NeedSpeedupCompaction()) {
       return Env::IO_HIGH;
diff --git a/db/flush_job.cc b/db/flush_job.cc
index db1f401fd..38048b4e9 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -1082,7 +1082,6 @@ Env::IOPriority FlushJob::GetRateLimiterPriority(
     return Env::IO_USER;
   }
 }
-}
 
 #endif  // !ROCKSDB_LITE
diff --git a/db/flush_job.h b/db/flush_job.h
index d0e77dff3..8f5f12b2a 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -94,6 +94,8 @@ class FlushJob {
 #endif  // !ROCKSDB_LITE
 
  private:
+  friend class FlushJobTest_GetRateLimiterPriority_Test;
+
   void ReportStartedFlush();
   void ReportFlushInputSize(const autovector<MemTable*>& mems);
   void RecordFlushIOStats();
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index d71c0dfd6..69a3e97a9 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -528,6 +528,75 @@ TEST_F(FlushJobTest, Snapshots) {
   job_context.Clean();
 }
 
+TEST_F(FlushJobTest, GetRateLimiterPriority) {
+  // Prepare a FlushJob that flushes MemTables of a single column family.
+  const size_t num_mems = 2;
+  const size_t num_mems_to_flush = 1;
+  const size_t num_keys_per_table = 100;
+  JobContext job_context(0);
+  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
+  std::vector<uint64_t> memtable_ids;
+  std::vector<MemTable*> new_mems;
+  for (size_t i = 0; i != num_mems; ++i) {
+    MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                              kMaxSequenceNumber);
+    mem->SetID(i);
+    mem->Ref();
+    new_mems.emplace_back(mem);
+    memtable_ids.push_back(mem->GetID());
+
+    for (size_t j = 0; j < num_keys_per_table; ++j) {
+      std::string key(std::to_string(j + i * num_keys_per_table));
+      std::string value("value" + key);
+      ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue,
+                         key, value, nullptr /* kv_prot_info */));
+    }
+  }
+
+  autovector<MemTable*> to_delete;
+  for (auto mem : new_mems) {
+    cfd->imm()->Add(mem, &to_delete);
+  }
+
+  EventLogger event_logger(db_options_.info_log.get());
+  SnapshotChecker* snapshot_checker = nullptr;  // not relevant
+
+  assert(memtable_ids.size() == num_mems);
+  uint64_t smallest_memtable_id = memtable_ids.front();
+  uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
+  FlushJob flush_job(
+      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
+      *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
+      versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
+      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
+      db_options_.statistics.get(), &event_logger, true,
+      true /* sync_output_directory */, true /* write_manifest */,
+      Env::Priority::USER, nullptr /*IOTracer*/);
+
+  // When the state from WriteController is normal.
+  ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite),
+            Env::IO_HIGH);
+  ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead),
+            Env::IO_USER);
+
+  WriteController* write_controller =
+      flush_job.versions_->GetColumnFamilySet()->write_controller();
+  // When the state from WriteController is Delayed.
+  std::unique_ptr<WriteControllerToken> delay_token =
+      write_controller->GetDelayToken(1000000);
+  ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite),
+            Env::IO_USER);
+  ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead),
+            Env::IO_USER);
+  // When the state from WriteController is Stopped.
+  std::unique_ptr<WriteControllerToken> stop_token =
+      write_controller->GetStopToken();
+  ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite),
+            Env::IO_USER);
+  ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead),
+            Env::IO_USER);
+}
+
 class FlushJobTimestampTest : public FlushJobTestBase {
  public:
   FlushJobTimestampTest()
@@ -658,14 +727,7 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) {
   ASSERT_TRUE(to_delete.empty());
 }
 
-TEST_F(FlushJobTimestampTest, GetRateLimiterPriority) {
-  // When the state from WriteController is normal.
-  ASSERT_EQ(GetRateLimiterPriority(RateLimiter::OpType::kWrite), Env::IO_HIGH);
-  // When the state from WriteController is Delayed.
-
-  // When the state from WriteController is Stopped.
-}
 }  // namespace ROCKSDB_NAMESPACE
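
Note: the hunks above only show fragments of FlushJob::GetRateLimiterPriority plus the test's expectations and the analogous CompactionJob check. Below is a minimal, unofficial sketch of the mapping the new test asserts; the free-function form, the name GetFlushRateLimiterPrioritySketch, and the exact WriteController checks are assumptions for illustration, not RocksDB's actual implementation, and building it assumes the RocksDB source tree for the internal header.

// Standalone sketch (assumption, not RocksDB source): reads always map to
// Env::IO_USER; writes map to Env::IO_HIGH unless the WriteController signals
// a delay or stop, in which case they are promoted to Env::IO_USER, matching
// the expectations in FlushJobTest.GetRateLimiterPriority above.
#include "db/write_controller.h"   // WriteController (internal RocksDB header)
#include "rocksdb/env.h"           // Env::IOPriority
#include "rocksdb/rate_limiter.h"  // RateLimiter::OpType

namespace ROCKSDB_NAMESPACE {

Env::IOPriority GetFlushRateLimiterPrioritySketch(
    RateLimiter::OpType op, WriteController* write_controller) {
  if (op == RateLimiter::OpType::kRead) {
    // The test expects kRead to be Env::IO_USER in every WriteController state.
    return Env::IO_USER;
  }
  // op == RateLimiter::OpType::kWrite
  if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
    // Delayed or Stopped: promote flush writes so they complete sooner and
    // help relieve the write stall.
    return Env::IO_USER;
  }
  // Normal state: flush writes run at Env::IO_HIGH.
  return Env::IO_HIGH;
}

}  // namespace ROCKSDB_NAMESPACE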