Fix CompactFiles bug (#4665)

Summary:
`CompactFiles` gets `SuperVersion` before `WaitForIngestFile`, while `IngestExternalFile` may add files that overlap with `input_file_names`

The timeline of execution flow is as follow:

Let's say that level N has two file [1,2] and [5,6]
```
timeline              user_thread1                             user_thread2
t0   |      CompactFiles([1, 2], [5, 6]) begin
t1   |         GetReferencedSuperVersion()
t2   |                                              IngestExternalFile([3,4]) to level N begin
t3   |             CompactFiles resume
     V
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4665

Differential Revision: D13030674

Pulled By: ajkr

fbshipit-source-id: 8be19477fd6e505032267a979d32f3097cc3be51
This commit is contained in:
DorianZheng 2018-11-12 14:30:21 -08:00 committed by Facebook Github Bot
parent 05dec0c7c7
commit 0f88160f67
2 changed files with 68 additions and 8 deletions

View File

@ -705,7 +705,7 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
immutable_db_options_.info_log.get()); immutable_db_options_.info_log.get());
// Perform CompactFiles // Perform CompactFiles
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
@ -713,15 +713,16 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
// IngestExternalFile() calls to finish. // IngestExternalFile() calls to finish.
WaitForIngestFile(); WaitForIngestFile();
s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names, // We need to get current after `WaitForIngestFile`, because
// `IngestExternalFile` may add files that overlap with `input_file_names`
auto* current = cfd->current();
current->Ref();
s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
output_file_names, output_level, output_path_id, output_file_names, output_level, output_path_id,
&job_context, &log_buffer); &job_context, &log_buffer);
}
if (sv->Unref()) { current->Unref();
mutex_.Lock();
sv->Cleanup();
mutex_.Unlock();
delete sv;
} }
// Find and delete obsolete files // Find and delete obsolete files

View File

@ -2888,6 +2888,65 @@ TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
} }
TEST_F(DBTest2, TestCompactFiles) {
// Setup sync point dependency to reproduce the race condition of
// DBImpl::GetColumnFamilyHandleUnlocked
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"TestCompactFiles::IngestExternalFile1",
"TestCompactFiles::IngestExternalFile2"},
});
SyncPoint::GetInstance()->EnableProcessing();
Options options;
options.num_levels = 2;
options.disable_auto_compactions = true;
Reopen(options);
auto* handle = db_->DefaultColumnFamily();
ASSERT_EQ(db_->NumberLevels(handle), 2);
rocksdb::SstFileWriter sst_file_writer{rocksdb::EnvOptions(), options};
std::string external_file1 = dbname_ + "/test_compact_files1.sst_t";
std::string external_file2 = dbname_ + "/test_compact_files2.sst_t";
std::string external_file3 = dbname_ + "/test_compact_files3.sst_t";
ASSERT_OK(sst_file_writer.Open(external_file1));
ASSERT_OK(sst_file_writer.Put("1", "1"));
ASSERT_OK(sst_file_writer.Put("2", "2"));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(sst_file_writer.Open(external_file2));
ASSERT_OK(sst_file_writer.Put("3", "3"));
ASSERT_OK(sst_file_writer.Put("4", "4"));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(sst_file_writer.Open(external_file3));
ASSERT_OK(sst_file_writer.Put("5", "5"));
ASSERT_OK(sst_file_writer.Put("6", "6"));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3},
IngestExternalFileOptions()));
ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
std::vector<std::string> files;
GetSstFiles(env_, dbname_, &files);
ASSERT_EQ(files.size(), 2);
port::Thread user_thread1(
[&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); });
port::Thread user_thread2([&]() {
ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
IngestExternalFileOptions()));
TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
});
user_thread1.join();
user_thread2.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {