Fix CF import with overlapping SST files (#6663)

Summary:
Invariant checking should use internal key comparator rather than
`sstableKeyCompare()`. The latter was intended for checking whether a
compaction input file's neighboring files need to be included in the
same compaction. Using it for invariant checking was leading to false
positives for files with overlapping endpoints.

Fixes https://github.com/facebook/rocksdb/issues/6647.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6663

Test Plan: regression test

Reviewed By: zhichao-cao

Differential Revision: D20910466

Pulled By: ajkr

fbshipit-source-id: f0b70dad7c4096fce635cab7a36f16e14f74ae3f
This commit is contained in:
Andrew Kryczka 2020-04-16 13:13:25 -07:00 committed by Facebook GitHub Bot
parent 73523baeb1
commit 6717ada899
2 changed files with 72 additions and 10 deletions

View File

@ -33,7 +33,6 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
files_to_import_.push_back(file_to_import);
}
const auto ucmp = cfd_->internal_comparator().user_comparator();
auto num_files = files_to_import_.size();
if (num_files == 0) {
return Status::InvalidArgument("The list of files is empty");
@ -55,17 +54,18 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
}
}
std::sort(sorted_files.begin(), sorted_files.end(),
[&ucmp](const IngestedFileInfo* info1,
const IngestedFileInfo* info2) {
return sstableKeyCompare(ucmp, info1->smallest_internal_key,
std::sort(
sorted_files.begin(), sorted_files.end(),
[this](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
return cfd_->internal_comparator().Compare(
info1->smallest_internal_key,
info2->smallest_internal_key) < 0;
});
for (size_t i = 0; i < sorted_files.size() - 1; i++) {
if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
sorted_files[i + 1]->smallest_internal_key) >=
0) {
if (cfd_->internal_comparator().Compare(
sorted_files[i]->largest_internal_key,
sorted_files[i + 1]->smallest_internal_key) >= 0) {
return Status::InvalidArgument("Files have overlapping ranges");
}
}

View File

@ -430,6 +430,68 @@ TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherDB) {
test::DestroyDir(env_, dbname_ + "/db_copy");
}
TEST_F(ImportColumnFamilyTest, LevelFilesOverlappingAtEndpoints) {
// Imports a column family containing a level where two files overlap at their
// endpoints. "Overlap" means the largest user key in one file is the same as
// the smallest user key in the second file.
const int kFileBytes = 128 << 10; // 128KB
const int kValueBytes = 1 << 10; // 1KB
const int kNumFiles = 4;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.num_levels = 2;
CreateAndReopenWithCF({"koko"}, options);
Random rnd(301);
// Every key is snapshot protected to ensure older versions will not be
// dropped during compaction.
std::vector<const Snapshot*> snapshots;
snapshots.reserve(kFileBytes / kValueBytes * kNumFiles);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kFileBytes / kValueBytes; ++j) {
auto value = RandomString(&rnd, kValueBytes);
ASSERT_OK(Put(1, "key", value));
snapshots.push_back(db_->GetSnapshot());
}
ASSERT_OK(Flush(1));
}
// Compact to create overlapping L1 files.
ASSERT_OK(
db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
&metadata_ptr_));
ASSERT_NE(metadata_ptr_, nullptr);
delete checkpoint;
// Create a new db and import the files.
DB* db_copy;
test::DestroyDir(env_, dbname_ + "/db_copy");
ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy));
ColumnFamilyHandle* cfh = nullptr;
ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
ImportColumnFamilyOptions(),
*metadata_ptr_, &cfh));
ASSERT_NE(cfh, nullptr);
{
std::string value;
ASSERT_OK(db_copy->Get(ReadOptions(), cfh, "key", &value));
}
db_copy->DropColumnFamily(cfh);
db_copy->DestroyColumnFamilyHandle(cfh);
delete db_copy;
test::DestroyDir(env_, dbname_ + "/db_copy");
for (const Snapshot* snapshot : snapshots) {
db_->ReleaseSnapshot(snapshot);
}
}
TEST_F(ImportColumnFamilyTest, ImportColumnFamilyNegativeTest) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"koko"}, options);