Fix conflict between AddFile() and CompactRange()
Summary: Fix the conflict bug between AddFile() and CompactRange() by - Make sure that no AddFile calls are running when asking CompactionPicker to pick compaction for manual compaction - If AddFile() run after we pick the compaction for the manual compaction it will be aware of it since we will add the manual compaction to running_compactions_ after picking it This will solve these 2 scenarios - If AddFile() is running, we will wait for it to finish before we pick a compaction for the manual compaction - If we already picked a manual compaction and then AddFile() started ... we ensure that it never ingest a file in a level that will overlap with the manual compaction Test Plan: unit tests Reviewers: sdong Reviewed By: sdong Subscribers: andrewkr, yoshinorim, jkedgar, dhruba Differential Revision: https://reviews.facebook.net/D64449
This commit is contained in:
parent
f0b881629f
commit
24c3b2b21e
@ -342,7 +342,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
||||
has_unpersisted_data_(false),
|
||||
env_options_(db_options_),
|
||||
num_running_addfile_(0),
|
||||
addfile_cv_(&mutex_),
|
||||
#ifndef ROCKSDB_LITE
|
||||
wal_manager_(db_options_, env_options_),
|
||||
#endif // ROCKSDB_LITE
|
||||
@ -1996,7 +1995,6 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
|
||||
int max_level_with_files = 0;
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
WaitForAddFile();
|
||||
Version* base = cfd->current();
|
||||
for (int level = 1; level < base->storage_info()->num_non_empty_levels();
|
||||
level++) {
|
||||
@ -2707,6 +2705,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
manual.end = &end_storage;
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
|
||||
TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
|
||||
// When a manual compaction arrives, temporarily disable scheduling of
|
||||
@ -2775,6 +2775,10 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
ca->m = &manual;
|
||||
manual.incomplete = false;
|
||||
bg_compaction_scheduled_++;
|
||||
// manual.compaction will be added to running_compactions_ and erased
|
||||
// inside BackgroundCompaction() but we need to put it now since we
|
||||
// will unlock the mutex.
|
||||
running_compactions_.insert(manual.compaction);
|
||||
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
||||
&DBImpl::UnscheduleCallback);
|
||||
scheduled = true;
|
||||
@ -3613,6 +3617,11 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
|
||||
}
|
||||
|
||||
bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
|
||||
if (num_running_addfile_ > 0) {
|
||||
// We need to wait for other AddFile() calls to finish
|
||||
// before running a manual compaction.
|
||||
return true;
|
||||
}
|
||||
if (m->exclusive) {
|
||||
return (bg_compaction_scheduled_ > 0);
|
||||
}
|
||||
|
16
db/db_impl.h
16
db/db_impl.h
@ -654,11 +654,12 @@ class DBImpl : public DB {
|
||||
// REQUIRES: mutex_ held
|
||||
void WaitForAddFile();
|
||||
|
||||
Status CompactFilesImpl(
|
||||
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
|
||||
Version* version, const std::vector<std::string>& input_file_names,
|
||||
const int output_level, int output_path_id, JobContext* job_context,
|
||||
LogBuffer* log_buffer);
|
||||
Status CompactFilesImpl(const CompactionOptions& compact_options,
|
||||
ColumnFamilyData* cfd, Version* version,
|
||||
const std::vector<std::string>& input_file_names,
|
||||
const int output_level, int output_path_id,
|
||||
JobContext* job_context, LogBuffer* log_buffer);
|
||||
|
||||
Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family,
|
||||
const std::string& file_path,
|
||||
ExternalSstFileInfo* file_info);
|
||||
@ -736,6 +737,7 @@ class DBImpl : public DB {
|
||||
// * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
|
||||
// (i.e. whenever a flush is done, even if it didn't make any progress)
|
||||
// * whenever there is an error in background purge, flush or compaction
|
||||
// * whenever num_running_addfile_ goes to 0.
|
||||
InstrumentedCondVar bg_cv_;
|
||||
uint64_t logfile_number_;
|
||||
std::deque<uint64_t>
|
||||
@ -985,10 +987,6 @@ class DBImpl : public DB {
|
||||
// REQUIRES: mutex held
|
||||
int num_running_addfile_;
|
||||
|
||||
// A condition variable that will be signaled whenever
|
||||
// num_running_addfile_ goes to 0.
|
||||
InstrumentedCondVar addfile_cv_;
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
WalManager wal_manager_;
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -323,7 +323,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
|
||||
|
||||
num_running_addfile_--;
|
||||
if (num_running_addfile_ == 0) {
|
||||
addfile_cv_.SignalAll();
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
|
||||
} // mutex_ is unlocked here;
|
||||
@ -409,7 +409,7 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,
|
||||
void DBImpl::WaitForAddFile() {
|
||||
mutex_.AssertHeld();
|
||||
while (num_running_addfile_ > 0) {
|
||||
addfile_cv_.Wait();
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -1011,10 +1011,13 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
|
||||
// We have 2 overlapping files in L0
|
||||
EXPECT_EQ(FilesPerLevel(), "2");
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
||||
{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
|
||||
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
|
||||
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
|
||||
{"ExternalSSTFileTest::PickedLevelBug:2",
|
||||
"DBImpl::RunManualCompaction:0"},
|
||||
{"ExternalSSTFileTest::PickedLevelBug:3",
|
||||
"DBImpl::RunManualCompaction:1"}});
|
||||
|
||||
std::atomic<bool> bg_compact_started(false);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
@ -1023,6 +1026,12 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// While writing the MANIFEST start a thread that will ask for compaction
|
||||
std::thread bg_compact([&]() {
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||
});
|
||||
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
|
||||
|
||||
// Start a thread that will ingest a new file
|
||||
std::thread bg_addfile([&]() {
|
||||
file_keys = {1, 2, 3};
|
||||
@ -1032,10 +1041,7 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
|
||||
// Wait for AddFile to start picking levels and writing MANIFEST
|
||||
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
|
||||
|
||||
// While writing the MANIFEST start a thread that will ask for compaction
|
||||
std::thread bg_compact([&]() {
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||
});
|
||||
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
|
||||
|
||||
// We need to verify that no compactions can run while AddFile is
|
||||
// ingesting the files into the levels it find suitable. So we will
|
||||
@ -1065,6 +1071,51 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
|
||||
Options options = CurrentOptions();
|
||||
options.disable_auto_compactions = false;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.num_levels = 2;
|
||||
options.env = env_;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
std::function<void()> bg_compact = [&]() {
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||
};
|
||||
|
||||
int range_id = 0;
|
||||
std::vector<int> file_keys;
|
||||
std::function<void()> bg_addfile = [&]() {
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
|
||||
};
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
while (range_id < 5000) {
|
||||
int range_start = (range_id * 20);
|
||||
int range_end = range_start + 10;
|
||||
|
||||
file_keys.clear();
|
||||
for (int k = range_start + 1; k < range_end; k++) {
|
||||
file_keys.push_back(k);
|
||||
}
|
||||
ASSERT_OK(Put(Key(range_start), Key(range_start)));
|
||||
ASSERT_OK(Put(Key(range_end), Key(range_end)));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
if (range_id % 10 == 0) {
|
||||
threads.emplace_back(bg_compact);
|
||||
}
|
||||
threads.emplace_back(bg_addfile);
|
||||
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
threads.clear();
|
||||
|
||||
range_id++;
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
|
||||
Options options = CurrentOptions();
|
||||
options.disable_auto_compactions = false;
|
||||
|
Loading…
Reference in New Issue
Block a user