Test for compaction of corrupted keys
Summary: Fixes T7697334. Adds a simple test to check whether CompactionJob deals with corrupted keys correctly. Right now, we preserve corrupted keys. Note: depending on the type of corruption and options like comparators, CompactionJob fails. This test just checks whether corrupted keys that do not fail CompactionJob are preserved. Test Plan: `make compaction_job_test && ./compaction_job_test` -> Tests pass. Add `input->Next(); continue;` in CompactionJob::ProcessKeyValueCompaction() inside then-branch of `!ParseInternalKey(key, &ikey)` -> Tests fail. Reviewers: sdong, rven, yhchiang, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42237
This commit is contained in:
parent
c5bca53198
commit
e4af3bfb27
@ -20,6 +20,43 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
void VerifyInitializationOfCompactionJobStats(
|
||||||
|
const CompactionJobStats& compaction_job_stats) {
|
||||||
|
#if !defined(IOS_CROSS_COMPILE)
|
||||||
|
ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U);
|
||||||
|
|
||||||
|
ASSERT_EQ(compaction_job_stats.num_input_records, 0U);
|
||||||
|
ASSERT_EQ(compaction_job_stats.num_input_files, 0U);
|
||||||
|
ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U);
|
||||||
|
|
||||||
|
ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
|
||||||
|
ASSERT_EQ(compaction_job_stats.num_output_files, 0U);
|
||||||
|
|
||||||
|
ASSERT_EQ(compaction_job_stats.is_manual_compaction, false);
|
||||||
|
|
||||||
|
ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U);
|
||||||
|
ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
|
||||||
|
|
||||||
|
ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U);
|
||||||
|
ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U);
|
||||||
|
|
||||||
|
ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0);
|
||||||
|
ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0);
|
||||||
|
|
||||||
|
ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U);
|
||||||
|
#endif // !defined(IOS_CROSS_COMPILE)
|
||||||
|
}
|
||||||
|
|
||||||
|
void VerifyCompactionJobStats(const CompactionJobStats& compaction_job_stats,
|
||||||
|
const std::vector<FileMetaData*>& files,
|
||||||
|
size_t num_output_files) {
|
||||||
|
ASSERT_GE(compaction_job_stats.elapsed_micros, 0U);
|
||||||
|
ASSERT_EQ(compaction_job_stats.num_input_files, files.size());
|
||||||
|
ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files);
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
// TODO(icanadi) Make it simpler once we mock out VersionSet
|
// TODO(icanadi) Make it simpler once we mock out VersionSet
|
||||||
class CompactionJobTest : public testing::Test {
|
class CompactionJobTest : public testing::Test {
|
||||||
public:
|
public:
|
||||||
@ -53,22 +90,40 @@ class CompactionJobTest : public testing::Test {
|
|||||||
return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
|
return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Corrupts key by changing the type
|
||||||
|
void CorruptKey(InternalKey* ikey) {
|
||||||
|
std::string keystr = ikey->Encode().ToString();
|
||||||
|
keystr[keystr.size() - 8] = kTypeLogData;
|
||||||
|
ikey->DecodeFrom(Slice(keystr.data(), keystr.size()));
|
||||||
|
}
|
||||||
|
|
||||||
// returns expected result after compaction
|
// returns expected result after compaction
|
||||||
mock::MockFileContents CreateTwoFiles() {
|
mock::MockFileContents CreateTwoFiles(bool gen_corrupted_keys) {
|
||||||
mock::MockFileContents expected_results;
|
mock::MockFileContents expected_results;
|
||||||
const int kKeysPerFile = 10000;
|
const int kKeysPerFile = 10000;
|
||||||
|
const int kCorruptKeysPerFile = 200;
|
||||||
|
const int kMatchingKeys = kKeysPerFile / 2;
|
||||||
SequenceNumber sequence_number = 0;
|
SequenceNumber sequence_number = 0;
|
||||||
|
|
||||||
|
auto corrupt_id = [&](int id) {
|
||||||
|
return gen_corrupted_keys && id > 0 && id <= kCorruptKeysPerFile;
|
||||||
|
};
|
||||||
|
|
||||||
for (int i = 0; i < 2; ++i) {
|
for (int i = 0; i < 2; ++i) {
|
||||||
mock::MockFileContents contents;
|
mock::MockFileContents contents;
|
||||||
SequenceNumber smallest_seqno = 0, largest_seqno = 0;
|
SequenceNumber smallest_seqno = 0, largest_seqno = 0;
|
||||||
InternalKey smallest, largest;
|
InternalKey smallest, largest;
|
||||||
for (int k = 0; k < kKeysPerFile; ++k) {
|
for (int k = 0; k < kKeysPerFile; ++k) {
|
||||||
auto key = ToString(i * (kKeysPerFile / 2) + k);
|
auto key = ToString(i * kMatchingKeys + k);
|
||||||
auto value = ToString(i * kKeysPerFile + k);
|
auto value = ToString(i * kKeysPerFile + k);
|
||||||
InternalKey internal_key(key, ++sequence_number, kTypeValue);
|
InternalKey internal_key(key, ++sequence_number, kTypeValue);
|
||||||
// This is how the key will look like once it's written in bottommost
|
// This is how the key will look like once it's written in bottommost
|
||||||
// file
|
// file
|
||||||
InternalKey bottommost_internal_key(key, 0, kTypeValue);
|
InternalKey bottommost_internal_key(key, 0, kTypeValue);
|
||||||
|
if (corrupt_id(k)) {
|
||||||
|
CorruptKey(&internal_key);
|
||||||
|
CorruptKey(&bottommost_internal_key);
|
||||||
|
}
|
||||||
if (k == 0) {
|
if (k == 0) {
|
||||||
smallest = internal_key;
|
smallest = internal_key;
|
||||||
smallest_seqno = sequence_number;
|
smallest_seqno = sequence_number;
|
||||||
@ -77,7 +132,7 @@ class CompactionJobTest : public testing::Test {
|
|||||||
largest_seqno = sequence_number;
|
largest_seqno = sequence_number;
|
||||||
}
|
}
|
||||||
contents.insert({internal_key.Encode().ToString(), value});
|
contents.insert({internal_key.Encode().ToString(), value});
|
||||||
if (i == 1 || k < kKeysPerFile / 2) {
|
if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) {
|
||||||
expected_results.insert(
|
expected_results.insert(
|
||||||
{bottommost_internal_key.Encode().ToString(), value});
|
{bottommost_internal_key.Encode().ToString(), value});
|
||||||
}
|
}
|
||||||
@ -122,6 +177,45 @@ class CompactionJobTest : public testing::Test {
|
|||||||
s = SetCurrentFile(env_, dbname_, 1, nullptr);
|
s = SetCurrentFile(env_, dbname_, 1, nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RunCompaction(std::function<uint64_t()> yield_callback,
|
||||||
|
const std::vector<FileMetaData*>& files) {
|
||||||
|
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||||
|
|
||||||
|
CompactionInputFiles compaction_input_files;
|
||||||
|
compaction_input_files.level = 0;
|
||||||
|
for (auto file : files) {
|
||||||
|
compaction_input_files.files.push_back(file);
|
||||||
|
}
|
||||||
|
Compaction compaction(cfd->current()->storage_info(),
|
||||||
|
*cfd->GetLatestMutableCFOptions(),
|
||||||
|
{compaction_input_files}, 1, 1024 * 1024, 10, 0,
|
||||||
|
kNoCompression, {});
|
||||||
|
compaction.SetInputVersion(cfd->current());
|
||||||
|
|
||||||
|
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
||||||
|
mutex_.Lock();
|
||||||
|
EventLogger event_logger(db_options_.info_log.get());
|
||||||
|
CompactionJobStats compaction_job_stats;
|
||||||
|
CompactionJob compaction_job(0, &compaction, db_options_, env_options_,
|
||||||
|
versions_.get(), &shutting_down_, &log_buffer,
|
||||||
|
nullptr, nullptr, nullptr, {}, table_cache_,
|
||||||
|
yield_callback, &event_logger, false,
|
||||||
|
dbname_, &compaction_job_stats);
|
||||||
|
|
||||||
|
VerifyInitializationOfCompactionJobStats(compaction_job_stats);
|
||||||
|
|
||||||
|
compaction_job.Prepare();
|
||||||
|
mutex_.Unlock();
|
||||||
|
ASSERT_OK(compaction_job.Run());
|
||||||
|
mutex_.Lock();
|
||||||
|
Status s;
|
||||||
|
compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
mutex_.Unlock();
|
||||||
|
|
||||||
|
VerifyCompactionJobStats(compaction_job_stats, files, 1);
|
||||||
|
}
|
||||||
|
|
||||||
Env* env_;
|
Env* env_;
|
||||||
std::string dbname_;
|
std::string dbname_;
|
||||||
EnvOptions env_options_;
|
EnvOptions env_options_;
|
||||||
@ -137,93 +231,36 @@ class CompactionJobTest : public testing::Test {
|
|||||||
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
|
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
|
||||||
};
|
};
|
||||||
|
|
||||||
namespace {
|
|
||||||
void VerifyInitializationOfCompactionJobStats(
|
|
||||||
const CompactionJobStats& compaction_job_stats) {
|
|
||||||
#if !defined(IOS_CROSS_COMPILE)
|
|
||||||
ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U);
|
|
||||||
|
|
||||||
ASSERT_EQ(compaction_job_stats.num_input_records, 0U);
|
|
||||||
ASSERT_EQ(compaction_job_stats.num_input_files, 0U);
|
|
||||||
ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U);
|
|
||||||
|
|
||||||
ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
|
|
||||||
ASSERT_EQ(compaction_job_stats.num_output_files, 0U);
|
|
||||||
|
|
||||||
ASSERT_EQ(compaction_job_stats.is_manual_compaction, false);
|
|
||||||
|
|
||||||
ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U);
|
|
||||||
ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
|
|
||||||
|
|
||||||
ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U);
|
|
||||||
ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U);
|
|
||||||
|
|
||||||
ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0);
|
|
||||||
ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0);
|
|
||||||
|
|
||||||
ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U);
|
|
||||||
#endif // !defined(IOS_CROSS_COMPILE)
|
|
||||||
}
|
|
||||||
|
|
||||||
void VerifyCompactionJobStats(const CompactionJobStats& compaction_job_stats,
|
|
||||||
const std::vector<FileMetaData*>& files,
|
|
||||||
size_t num_output_files) {
|
|
||||||
ASSERT_GE(compaction_job_stats.elapsed_micros, 0U);
|
|
||||||
ASSERT_EQ(compaction_job_stats.num_input_files, files.size());
|
|
||||||
ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files);
|
|
||||||
}
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
TEST_F(CompactionJobTest, Simple) {
|
TEST_F(CompactionJobTest, Simple) {
|
||||||
|
auto expected_results = CreateTwoFiles(false);
|
||||||
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
|
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||||
|
|
||||||
auto expected_results = CreateTwoFiles();
|
|
||||||
|
|
||||||
auto files = cfd->current()->storage_info()->LevelFiles(0);
|
auto files = cfd->current()->storage_info()->LevelFiles(0);
|
||||||
ASSERT_EQ(2U, files.size());
|
ASSERT_EQ(2U, files.size());
|
||||||
|
|
||||||
CompactionInputFiles compaction_input_files;
|
|
||||||
compaction_input_files.level = 0;
|
|
||||||
compaction_input_files.files.push_back(files[0]);
|
|
||||||
compaction_input_files.files.push_back(files[1]);
|
|
||||||
std::unique_ptr<Compaction> compaction(new Compaction(
|
|
||||||
cfd->current()->storage_info(), *cfd->GetLatestMutableCFOptions(),
|
|
||||||
{compaction_input_files}, 1, 1024 * 1024, 10, 0, kNoCompression, {}));
|
|
||||||
compaction->SetInputVersion(cfd->current());
|
|
||||||
|
|
||||||
int yield_callback_called = 0;
|
int yield_callback_called = 0;
|
||||||
std::function<uint64_t()> yield_callback = [&]() {
|
std::function<uint64_t()> yield_callback = [&]() {
|
||||||
yield_callback_called++;
|
yield_callback_called++;
|
||||||
return 0;
|
return 0;
|
||||||
};
|
};
|
||||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
|
||||||
mutex_.Lock();
|
|
||||||
EventLogger event_logger(db_options_.info_log.get());
|
|
||||||
std::string db_name = "dbname";
|
|
||||||
CompactionJobStats compaction_job_stats;
|
|
||||||
CompactionJob compaction_job(0, compaction.get(), db_options_, env_options_,
|
|
||||||
versions_.get(), &shutting_down_, &log_buffer,
|
|
||||||
nullptr, nullptr, nullptr, {}, table_cache_,
|
|
||||||
std::move(yield_callback), &event_logger, false,
|
|
||||||
db_name, &compaction_job_stats);
|
|
||||||
|
|
||||||
VerifyInitializationOfCompactionJobStats(compaction_job_stats);
|
|
||||||
|
|
||||||
compaction_job.Prepare();
|
|
||||||
mutex_.Unlock();
|
|
||||||
ASSERT_OK(compaction_job.Run());
|
|
||||||
mutex_.Lock();
|
|
||||||
Status s;
|
|
||||||
compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_);
|
|
||||||
ASSERT_OK(s);
|
|
||||||
mutex_.Unlock();
|
|
||||||
|
|
||||||
VerifyCompactionJobStats(compaction_job_stats, files, 1);
|
|
||||||
|
|
||||||
|
RunCompaction(std::move(yield_callback), files);
|
||||||
mock_table_factory_->AssertLatestFile(expected_results);
|
mock_table_factory_->AssertLatestFile(expected_results);
|
||||||
ASSERT_EQ(yield_callback_called, 20000);
|
ASSERT_EQ(yield_callback_called, 20000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(CompactionJobTest, SimpleCorrupted) {
|
||||||
|
auto expected_results = CreateTwoFiles(true);
|
||||||
|
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||||
|
auto files = cfd->current()->storage_info()->LevelFiles(0);
|
||||||
|
|
||||||
|
std::function<uint64_t()> yield_callback = [&]() {
|
||||||
|
return 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
RunCompaction(std::move(yield_callback), files);
|
||||||
|
mock_table_factory_->AssertLatestFile(expected_results);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user