Fix AddFile() conflict with compaction output [WaitForAddFile()]
Summary: Since AddFile unlock/lock the mutex inside LogAndApply() we need to ensure that during this period other compactions cannot run since such compactions are not aware of the file we are ingesting and could create a compaction that overlap wit this file this diff add - WaitForAddFile() call that will ensure that no AddFile() calls are being processed right now - Call `WaitForAddFile()` in 3 locations -- When doing manual Compaction -- When starting automatic Compaction -- When doing CompactFiles() Test Plan: unit test Reviewers: lightmark, yiwu, andrewkr, sdong Reviewed By: sdong Subscribers: andrewkr, yoshinorim, jkedgar, dhruba Differential Revision: https://reviews.facebook.net/D64383
This commit is contained in:
parent
8ee2ee8952
commit
f0b881629f
@ -341,6 +341,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
|||||||
next_job_id_(1),
|
next_job_id_(1),
|
||||||
has_unpersisted_data_(false),
|
has_unpersisted_data_(false),
|
||||||
env_options_(db_options_),
|
env_options_(db_options_),
|
||||||
|
num_running_addfile_(0),
|
||||||
|
addfile_cv_(&mutex_),
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
wal_manager_(db_options_, env_options_),
|
wal_manager_(db_options_, env_options_),
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
@ -1994,6 +1996,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
|
|||||||
int max_level_with_files = 0;
|
int max_level_with_files = 0;
|
||||||
{
|
{
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
WaitForAddFile();
|
||||||
Version* base = cfd->current();
|
Version* base = cfd->current();
|
||||||
for (int level = 1; level < base->storage_info()->num_non_empty_levels();
|
for (int level = 1; level < base->storage_info()->num_non_empty_levels();
|
||||||
level++) {
|
level++) {
|
||||||
@ -2109,6 +2112,10 @@ Status DBImpl::CompactFiles(
|
|||||||
{
|
{
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
|
||||||
|
// This call will unlock/lock the mutex to wait for current running
|
||||||
|
// AddFile() calls to finish.
|
||||||
|
WaitForAddFile();
|
||||||
|
|
||||||
s = CompactFilesImpl(compact_options, cfd, sv->current,
|
s = CompactFilesImpl(compact_options, cfd, sv->current,
|
||||||
input_file_names, output_level,
|
input_file_names, output_level,
|
||||||
output_path_id, &job_context, &log_buffer);
|
output_path_id, &job_context, &log_buffer);
|
||||||
@ -3190,6 +3197,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
|
|||||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
||||||
{
|
{
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
|
||||||
|
// This call will unlock/lock the mutex to wait for current running
|
||||||
|
// AddFile() calls to finish.
|
||||||
|
WaitForAddFile();
|
||||||
|
|
||||||
num_running_compactions_++;
|
num_running_compactions_++;
|
||||||
|
|
||||||
auto pending_outputs_inserted_elem =
|
auto pending_outputs_inserted_elem =
|
||||||
|
16
db/db_impl.h
16
db/db_impl.h
@ -650,6 +650,10 @@ class DBImpl : public DB {
|
|||||||
int PickLevelForIngestedFile(ColumnFamilyData* cfd,
|
int PickLevelForIngestedFile(ColumnFamilyData* cfd,
|
||||||
const ExternalSstFileInfo& file_info);
|
const ExternalSstFileInfo& file_info);
|
||||||
|
|
||||||
|
// Wait for current AddFile() calls to finish.
|
||||||
|
// REQUIRES: mutex_ held
|
||||||
|
void WaitForAddFile();
|
||||||
|
|
||||||
Status CompactFilesImpl(
|
Status CompactFilesImpl(
|
||||||
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
|
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
|
||||||
Version* version, const std::vector<std::string>& input_file_names,
|
Version* version, const std::vector<std::string>& input_file_names,
|
||||||
@ -659,6 +663,10 @@ class DBImpl : public DB {
|
|||||||
const std::string& file_path,
|
const std::string& file_path,
|
||||||
ExternalSstFileInfo* file_info);
|
ExternalSstFileInfo* file_info);
|
||||||
|
|
||||||
|
#else
|
||||||
|
// AddFile is not supported in ROCKSDB_LITE so this function
|
||||||
|
// will be no-op
|
||||||
|
void WaitForAddFile() {}
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
|
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
|
||||||
@ -973,6 +981,14 @@ class DBImpl : public DB {
|
|||||||
// REQUIRES: mutex held
|
// REQUIRES: mutex held
|
||||||
std::unordered_set<Compaction*> running_compactions_;
|
std::unordered_set<Compaction*> running_compactions_;
|
||||||
|
|
||||||
|
// Number of running AddFile() calls.
|
||||||
|
// REQUIRES: mutex held
|
||||||
|
int num_running_addfile_;
|
||||||
|
|
||||||
|
// A condition variable that will be signaled whenever
|
||||||
|
// num_running_addfile_ goes to 0.
|
||||||
|
InstrumentedCondVar addfile_cv_;
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
WalManager wal_manager_;
|
WalManager wal_manager_;
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
@ -237,12 +237,16 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
|
|||||||
|
|
||||||
{
|
{
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
|
||||||
|
|
||||||
const MutableCFOptions mutable_cf_options =
|
const MutableCFOptions mutable_cf_options =
|
||||||
*cfd->GetLatestMutableCFOptions();
|
*cfd->GetLatestMutableCFOptions();
|
||||||
|
|
||||||
WriteThread::Writer w;
|
WriteThread::Writer w;
|
||||||
write_thread_.EnterUnbatched(&w, &mutex_);
|
write_thread_.EnterUnbatched(&w, &mutex_);
|
||||||
|
|
||||||
|
num_running_addfile_++;
|
||||||
|
|
||||||
if (!skip_snapshot_check && !snapshots_.empty()) {
|
if (!skip_snapshot_check && !snapshots_.empty()) {
|
||||||
// Check that no snapshots are being held
|
// Check that no snapshots are being held
|
||||||
status =
|
status =
|
||||||
@ -316,7 +320,13 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
|
|||||||
ReleaseFileNumberFromPendingOutputs(
|
ReleaseFileNumberFromPendingOutputs(
|
||||||
pending_outputs_inserted_elem_list[i]);
|
pending_outputs_inserted_elem_list[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
num_running_addfile_--;
|
||||||
|
if (num_running_addfile_ == 0) {
|
||||||
|
addfile_cv_.SignalAll();
|
||||||
}
|
}
|
||||||
|
TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
|
||||||
|
} // mutex_ is unlocked here;
|
||||||
|
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
// We failed to add the files to the database
|
// We failed to add the files to the database
|
||||||
@ -395,6 +405,13 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,
|
|||||||
|
|
||||||
return target_level;
|
return target_level;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DBImpl::WaitForAddFile() {
|
||||||
|
mutex_.AssertHeld();
|
||||||
|
while (num_running_addfile_ > 0) {
|
||||||
|
addfile_cv_.Wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -984,6 +984,87 @@ TEST_F(ExternalSSTFileTest, PickedLevel) {
|
|||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ExternalSSTFileTest, PickedLevelBug) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.disable_auto_compactions = false;
|
||||||
|
options.level0_file_num_compaction_trigger = 3;
|
||||||
|
options.num_levels = 2;
|
||||||
|
options.env = env_;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
std::vector<int> file_keys;
|
||||||
|
|
||||||
|
// file #1 in L0
|
||||||
|
file_keys = {0, 5, 7};
|
||||||
|
for (int k : file_keys) {
|
||||||
|
ASSERT_OK(Put(Key(k), Key(k)));
|
||||||
|
}
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
|
// file #2 in L0
|
||||||
|
file_keys = {4, 6, 8, 9};
|
||||||
|
for (int k : file_keys) {
|
||||||
|
ASSERT_OK(Put(Key(k), Key(k)));
|
||||||
|
}
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
|
// We have 2 overlapping files in L0
|
||||||
|
EXPECT_EQ(FilesPerLevel(), "2");
|
||||||
|
|
||||||
|
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
||||||
|
{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
|
||||||
|
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
|
||||||
|
});
|
||||||
|
|
||||||
|
std::atomic<bool> bg_compact_started(false);
|
||||||
|
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"DBImpl::BackgroundCompaction:Start",
|
||||||
|
[&](void* arg) { bg_compact_started.store(true); });
|
||||||
|
|
||||||
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
// Start a thread that will ingest a new file
|
||||||
|
std::thread bg_addfile([&]() {
|
||||||
|
file_keys = {1, 2, 3};
|
||||||
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1));
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for AddFile to start picking levels and writing MANIFEST
|
||||||
|
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
|
||||||
|
|
||||||
|
// While writing the MANIFEST start a thread that will ask for compaction
|
||||||
|
std::thread bg_compact([&]() {
|
||||||
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||||
|
});
|
||||||
|
|
||||||
|
// We need to verify that no compactions can run while AddFile is
|
||||||
|
// ingesting the files into the levels it find suitable. So we will
|
||||||
|
// wait for 2 seconds to give a chance for compactions to run during
|
||||||
|
// this period, and then make sure that no compactions where able to run
|
||||||
|
env_->SleepForMicroseconds(1000000 * 2);
|
||||||
|
ASSERT_FALSE(bg_compact_started.load());
|
||||||
|
|
||||||
|
// Hold AddFile from finishing writing the MANIFEST
|
||||||
|
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
|
||||||
|
|
||||||
|
bg_addfile.join();
|
||||||
|
bg_compact.join();
|
||||||
|
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
|
||||||
|
int total_keys = 0;
|
||||||
|
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||||
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
total_keys++;
|
||||||
|
}
|
||||||
|
ASSERT_EQ(total_keys, 10);
|
||||||
|
|
||||||
|
delete iter;
|
||||||
|
|
||||||
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
|
TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.disable_auto_compactions = false;
|
options.disable_auto_compactions = false;
|
||||||
|
Loading…
Reference in New Issue
Block a user