Fix the deadlock issue in ThreadStatusSingleCompaction.
Summary: Fix the deadlock issue in ThreadStatusSingleCompaction. In the previous version of ThreadStatusSingleCompaction, the compaction thread will wait for a SYNC_POINT while its db_mutex is held. However, if the test hasn't finished its Put cycle while a compaction is running, a deadlock will happen in the test. Test Plan: export ROCKSDB_TESTS=ThreadStatus ./db_test Reviewers: sdong, igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D35001
This commit is contained in:
parent
b16ead531d
commit
8c12426c93
@ -229,12 +229,10 @@ CompactionJob::CompactionJob(
|
|||||||
ThreadStatusUtil::SetColumnFamily(
|
ThreadStatusUtil::SetColumnFamily(
|
||||||
compact_->compaction->column_family_data());
|
compact_->compaction->column_family_data());
|
||||||
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
|
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
|
||||||
TEST_SYNC_POINT("CompactionJob::CompationJob()");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CompactionJob::~CompactionJob() {
|
CompactionJob::~CompactionJob() {
|
||||||
assert(compact_ == nullptr);
|
assert(compact_ == nullptr);
|
||||||
TEST_SYNC_POINT("CompactionJob::~CompactionJob()");
|
|
||||||
ThreadStatusUtil::ResetThreadStatus();
|
ThreadStatusUtil::ResetThreadStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -290,6 +288,7 @@ void CompactionJob::Prepare() {
|
|||||||
Status CompactionJob::Run() {
|
Status CompactionJob::Run() {
|
||||||
AutoThreadOperationStageUpdater stage_updater(
|
AutoThreadOperationStageUpdater stage_updater(
|
||||||
ThreadStatus::STAGE_COMPACTION_RUN);
|
ThreadStatus::STAGE_COMPACTION_RUN);
|
||||||
|
TEST_SYNC_POINT("CompactionJob::Run():Start");
|
||||||
log_buffer_->FlushBufferToLog();
|
log_buffer_->FlushBufferToLog();
|
||||||
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
|
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
|
||||||
|
|
||||||
@ -481,6 +480,7 @@ Status CompactionJob::Run() {
|
|||||||
RecordCompactionIOStats();
|
RecordCompactionIOStats();
|
||||||
|
|
||||||
LogFlush(db_options_.info_log);
|
LogFlush(db_options_.info_log);
|
||||||
|
TEST_SYNC_POINT("CompactionJob::Run():End");
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10165,20 +10165,23 @@ TEST(DBTest, ThreadStatusSingleCompaction) {
|
|||||||
options.level0_file_num_compaction_trigger = kNumL0Files;
|
options.level0_file_num_compaction_trigger = kNumL0Files;
|
||||||
|
|
||||||
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
||||||
{"CompactionJob::CompationJob()",
|
{"CompactionJob::Run():Start",
|
||||||
"DBTest::ThreadStatusSingleCompaction:1"},
|
"DBTest::ThreadStatusSingleCompaction:1"},
|
||||||
{"DBTest::ThreadStatusSingleCompaction:2",
|
{"DBTest::ThreadStatusSingleCompaction:2",
|
||||||
"CompactionJob::~CompactionJob()"},
|
"CompactionJob::Run():End"},
|
||||||
});
|
});
|
||||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
for (int tests = 0; tests < 2; ++tests) {
|
for (int tests = 0; tests < 2; ++tests) {
|
||||||
TryReopen(options);
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
for (int key = kEntriesPerBuffer * kNumL0Files; key >= 0; --key) {
|
for (int key = kEntriesPerBuffer * kNumL0Files; key >= 0; --key) {
|
||||||
ASSERT_OK(Put(ToString(key), RandomString(&rnd, kTestValueSize)));
|
ASSERT_OK(Put(ToString(key), RandomString(&rnd, kTestValueSize)));
|
||||||
}
|
}
|
||||||
|
Flush();
|
||||||
|
ASSERT_GE(NumTableFilesAtLevel(0),
|
||||||
|
options.level0_file_num_compaction_trigger);
|
||||||
|
|
||||||
// wait for compaction to be scheduled
|
// wait for compaction to be scheduled
|
||||||
env_->SleepForMicroseconds(250000);
|
env_->SleepForMicroseconds(250000);
|
||||||
|
Loading…
Reference in New Issue
Block a user