Fix data race in AddFile() with multiple files + custom comparator bug
Summary: When ingesting multiple files - We should use user comparator - Should not call `cfd->current()` outside of mutex Test Plan: unit tests Reviewers: sdong, lightmark Reviewed By: lightmark Subscribers: andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D63075
This commit is contained in:
parent
5051755e35
commit
80c75593ed
@ -129,6 +129,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
|
||||
Status status;
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
|
||||
ColumnFamilyData* cfd = cfh->cfd();
|
||||
const Comparator* user_cmp = cfd->internal_comparator().user_comparator();
|
||||
|
||||
auto num_files = file_info_list.size();
|
||||
if (num_files == 0) {
|
||||
@ -142,19 +143,17 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
|
||||
sorted_file_info_list[i] = &file_info_list[i];
|
||||
}
|
||||
|
||||
auto* vstorage = cfd->current()->storage_info();
|
||||
std::sort(
|
||||
sorted_file_info_list.begin(), sorted_file_info_list.end(),
|
||||
[&vstorage, &file_info_list](const ExternalSstFileInfo* info1,
|
||||
const ExternalSstFileInfo* info2) {
|
||||
return vstorage->InternalComparator()->user_comparator()->Compare(
|
||||
info1->smallest_key, info2->smallest_key) < 0;
|
||||
});
|
||||
std::sort(sorted_file_info_list.begin(), sorted_file_info_list.end(),
|
||||
[&user_cmp, &file_info_list](const ExternalSstFileInfo* info1,
|
||||
const ExternalSstFileInfo* info2) {
|
||||
return user_cmp->Compare(info1->smallest_key,
|
||||
info2->smallest_key) < 0;
|
||||
});
|
||||
|
||||
for (size_t i = 0; i < num_files - 1; i++) {
|
||||
if (sorted_file_info_list[i]->largest_key >=
|
||||
sorted_file_info_list[i + 1]->smallest_key) {
|
||||
return Status::NotSupported("Cannot add overlapping range among files");
|
||||
if (user_cmp->Compare(sorted_file_info_list[i]->largest_key,
|
||||
sorted_file_info_list[i + 1]->smallest_key) >= 0) {
|
||||
return Status::NotSupported("Files have overlapping ranges");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -268,9 +267,8 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
|
||||
if (status.ok() && iter->Valid()) {
|
||||
ParsedInternalKey seek_result;
|
||||
if (ParseInternalKey(iter->key(), &seek_result)) {
|
||||
auto* vstorage = cfd->current()->storage_info();
|
||||
if (vstorage->InternalComparator()->user_comparator()->Compare(
|
||||
seek_result.user_key, file_info_list[i].largest_key) <= 0) {
|
||||
if (user_cmp->Compare(seek_result.user_key,
|
||||
file_info_list[i].largest_key) <= 0) {
|
||||
status = Status::NotSupported("Cannot add overlapping range");
|
||||
break;
|
||||
}
|
||||
|
@ -1471,24 +1471,37 @@ TEST_F(DBSSTTest, AddExternalSstFileMultiThreaded) {
|
||||
|
||||
thread_num.store(0);
|
||||
std::atomic<int> files_added(0);
|
||||
// Thread 0 -> Load {f0,f1}
|
||||
// Thread 1 -> Load {f0,f1}
|
||||
// Thread 2 -> Load {f2,f3}
|
||||
// Thread 3 -> Load {f2,f3}
|
||||
// Thread 4 -> Load {f4,f5}
|
||||
// Thread 5 -> Load {f4,f5}
|
||||
// ...
|
||||
std::function<void()> load_file_func = [&]() {
|
||||
// We intentionally add every file twice, and assert that it was added
|
||||
// only once and the other add failed
|
||||
int thread_id = thread_num.fetch_add(1);
|
||||
int file_idx = thread_id / 2;
|
||||
int file_idx = (thread_id / 2) * 2;
|
||||
// sometimes we use copy, sometimes link .. the result should be the same
|
||||
bool move_file = (thread_id % 3 == 0);
|
||||
|
||||
Status s = db_->AddFile(std::vector<std::string>(1, file_names[file_idx]),
|
||||
move_file);
|
||||
std::vector<std::string> files_to_add;
|
||||
|
||||
files_to_add = {file_names[file_idx]};
|
||||
if (static_cast<size_t>(file_idx + 1) < file_names.size()) {
|
||||
files_to_add.push_back(file_names[file_idx + 1]);
|
||||
}
|
||||
|
||||
Status s = db_->AddFile(files_to_add, move_file);
|
||||
if (s.ok()) {
|
||||
files_added++;
|
||||
files_added += static_cast<int>(files_to_add.size());
|
||||
}
|
||||
};
|
||||
// Bulk load num_files files in parallel
|
||||
std::vector<std::thread> add_file_threads;
|
||||
DestroyAndReopen(options);
|
||||
for (int i = 0; i < num_files * 2; ++i) {
|
||||
for (int i = 0; i < num_files; ++i) {
|
||||
add_file_threads.emplace_back(load_file_func);
|
||||
}
|
||||
|
||||
@ -1831,6 +1844,65 @@ TEST_F(DBSSTTest, AddExternalSstFilePickedLevelDynamic) {
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBSSTTest, AddExternalSstFileWithCustomCompartor) {
|
||||
std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
|
||||
env_->CreateDir(sst_files_folder);
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.comparator = ReverseBytewiseComparator();
|
||||
DestroyAndReopen(options);
|
||||
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
|
||||
|
||||
// Generate files with these key ranges
|
||||
// {14 -> 0}
|
||||
// {24 -> 10}
|
||||
// {34 -> 20}
|
||||
// {44 -> 30}
|
||||
// ..
|
||||
std::vector<std::string> generated_files;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
std::string file_name = sst_files_folder + env_->GenerateUniqueId();
|
||||
ASSERT_OK(sst_file_writer.Open(file_name));
|
||||
|
||||
int range_end = i * 10;
|
||||
int range_start = range_end + 15;
|
||||
for (int k = (range_start - 1); k >= range_end; k--) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k)));
|
||||
}
|
||||
ExternalSstFileInfo file_info;
|
||||
ASSERT_OK(sst_file_writer.Finish(&file_info));
|
||||
generated_files.push_back(file_name);
|
||||
}
|
||||
|
||||
std::vector<std::string> in_files;
|
||||
|
||||
// These 2nd and 3rd files overlap with each other
|
||||
in_files = {generated_files[0], generated_files[4], generated_files[5],
|
||||
generated_files[7]};
|
||||
ASSERT_NOK(db_->AddFile(in_files));
|
||||
|
||||
// These 2 files dont overlap with each other
|
||||
in_files = {generated_files[0], generated_files[2]};
|
||||
ASSERT_OK(db_->AddFile(in_files));
|
||||
|
||||
// These 2 files dont overlap with each other but overlap with keys in DB
|
||||
in_files = {generated_files[3], generated_files[7]};
|
||||
ASSERT_NOK(db_->AddFile(in_files));
|
||||
|
||||
// Files dont overlap and dont overlap with DB key range
|
||||
in_files = {generated_files[4], generated_files[6], generated_files[8]};
|
||||
ASSERT_OK(db_->AddFile(in_files));
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
if (i % 20 <= 14) {
|
||||
ASSERT_EQ(Get(Key(i)), Key(i));
|
||||
} else {
|
||||
ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// 1 Create some SST files by inserting K-V pairs into DB
|
||||
|
Loading…
Reference in New Issue
Block a user