Fix race due to delete triggered compaction in Universal compaction mode (#6799)
Summary: Delete triggered compaction in universal compaction mode was causing a corruption when scheduled in parallel with other compactions. 1. When num_levels = 1, a file marked for compaction may be picked along with all older files in L0, without checking if any of them are already being compaction. This can cause unpredictable results like resurrection of older versions of keys or deleted keys. 2. When num_levels > 1, a delete triggered compaction would not get scheduled if it overlaps with a running regular compaction. However, the reverse is not true. This is due to the fact that in ```UniversalCompactionBuilder::CalculateSortedRuns```, it assumes that entire sorted runs are picked for compaction and only checks the first file in a sorted run to determine conflicts. This is violated by a delete triggered compaction as it works on a subset of a sorted run. Fix the bug for num_levels > 1, and disable the feature for now when num_levels = 1. After disabling this feature, files would still get marked for compaction, but no compaction would get scheduled. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6799 Reviewed By: pdillinger Differential Revision: D21431286 Pulled By: anand1976 fbshipit-source-id: ae9f0bdb1d6ae2f10284847db731c23f43af164a
This commit is contained in:
parent
442bd69222
commit
70fb77b199
@ -1,9 +1,13 @@
|
||||
# Rocksdb Change Log
|
||||
## 6.9.4 (04/30/2020)
|
||||
## 6.9.4 (05/08/2020)
|
||||
### Bug Fixes
|
||||
* Fix a bug caused by overwrite the status with io status in block based table builder when writing data blocks. If status stores the error message (e.g., failure of verify block compression), the bug will make the io status overwrite the status.
|
||||
* Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true.
|
||||
* Fix possible false NotFound status from batched MultiGet using index type kHashSearch.
|
||||
* Fix corruption caused by enabling delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode, along with parallel compactions. The bug can result in two parallel compactions picking the same input files, resulting in the DB resurrecting older and deleted versions of some keys.
|
||||
|
||||
### Behavior Changes
|
||||
* Disable delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode and num_levels = 1 in order to avoid a corruption bug.
|
||||
|
||||
## 6.9.3 (04/28/2020)
|
||||
### Bug Fixes
|
||||
|
@ -1085,6 +1085,8 @@ void CompactionPicker::PickFilesMarkedForCompaction(
|
||||
Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
|
||||
size_t random_file_index = static_cast<size_t>(rnd.Uniform(
|
||||
static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
|
||||
TEST_SYNC_POINT_CALLBACK("CompactionPicker::PickFilesMarkedForCompaction",
|
||||
&random_file_index);
|
||||
|
||||
if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
|
||||
// found the compaction!
|
||||
|
@ -78,8 +78,17 @@ class CompactionPickerTest : public testing::Test {
|
||||
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
|
||||
}
|
||||
|
||||
// Create a new VersionStorageInfo object so we can add mode files and then
|
||||
// merge it with the existing VersionStorageInfo
|
||||
void AddVersionStorage() {
|
||||
temp_vstorage_.reset(new VersionStorageInfo(
|
||||
&icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style,
|
||||
vstorage_.get(), false));
|
||||
}
|
||||
|
||||
void DeleteVersionStorage() {
|
||||
vstorage_.reset();
|
||||
temp_vstorage_.reset();
|
||||
files_.clear();
|
||||
file_map_.clear();
|
||||
input_files_.clear();
|
||||
@ -88,18 +97,24 @@ class CompactionPickerTest : public testing::Test {
|
||||
void Add(int level, uint32_t file_number, const char* smallest,
|
||||
const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
|
||||
SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
|
||||
size_t compensated_file_size = 0) {
|
||||
assert(level < vstorage_->num_levels());
|
||||
size_t compensated_file_size = 0, bool marked_for_compact = false) {
|
||||
VersionStorageInfo* vstorage;
|
||||
if (temp_vstorage_) {
|
||||
vstorage = temp_vstorage_.get();
|
||||
} else {
|
||||
vstorage = vstorage_.get();
|
||||
}
|
||||
assert(level < vstorage->num_levels());
|
||||
FileMetaData* f = new FileMetaData(
|
||||
file_number, path_id, file_size,
|
||||
InternalKey(smallest, smallest_seq, kTypeValue),
|
||||
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
|
||||
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
|
||||
largest_seq, marked_for_compact, kInvalidBlobFileNumber,
|
||||
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
|
||||
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
|
||||
f->compensated_file_size =
|
||||
(compensated_file_size != 0) ? compensated_file_size : file_size;
|
||||
vstorage_->AddFile(level, f);
|
||||
vstorage->AddFile(level, f);
|
||||
files_.emplace_back(f);
|
||||
file_map_.insert({file_number, {f, level}});
|
||||
}
|
||||
@ -122,6 +137,12 @@ class CompactionPickerTest : public testing::Test {
|
||||
}
|
||||
|
||||
void UpdateVersionStorageInfo() {
|
||||
if (temp_vstorage_) {
|
||||
VersionBuilder builder(FileOptions(), &ioptions_, nullptr,
|
||||
vstorage_.get(), nullptr);
|
||||
builder.SaveTo(temp_vstorage_.get());
|
||||
vstorage_ = std::move(temp_vstorage_);
|
||||
}
|
||||
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
|
||||
vstorage_->UpdateFilesByCompactionPri(ioptions_.compaction_pri);
|
||||
vstorage_->UpdateNumNonEmptyLevels();
|
||||
@ -132,6 +153,28 @@ class CompactionPickerTest : public testing::Test {
|
||||
vstorage_->ComputeFilesMarkedForCompaction();
|
||||
vstorage_->SetFinalized();
|
||||
}
|
||||
void AddFileToVersionStorage(int level, uint32_t file_number,
|
||||
const char* smallest, const char* largest,
|
||||
uint64_t file_size = 1, uint32_t path_id = 0,
|
||||
SequenceNumber smallest_seq = 100,
|
||||
SequenceNumber largest_seq = 100,
|
||||
size_t compensated_file_size = 0,
|
||||
bool marked_for_compact = false) {
|
||||
VersionStorageInfo* base_vstorage = vstorage_.release();
|
||||
vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleUniversal,
|
||||
base_vstorage, false));
|
||||
Add(level, file_number, smallest, largest, file_size, path_id, smallest_seq,
|
||||
largest_seq, compensated_file_size, marked_for_compact);
|
||||
|
||||
VersionBuilder builder(FileOptions(), &ioptions_, nullptr, base_vstorage,
|
||||
nullptr);
|
||||
builder.SaveTo(vstorage_.get());
|
||||
UpdateVersionStorageInfo();
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<VersionStorageInfo> temp_vstorage_;
|
||||
};
|
||||
|
||||
TEST_F(CompactionPickerTest, Empty) {
|
||||
@ -1733,6 +1776,163 @@ TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) {
|
||||
ASSERT_EQ(0, compaction->output_level());
|
||||
}
|
||||
|
||||
TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) {
|
||||
const uint64_t kFileSize = 100000;
|
||||
|
||||
ioptions_.compaction_style = kCompactionStyleUniversal;
|
||||
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
|
||||
|
||||
// This test covers the case where a "regular" universal compaction is
|
||||
// scheduled first, followed by a delete triggered compaction. The latter
|
||||
// should fail
|
||||
NewVersionStorage(5, kCompactionStyleUniversal);
|
||||
|
||||
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
|
||||
Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
|
||||
Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300);
|
||||
Add(3, 5U, "010", "080", 8 * kFileSize, 0, 200, 251);
|
||||
Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
|
||||
Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
|
||||
|
||||
UpdateVersionStorageInfo();
|
||||
|
||||
std::unique_ptr<Compaction> compaction(
|
||||
universal_compaction_picker.PickCompaction(
|
||||
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
||||
|
||||
ASSERT_TRUE(compaction);
|
||||
// Validate that its a compaction to reduce sorted runs
|
||||
ASSERT_EQ(CompactionReason::kUniversalSortedRunNum,
|
||||
compaction->compaction_reason());
|
||||
ASSERT_EQ(0, compaction->output_level());
|
||||
ASSERT_EQ(0, compaction->start_level());
|
||||
ASSERT_EQ(2U, compaction->num_input_files(0));
|
||||
|
||||
AddVersionStorage();
|
||||
// Simulate a flush and mark the file for compaction
|
||||
Add(0, 1U, "150", "200", kFileSize, 0, 551, 600, 0, true);
|
||||
UpdateVersionStorageInfo();
|
||||
|
||||
std::unique_ptr<Compaction> compaction2(
|
||||
universal_compaction_picker.PickCompaction(
|
||||
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
||||
ASSERT_FALSE(compaction2);
|
||||
}
|
||||
|
||||
TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) {
|
||||
const uint64_t kFileSize = 100000;
|
||||
|
||||
ioptions_.compaction_style = kCompactionStyleUniversal;
|
||||
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
|
||||
|
||||
// This test covers the case where a delete triggered compaction is
|
||||
// scheduled first, followed by a "regular" compaction. The latter
|
||||
// should fail
|
||||
NewVersionStorage(5, kCompactionStyleUniversal);
|
||||
|
||||
// Mark file number 4 for compaction
|
||||
Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
|
||||
Add(3, 5U, "240", "290", 8 * kFileSize, 0, 201, 250);
|
||||
Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
|
||||
Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
|
||||
UpdateVersionStorageInfo();
|
||||
|
||||
std::unique_ptr<Compaction> compaction(
|
||||
universal_compaction_picker.PickCompaction(
|
||||
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
||||
|
||||
ASSERT_TRUE(compaction);
|
||||
// Validate that its a delete triggered compaction
|
||||
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
|
||||
compaction->compaction_reason());
|
||||
ASSERT_EQ(3, compaction->output_level());
|
||||
ASSERT_EQ(0, compaction->start_level());
|
||||
ASSERT_EQ(1U, compaction->num_input_files(0));
|
||||
ASSERT_EQ(1U, compaction->num_input_files(1));
|
||||
|
||||
AddVersionStorage();
|
||||
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
|
||||
Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
|
||||
UpdateVersionStorageInfo();
|
||||
|
||||
std::unique_ptr<Compaction> compaction2(
|
||||
universal_compaction_picker.PickCompaction(
|
||||
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
||||
ASSERT_FALSE(compaction2);
|
||||
}
|
||||
|
||||
TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
|
||||
// The case where universal periodic compaction can be picked
|
||||
// with some newer files being compacted.
|
||||
const uint64_t kFileSize = 100000;
|
||||
|
||||
ioptions_.compaction_style = kCompactionStyleUniversal;
|
||||
|
||||
bool input_level_overlap = false;
|
||||
bool output_level_overlap = false;
|
||||
// Let's mark 2 files in 2 different levels for compaction. The
|
||||
// compaction picker will randomly pick one, so use the sync point to
|
||||
// ensure a deterministic order. Loop until both cases are covered
|
||||
size_t random_index = 0;
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionPicker::PickFilesMarkedForCompaction", [&](void* arg) {
|
||||
size_t* index = static_cast<size_t*>(arg);
|
||||
*index = random_index;
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
while (!input_level_overlap || !output_level_overlap) {
|
||||
// Ensure that the L0 file gets picked first
|
||||
random_index = !input_level_overlap ? 0 : 1;
|
||||
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
|
||||
NewVersionStorage(5, kCompactionStyleUniversal);
|
||||
|
||||
Add(0, 1U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
|
||||
Add(3, 2U, "010", "020", 2 * kFileSize, 0, 201, 248);
|
||||
Add(3, 3U, "250", "270", 2 * kFileSize, 0, 202, 249);
|
||||
Add(3, 4U, "290", "310", 2 * kFileSize, 0, 203, 250);
|
||||
Add(3, 5U, "310", "320", 2 * kFileSize, 0, 204, 251, 0, true);
|
||||
Add(4, 6U, "301", "350", 8 * kFileSize, 0, 101, 150);
|
||||
Add(4, 7U, "501", "750", 8 * kFileSize, 0, 101, 150);
|
||||
UpdateVersionStorageInfo();
|
||||
|
||||
std::unique_ptr<Compaction> compaction(
|
||||
universal_compaction_picker.PickCompaction(
|
||||
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
||||
|
||||
ASSERT_TRUE(compaction);
|
||||
// Validate that its a delete triggered compaction
|
||||
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
|
||||
compaction->compaction_reason());
|
||||
ASSERT_TRUE(compaction->start_level() == 0 ||
|
||||
compaction->start_level() == 3);
|
||||
if (compaction->start_level() == 0) {
|
||||
// The L0 file was picked. The next compaction will detect an
|
||||
// overlap on its input level
|
||||
input_level_overlap = true;
|
||||
ASSERT_EQ(3, compaction->output_level());
|
||||
ASSERT_EQ(1U, compaction->num_input_files(0));
|
||||
ASSERT_EQ(3U, compaction->num_input_files(1));
|
||||
} else {
|
||||
// The level 3 file was picked. The next compaction will pick
|
||||
// the L0 file and will detect overlap when adding output
|
||||
// level inputs
|
||||
output_level_overlap = true;
|
||||
ASSERT_EQ(4, compaction->output_level());
|
||||
ASSERT_EQ(2U, compaction->num_input_files(0));
|
||||
ASSERT_EQ(1U, compaction->num_input_files(1));
|
||||
}
|
||||
|
||||
vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
|
||||
// After recomputing the compaction score, only one marked file will remain
|
||||
random_index = 0;
|
||||
std::unique_ptr<Compaction> compaction2(
|
||||
universal_compaction_picker.PickCompaction(
|
||||
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
||||
ASSERT_FALSE(compaction2);
|
||||
DeleteVersionStorage();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -120,8 +120,7 @@ class UniversalCompactionBuilder {
|
||||
LogBuffer* log_buffer_;
|
||||
|
||||
static std::vector<SortedRun> CalculateSortedRuns(
|
||||
const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions,
|
||||
const MutableCFOptions& mutable_cf_options);
|
||||
const VersionStorageInfo& vstorage);
|
||||
|
||||
// Pick a path ID to place a newly generated file, with its estimated file
|
||||
// size.
|
||||
@ -325,8 +324,7 @@ void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
|
||||
|
||||
std::vector<UniversalCompactionBuilder::SortedRun>
|
||||
UniversalCompactionBuilder::CalculateSortedRuns(
|
||||
const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/,
|
||||
const MutableCFOptions& mutable_cf_options) {
|
||||
const VersionStorageInfo& vstorage) {
|
||||
std::vector<UniversalCompactionBuilder::SortedRun> ret;
|
||||
for (FileMetaData* f : vstorage.LevelFiles(0)) {
|
||||
ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
|
||||
@ -336,28 +334,17 @@ UniversalCompactionBuilder::CalculateSortedRuns(
|
||||
uint64_t total_compensated_size = 0U;
|
||||
uint64_t total_size = 0U;
|
||||
bool being_compacted = false;
|
||||
bool is_first = true;
|
||||
for (FileMetaData* f : vstorage.LevelFiles(level)) {
|
||||
total_compensated_size += f->compensated_file_size;
|
||||
total_size += f->fd.GetFileSize();
|
||||
if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
|
||||
true) {
|
||||
// Size amp, read amp and periodic compactions always include all files
|
||||
// for a non-zero level. However, a delete triggered compaction and
|
||||
// a trivial move might pick a subset of files in a sorted run. So
|
||||
// always check all files in a sorted run and mark the entire run as
|
||||
// being compacted if one or more files are being compacted
|
||||
if (f->being_compacted) {
|
||||
being_compacted = f->being_compacted;
|
||||
}
|
||||
} else {
|
||||
// Compaction always includes all files for a non-zero level, so for a
|
||||
// non-zero level, all the files should share the same being_compacted
|
||||
// value.
|
||||
// This assumption is only valid when
|
||||
// mutable_cf_options.compaction_options_universal.allow_trivial_move
|
||||
// is false
|
||||
assert(is_first || f->being_compacted == being_compacted);
|
||||
}
|
||||
if (is_first) {
|
||||
being_compacted = f->being_compacted;
|
||||
is_first = false;
|
||||
}
|
||||
}
|
||||
if (total_compensated_size > 0) {
|
||||
ret.emplace_back(level, nullptr, total_size, total_compensated_size,
|
||||
@ -372,8 +359,7 @@ UniversalCompactionBuilder::CalculateSortedRuns(
|
||||
Compaction* UniversalCompactionBuilder::PickCompaction() {
|
||||
const int kLevel0 = 0;
|
||||
score_ = vstorage_->CompactionScore(kLevel0);
|
||||
sorted_runs_ =
|
||||
CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_);
|
||||
sorted_runs_ = CalculateSortedRuns(*vstorage_);
|
||||
|
||||
if (sorted_runs_.size() == 0 ||
|
||||
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
|
||||
@ -855,6 +841,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
|
||||
std::vector<CompactionInputFiles> inputs;
|
||||
|
||||
if (vstorage_->num_levels() == 1) {
|
||||
#if defined(ENABLE_SINGLE_LEVEL_DTC)
|
||||
// This is single level universal. Since we're basically trying to reclaim
|
||||
// space by processing files marked for compaction due to high tombstone
|
||||
// density, let's do the same thing as compaction to reduce size amp which
|
||||
@ -877,6 +864,11 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
|
||||
return nullptr;
|
||||
}
|
||||
inputs.push_back(start_level_inputs);
|
||||
#else
|
||||
// Disable due to a known race condition.
|
||||
// TODO: Reenable once the race condition is fixed
|
||||
return nullptr;
|
||||
#endif // ENABLE_SINGLE_LEVEL_DTC
|
||||
} else {
|
||||
int start_level;
|
||||
|
||||
|
@ -1953,6 +1953,7 @@ TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
|
||||
ASSERT_GT(NumTableFilesAtLevel(6), 0);
|
||||
}
|
||||
|
||||
#if defined(ENABLE_SINGLE_LEVEL_DTC)
|
||||
TEST_F(DBTestUniversalCompaction2, SingleLevel) {
|
||||
const int kNumKeys = 3000;
|
||||
const int kWindowSize = 100;
|
||||
@ -1991,6 +1992,7 @@ TEST_F(DBTestUniversalCompaction2, SingleLevel) {
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(0));
|
||||
}
|
||||
#endif // ENABLE_SINGLE_LEVEL_DTC
|
||||
|
||||
TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
|
||||
const int kWindowSize = 100;
|
||||
|
@ -2375,6 +2375,11 @@ void VersionStorageInfo::ComputeCompactionScore(
|
||||
// compaction score for the whole DB. Adding other levels as if
|
||||
// they are L0 files.
|
||||
for (int i = 1; i < num_levels(); i++) {
|
||||
// Its possible that a subset of the files in a level may be in a
|
||||
// compaction, due to delete triggered compaction or trivial move.
|
||||
// In that case, the below check may not catch a level being
|
||||
// compacted as it only checks the first file. The worst that can
|
||||
// happen is a scheduled compaction thread will find nothing to do.
|
||||
if (!files_[i].empty() && !files_[i][0]->being_compacted) {
|
||||
num_sorted_runs++;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user