add unit test for FlushJob write.

This commit is contained in:
bowang 2022-05-12 22:02:21 -07:00
parent 3fbf8186fb
commit 91da17d383
4 changed files with 72 additions and 9 deletions

View File

@ -2491,7 +2491,7 @@ Env::IOPriority CompactionJob::GetRateLimiterPriority(
return Env::IO_LOW; return Env::IO_LOW;
} else { } else {
if (write_controller->NeedsDelay()|| (write_controller->IsStopped()) { if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
return Env::IO_USER; return Env::IO_USER;
} else if (write_controller->NeedSpeedupCompaction()) { } else if (write_controller->NeedSpeedupCompaction()) {
return Env::IO_HIGH; return Env::IO_HIGH;

View File

@ -1082,7 +1082,6 @@ Env::IOPriority FlushJob::GetRateLimiterPriority(
return Env::IO_USER; return Env::IO_USER;
} }
} }
}
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE

View File

@ -94,6 +94,8 @@ class FlushJob {
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
private: private:
friend class FlushJobTest_GetRateLimiterPriority_Test;
void ReportStartedFlush(); void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems); void ReportFlushInputSize(const autovector<MemTable*>& mems);
void RecordFlushIOStats(); void RecordFlushIOStats();

View File

@ -528,6 +528,75 @@ TEST_F(FlushJobTest, Snapshots) {
job_context.Clean(); job_context.Clean();
} }
TEST_F(FlushJobTest, GetRateLimiterPriority) {
// Prepare a FlushJob that flush MemTables of Single Column Family.
const size_t num_mems = 2;
const size_t num_mems_to_flush = 1;
const size_t num_keys_per_table = 100;
JobContext job_context(0);
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
std::vector<uint64_t> memtable_ids;
std::vector<MemTable*> new_mems;
for (size_t i = 0; i != num_mems; ++i) {
MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
kMaxSequenceNumber);
mem->SetID(i);
mem->Ref();
new_mems.emplace_back(mem);
memtable_ids.push_back(mem->GetID());
for (size_t j = 0; j < num_keys_per_table; ++j) {
std::string key(std::to_string(j + i * num_keys_per_table));
std::string value("value" + key);
ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue,
key, value, nullptr /* kv_prot_info */));
}
}
autovector<MemTable*> to_delete;
for (auto mem : new_mems) {
cfd->imm()->Add(mem, &to_delete);
}
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
assert(memtable_ids.size() == num_mems);
uint64_t smallest_memtable_id = memtable_ids.front();
uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);
// When the state from WriteController is normal.
ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite),
Env::IO_HIGH);
ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead),
Env::IO_USER);
WriteController* write_controller =
flush_job.versions_->GetColumnFamilySet()->write_controller();
// When the state from WriteController is Delayed.
std::unique_ptr<WriteControllerToken> delay_token =
write_controller->GetDelayToken(1000000);
ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite),
Env::IO_USER);
ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead),
Env::IO_USER);
// When the state from WriteController is Stopped.
std::unique_ptr<WriteControllerToken> stop_token =
write_controller->GetStopToken();
ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite),
Env::IO_USER);
ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead),
Env::IO_USER);
}
class FlushJobTimestampTest : public FlushJobTestBase { class FlushJobTimestampTest : public FlushJobTestBase {
public: public:
FlushJobTimestampTest() FlushJobTimestampTest()
@ -658,14 +727,7 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) {
ASSERT_TRUE(to_delete.empty()); ASSERT_TRUE(to_delete.empty());
} }
TEST_F(FlushJobTimestampTest, GetRateLimiterPriority) {
// When the state from WriteController is normal.
ASSERT_EQ(GetRateLimiterPriority(RateLimiter::OpType::kWrite), Env::IO_HIGH);
// When the state from WriteController is Delayed.
// When the state from WriteController is Stopped.
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE