Fix bug when DeleteRange() used with paranoid_file_checks == true

Backports part of 750817555867a43f0e7b73dffa44756a9136c808.

The unit test comes from https://github.com/facebook/rocksdb/pull/7521.
This commit is contained in:
Andrew Kryczka 2020-10-13 08:58:27 -07:00
parent 290e990c84
commit 56fee9f1ae
3 changed files with 106 additions and 4 deletions

View File

@ -1,4 +1,8 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased
### Bug Fixes
* Fix false positive flush/compaction `Status::Corruption` failure when `paranoid_file_checks == true` and range tombstones were written to the compaction output files.
## 6.12.5 (2020-10-12) ## 6.12.5 (2020-10-12)
### Bug Fixes ### Bug Fixes
* Since 6.12, memtable lookup should report unrecognized value_type as corruption (#7121). * Since 6.12, memtable lookup should report unrecognized value_type as corruption (#7121).

View File

@ -1288,8 +1288,8 @@ Status CompactionJob::FinishCompactionOutputFile(
auto kv = tombstone.Serialize(); auto kv = tombstone.Serialize();
assert(lower_bound == nullptr || assert(lower_bound == nullptr ||
ucmp->Compare(*lower_bound, kv.second) < 0); ucmp->Compare(*lower_bound, kv.second) < 0);
sub_compact->AddToBuilder(kv.first.Encode(), kv.second, // Range tombstone is not supported by output validator yet.
paranoid_file_checks_); sub_compact->builder->Add(kv.first.Encode(), kv.second);
InternalKey smallest_candidate = std::move(kv.first); InternalKey smallest_candidate = std::move(kv.first);
if (lower_bound != nullptr && if (lower_bound != nullptr &&
ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) { ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {

View File

@ -104,7 +104,7 @@ class CorruptionTest : public testing::Test {
ASSERT_OK(::ROCKSDB_NAMESPACE::RepairDB(dbname_, options_)); ASSERT_OK(::ROCKSDB_NAMESPACE::RepairDB(dbname_, options_));
} }
void Build(int n, int flush_every = 0) { void Build(int n, int start, int flush_every) {
std::string key_space, value_space; std::string key_space, value_space;
WriteBatch batch; WriteBatch batch;
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
@ -113,13 +113,15 @@ class CorruptionTest : public testing::Test {
dbi->TEST_FlushMemTable(); dbi->TEST_FlushMemTable();
} }
//if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n); //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
Slice key = Key(i, &key_space); Slice key = Key(i + start, &key_space);
batch.Clear(); batch.Clear();
batch.Put(key, Value(i, &value_space)); batch.Put(key, Value(i, &value_space));
ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(db_->Write(WriteOptions(), &batch));
} }
} }
void Build(int n, int flush_every = 0) { Build(n, 0, flush_every); }
void Check(int min_expected, int max_expected) { void Check(int min_expected, int max_expected) {
uint64_t next_expected = 0; uint64_t next_expected = 0;
uint64_t missed = 0; uint64_t missed = 0;
@ -614,6 +616,102 @@ TEST_F(CorruptionTest, ParaniodFileChecksOnCompact) {
} }
} }
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
Options options;
options.paranoid_file_checks = true;
options.create_if_missing = true;
for (bool do_flush : {true, false}) {
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_OK(DB::Open(options, dbname_, &db_));
std::string start, end;
assert(db_ != nullptr);
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(3, &start), Key(7, &end)));
auto snap = db_->GetSnapshot();
ASSERT_NE(snap, nullptr);
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(8, &start), Key(9, &end)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(2, &start), Key(5, &end)));
Build(10);
if (do_flush) {
ASSERT_OK(db_->Flush(FlushOptions()));
} else {
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
}
db_->ReleaseSnapshot(snap);
}
}
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
Options options;
options.paranoid_file_checks = true;
options.create_if_missing = true;
for (bool do_flush : {true, false}) {
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_OK(DB::Open(options, dbname_, &db_));
assert(db_ != nullptr);
Build(10, 0, 0);
std::string start, end;
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(5, &start), Key(15, &end)));
auto snap = db_->GetSnapshot();
ASSERT_NE(snap, nullptr);
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(8, &start), Key(9, &end)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(12, &start), Key(17, &end)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(2, &start), Key(4, &end)));
Build(10, 10, 0);
if (do_flush) {
ASSERT_OK(db_->Flush(FlushOptions()));
} else {
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
}
db_->ReleaseSnapshot(snap);
}
}
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
Options options;
options.paranoid_file_checks = true;
options.create_if_missing = true;
for (bool do_flush : {true, false}) {
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_OK(DB::Open(options, dbname_, &db_));
assert(db_ != nullptr);
std::string start, end;
Build(10);
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(3, &start), Key(7, &end)));
auto snap = db_->GetSnapshot();
ASSERT_NE(snap, nullptr);
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(6, &start), Key(8, &end)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(2, &start), Key(5, &end)));
if (do_flush) {
ASSERT_OK(db_->Flush(FlushOptions()));
} else {
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
}
db_->ReleaseSnapshot(snap);
}
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {