Drop unnecessary deletion markers during compaction (issue - 3842) (#4289)
Summary: This PR fixes issue 3842. We drop deletion markers iff 1. We are the bottom most level AND 2. All other occurrences of the key are in the same snapshot range as the delete I've also enhanced db_stress_test to add an option that does a full compare of the keys. This is done by a single thread (thread # 0). For tests I've run (so far) make check -j64 db_stress db_stress --acquire_snapshot_one_in=1000 --ops_per_thread=100000 /* to verify that new code doesnt break existing tests */ ./db_stress --compare_full_db_state_snapshot=true --acquire_snapshot_one_in=1000 --ops_per_thread=100000 /* to verify new test code */ Pull Request resolved: https://github.com/facebook/rocksdb/pull/4289 Differential Revision: D9491165 Pulled By: shrikanthshankar fbshipit-source-id: ce144834f31736c189aaca81bed356ba990331e2
This commit is contained in:
parent
8022500ecc
commit
4848bd0c4e
@ -505,6 +505,31 @@ void CompactionIterator::NextFromInput() {
|
||||
++iter_stats_.num_optimized_del_drop_obsolete;
|
||||
}
|
||||
input_->Next();
|
||||
} else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
|
||||
ikeyNotNeededForIncrementalSnapshot()) {
|
||||
// Handle the case where we have a delete key at the bottom most level
|
||||
// We can skip outputting the key iff there are no subsequent puts for this
|
||||
// key
|
||||
ParsedInternalKey next_ikey;
|
||||
input_->Next();
|
||||
// Skip over all versions of this key that happen to occur in the same snapshot
|
||||
// range as the delete
|
||||
while (input_->Valid() &&
|
||||
ParseInternalKey(input_->key(), &next_ikey) &&
|
||||
cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
|
||||
(prev_snapshot == 0 || next_ikey.sequence > prev_snapshot ||
|
||||
(snapshot_checker_ != nullptr &&
|
||||
UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence,
|
||||
prev_snapshot))))) {
|
||||
input_->Next();
|
||||
}
|
||||
// If you find you still need to output a row with this key, we need to output the
|
||||
// delete too
|
||||
if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
|
||||
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
|
||||
valid_ = true;
|
||||
at_next_ = true;
|
||||
}
|
||||
} else if (ikey_.type == kTypeMerge) {
|
||||
if (!merge_helper_->HasOperator()) {
|
||||
status_ = Status::InvalidArgument(
|
||||
|
@ -671,8 +671,12 @@ TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
|
||||
TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
|
||||
AddSnapshot(1);
|
||||
RunTest({test::KeyStr("a", 1, kTypeDeletion),
|
||||
test::KeyStr("b", 2, kTypeDeletion)},
|
||||
{"", ""}, {test::KeyStr("b", 2, kTypeDeletion)}, {""},
|
||||
test::KeyStr("b", 3, kTypeDeletion),
|
||||
test::KeyStr("b", 1, kTypeValue)},
|
||||
{"", "", ""},
|
||||
{test::KeyStr("b", 3, kTypeDeletion),
|
||||
test::KeyStr("b", 0, kTypeValue)},
|
||||
{"", ""},
|
||||
kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
|
||||
nullptr /*compaction_filter*/, true /*bottommost_level*/);
|
||||
}
|
||||
@ -841,13 +845,26 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
|
||||
{test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
|
||||
test::KeyStr("c", 3, kTypeDeletion)},
|
||||
{"", "", ""},
|
||||
{test::KeyStr("b", 2, kTypeDeletion),
|
||||
test::KeyStr("c", 3, kTypeDeletion)},
|
||||
{},
|
||||
{"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
|
||||
nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
|
||||
true /*bottommost_level*/);
|
||||
}
|
||||
|
||||
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
|
||||
NotRemoveDeletionIfValuePresentToEarlierSnapshot) {
|
||||
AddSnapshot(2,1);
|
||||
RunTest(
|
||||
{test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 1, kTypeValue),
|
||||
test::KeyStr("b", 3, kTypeValue)},
|
||||
{"", "", ""},
|
||||
{test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 0, kTypeValue),
|
||||
test::KeyStr("b", 3, kTypeValue)},
|
||||
{"", "", ""}, kMaxSequenceNumber /*last_commited_seq*/,
|
||||
nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
|
||||
true /*bottommost_level*/);
|
||||
}
|
||||
|
||||
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
|
||||
NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
|
||||
AddSnapshot(2, 1);
|
||||
|
@ -419,6 +419,10 @@ DEFINE_int32(acquire_snapshot_one_in, 0,
|
||||
"If non-zero, then acquires a snapshot once every N operations on "
|
||||
"average.");
|
||||
|
||||
DEFINE_bool(compare_full_db_state_snapshot, false,
|
||||
"If set we compare state of entire db (in one of the threads) with"
|
||||
"each snapshot.");
|
||||
|
||||
DEFINE_uint64(snapshot_hold_ops, 0,
|
||||
"If non-zero, then releases snapshots N operations after they're "
|
||||
"acquired.");
|
||||
@ -651,6 +655,18 @@ static std::string Key(int64_t val) {
|
||||
return big_endian_key;
|
||||
}
|
||||
|
||||
static bool GetIntVal(std::string big_endian_key, uint64_t *key_p) {
|
||||
unsigned int size_key = sizeof(*key_p);
|
||||
assert(big_endian_key.size() == size_key);
|
||||
std::string little_endian_key;
|
||||
little_endian_key.resize(size_key);
|
||||
for (size_t i = 0 ; i < size_key; ++i) {
|
||||
little_endian_key[i] = big_endian_key[size_key - 1 - i];
|
||||
}
|
||||
Slice little_endian_slice = Slice(little_endian_key);
|
||||
return GetFixed64(&little_endian_slice, key_p);
|
||||
}
|
||||
|
||||
static std::string StringToHex(const std::string& str) {
|
||||
std::string result = "0x";
|
||||
result.append(Slice(str).ToString(true));
|
||||
@ -1207,6 +1223,8 @@ struct ThreadState {
|
||||
Status status;
|
||||
// The value of the Get
|
||||
std::string value;
|
||||
// optional state of all keys in the db
|
||||
std::vector<bool> *key_vec;
|
||||
};
|
||||
std::queue<std::pair<uint64_t, SnapshotState> > snapshot_queue;
|
||||
|
||||
@ -1713,6 +1731,21 @@ class StressTest {
|
||||
")");
|
||||
}
|
||||
}
|
||||
if (snap_state.key_vec != nullptr) {
|
||||
std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
|
||||
std::unique_ptr<std::vector<bool>> tmp_bitvec(new std::vector<bool>(FLAGS_max_key));
|
||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
||||
uint64_t key_val;
|
||||
if (GetIntVal(iterator->key().ToString(), &key_val)) {
|
||||
(*tmp_bitvec.get())[key_val] = true;
|
||||
}
|
||||
}
|
||||
if (!std::equal(snap_state.key_vec->begin(),
|
||||
snap_state.key_vec->end(),
|
||||
tmp_bitvec.get()->begin())) {
|
||||
return Status::Corruption("Found inconsistent keys at this snapshot");
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -1796,6 +1829,7 @@ class StressTest {
|
||||
while (!thread->snapshot_queue.empty()) {
|
||||
db_->ReleaseSnapshot(
|
||||
thread->snapshot_queue.front().second.snapshot);
|
||||
delete thread->snapshot_queue.front().second.key_vec;
|
||||
thread->snapshot_queue.pop();
|
||||
}
|
||||
thread->shared->IncVotedReopen();
|
||||
@ -1970,9 +2004,23 @@ class StressTest {
|
||||
// will later read the same key before releasing the snapshot and verify
|
||||
// that the results are the same.
|
||||
auto status_at = db_->Get(ropt, column_family, key, &value_at);
|
||||
std::vector<bool> *key_vec = nullptr;
|
||||
|
||||
if (FLAGS_compare_full_db_state_snapshot &&
|
||||
(thread->tid == 0)) {
|
||||
key_vec = new std::vector<bool>(FLAGS_max_key);
|
||||
std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
|
||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
||||
uint64_t key_val;
|
||||
if (GetIntVal(iterator->key().ToString(), &key_val)) {
|
||||
(*key_vec)[key_val] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ThreadState::SnapshotState snap_state = {
|
||||
snapshot, rand_column_family, column_family->GetName(),
|
||||
keystr, status_at, value_at};
|
||||
keystr, status_at, value_at, key_vec};
|
||||
thread->snapshot_queue.emplace(
|
||||
std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
|
||||
snap_state);
|
||||
@ -1990,6 +2038,7 @@ class StressTest {
|
||||
VerificationAbort(shared, "Snapshot gave inconsistent state", s);
|
||||
}
|
||||
db_->ReleaseSnapshot(snap_state.snapshot);
|
||||
delete snap_state.key_vec;
|
||||
thread->snapshot_queue.pop();
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user