Fixed a bug in CompactionIterator when write-preared transaction is used (#9060)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9060 RocksDB bottommost level compaction may zero out an internal key's sequence if the key's sequence is in the earliest_snapshot. In write-prepared transaction, checking the visibility of a certain sequence in a specific released snapshot may return a "snapshot released" result. Therefore, it is possible, after a certain sequence of events, a PUT has its sequence zeroed out, but a subsequent SingleDelete of the same key will still be output with its original sequence. This violates the ascending order of keys and leads to incorrect result. The solution is to use an extra variable `last_key_seq_zeroed_` to track the information about visibility in earliest snapshot. With this variable, we can know for sure that a SingleDelete is in the earliest snapshot even if the said snapshot is released during compaction before processing the SD. Reviewed By: ltamasi Differential Revision: D31813016 fbshipit-source-id: d8cff59d6f34e0bdf282614034aaea99be9174e1
This commit is contained in:
parent
56810142a5
commit
9b53f14a35
@ -9,6 +9,7 @@
|
||||
* Fixed a bug in CompactionIterator when write-prepared transaction is used. A released earliest write conflict snapshot may cause assertion failure in dbg mode and unexpected key in opt mode.
|
||||
* Fix ticker WRITE_WITH_WAL("rocksdb.write.wal"), this bug is caused by a bad extra `RecordTick(stats_, WRITE_WITH_WAL)` (at 2 place), this fix remove the extra `RecordTick`s and fix the corresponding test case.
|
||||
* EventListener::OnTableFileCreated was previously called with OK status and file_size==0 in cases of no SST file contents written (because there was no content to add) and the empty file deleted before calling the listener. Now the status is Aborted.
|
||||
* Fixed a bug in CompactionIterator when write-preared transaction is used. Releasing earliest_snapshot during compaction may cause a SingleDelete to be output after a PUT of the same user key whose seq has been zeroed.
|
||||
|
||||
### Behavior Changes
|
||||
* `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files.
|
||||
|
@ -433,6 +433,8 @@ void CompactionIterator::NextFromInput() {
|
||||
|
||||
has_outputted_key_ = false;
|
||||
|
||||
last_key_seq_zeroed_ = false;
|
||||
|
||||
current_key_committed_ = KeyCommitted(ikey_.sequence);
|
||||
|
||||
// Apply the compaction filter to the first committed version of the user
|
||||
@ -594,10 +596,16 @@ void CompactionIterator::NextFromInput() {
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"CompactionIterator::NextFromInput:SingleDelete:1",
|
||||
const_cast<Compaction*>(c));
|
||||
// Check whether the next key belongs to the same snapshot as the
|
||||
// SingleDelete.
|
||||
if (prev_snapshot == 0 ||
|
||||
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
|
||||
if (last_key_seq_zeroed_) {
|
||||
++iter_stats_.num_record_drop_hidden;
|
||||
++iter_stats_.num_record_drop_obsolete;
|
||||
assert(bottommost_level_);
|
||||
AdvanceInputIter();
|
||||
} else if (prev_snapshot == 0 ||
|
||||
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
|
||||
// Check whether the next key belongs to the same snapshot as the
|
||||
// SingleDelete.
|
||||
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
|
||||
if (next_ikey.type == kTypeSingleDeletion) {
|
||||
@ -661,6 +669,9 @@ void CompactionIterator::NextFromInput() {
|
||||
// We hit the next snapshot without hitting a put, so the iterator
|
||||
// returns the single delete.
|
||||
valid_ = true;
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"CompactionIterator::NextFromInput:SingleDelete:3",
|
||||
const_cast<Compaction*>(c));
|
||||
}
|
||||
} else {
|
||||
// We are at the end of the input, could not parse the next key, or hit
|
||||
@ -683,6 +694,11 @@ void CompactionIterator::NextFromInput() {
|
||||
if (!bottommost_level_) {
|
||||
++iter_stats_.num_optimized_del_drop_obsolete;
|
||||
}
|
||||
} else if (last_key_seq_zeroed_) {
|
||||
// Skip.
|
||||
++iter_stats_.num_record_drop_hidden;
|
||||
++iter_stats_.num_record_drop_obsolete;
|
||||
assert(bottommost_level_);
|
||||
} else {
|
||||
// Output SingleDelete
|
||||
valid_ = true;
|
||||
@ -1038,6 +1054,9 @@ void CompactionIterator::PrepareOutput() {
|
||||
ikey_.type);
|
||||
}
|
||||
ikey_.sequence = 0;
|
||||
last_key_seq_zeroed_ = true;
|
||||
TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq",
|
||||
&ikey_);
|
||||
if (!timestamp_size_) {
|
||||
current_key_.UpdateInternalKey(0, ikey_.type);
|
||||
} else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
|
||||
|
@ -401,6 +401,10 @@ class CompactionIterator {
|
||||
|
||||
const int level_;
|
||||
|
||||
// True if the previous internal key (same user key)'s sequence number has
|
||||
// just been zeroed out during bottommost compaction.
|
||||
bool last_key_seq_zeroed_{false};
|
||||
|
||||
void AdvanceInputIter() { input_.Next(); }
|
||||
|
||||
void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }
|
||||
|
@ -3067,6 +3067,138 @@ TEST_P(WritePreparedTransactionTest,
|
||||
db->ReleaseSnapshot(snapshot1);
|
||||
}
|
||||
|
||||
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing) {
|
||||
constexpr size_t kSnapshotCacheBits = 7; // same as default
|
||||
constexpr size_t kCommitCacheBits = 0; // minimum commit cache
|
||||
UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
|
||||
options.disable_auto_compactions = true;
|
||||
ASSERT_OK(ReOpen());
|
||||
|
||||
ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
|
||||
ASSERT_OK(db->Flush(FlushOptions()));
|
||||
|
||||
{
|
||||
CompactRangeOptions cro;
|
||||
cro.change_level = true;
|
||||
cro.target_level = 2;
|
||||
ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
|
||||
}
|
||||
|
||||
ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
|
||||
|
||||
// Take a snapshot so that the SD won't be dropped during flush.
|
||||
auto* tmp_snapshot = db->GetSnapshot();
|
||||
|
||||
ASSERT_OK(db->Put(WriteOptions(), "b", "value2"));
|
||||
auto* snapshot = db->GetSnapshot();
|
||||
ASSERT_OK(db->Flush(FlushOptions()));
|
||||
|
||||
db->ReleaseSnapshot(tmp_snapshot);
|
||||
|
||||
// Bump the sequence so that the below bg compaction job's snapshot will be
|
||||
// different from snapshot's sequence.
|
||||
ASSERT_OK(db->Put(WriteOptions(), "z", "foo"));
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
|
||||
const auto* const ikey =
|
||||
reinterpret_cast<const ParsedInternalKey*>(arg);
|
||||
assert(ikey);
|
||||
if (ikey->user_key == "b") {
|
||||
assert(ikey->type == kTypeValue);
|
||||
db->ReleaseSnapshot(snapshot);
|
||||
|
||||
// Bump max_evicted_seq.
|
||||
ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
|
||||
/*end=*/nullptr));
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing2) {
|
||||
constexpr size_t kSnapshotCacheBits = 7; // same as default
|
||||
constexpr size_t kCommitCacheBits = 0; // minimum commit cache
|
||||
UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
|
||||
options.disable_auto_compactions = true;
|
||||
ASSERT_OK(ReOpen());
|
||||
|
||||
// Generate an L0 with only SD for one key "b".
|
||||
ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
|
||||
// Take a snapshot so that subsequent flush outputs the SD for "b".
|
||||
auto* tmp_snapshot = db->GetSnapshot();
|
||||
ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionIterator::NextFromInput:SingleDelete:3", [&](void* arg) {
|
||||
if (!arg) {
|
||||
db->ReleaseSnapshot(tmp_snapshot);
|
||||
// Bump max_evicted_seq
|
||||
ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(db->Flush(FlushOptions()));
|
||||
// Finish generating L0 with only SD for "b".
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
|
||||
// Move the L0 to L2.
|
||||
{
|
||||
CompactRangeOptions cro;
|
||||
cro.change_level = true;
|
||||
cro.target_level = 2;
|
||||
ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
|
||||
}
|
||||
|
||||
ASSERT_OK(db->Put(WriteOptions(), "b", "value1"));
|
||||
|
||||
auto* snapshot = db->GetSnapshot();
|
||||
|
||||
// Bump seq so that a subsequent flush/compaction job's snapshot is larger
|
||||
// than the above snapshot's seq.
|
||||
ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
|
||||
|
||||
// Generate a second L0.
|
||||
ASSERT_OK(db->Flush(FlushOptions()));
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
|
||||
const auto* const ikey =
|
||||
reinterpret_cast<const ParsedInternalKey*>(arg);
|
||||
assert(ikey);
|
||||
if (ikey->user_key == "b") {
|
||||
assert(ikey->type == kTypeValue);
|
||||
db->ReleaseSnapshot(snapshot);
|
||||
|
||||
// Bump max_evicted_seq.
|
||||
ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
|
||||
/*end=*/nullptr));
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
// A more complex test to verify compaction/flush should keep keys visible
|
||||
// to snapshots.
|
||||
TEST_P(WritePreparedTransactionTest,
|
||||
|
Loading…
Reference in New Issue
Block a user