[RocksDB] Compaction Filter Cleanup
Summary: This hopefully gives the right semantics to compaction filter. Will write a small wiki to explain the ideas. Test Plan: make check; db_stress Reviewers: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D11121
This commit is contained in:
parent
7a5f71d19a
commit
1afdf28701
@ -1641,12 +1641,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
|||||||
|
|
||||||
SequenceNumber visible_at_tip = 0;
|
SequenceNumber visible_at_tip = 0;
|
||||||
SequenceNumber earliest_snapshot;
|
SequenceNumber earliest_snapshot;
|
||||||
|
SequenceNumber latest_snapshot = 0;
|
||||||
snapshots_.getAll(compact->existing_snapshots);
|
snapshots_.getAll(compact->existing_snapshots);
|
||||||
if (compact->existing_snapshots.size() == 0) {
|
if (compact->existing_snapshots.size() == 0) {
|
||||||
// optimize for fast path if there are no snapshots
|
// optimize for fast path if there are no snapshots
|
||||||
visible_at_tip = versions_->LastSequence();
|
visible_at_tip = versions_->LastSequence();
|
||||||
earliest_snapshot = visible_at_tip;
|
earliest_snapshot = visible_at_tip;
|
||||||
} else {
|
} else {
|
||||||
|
latest_snapshot = compact->existing_snapshots.back();
|
||||||
// Add the current seqno as the 'latest' virtual
|
// Add the current seqno as the 'latest' virtual
|
||||||
// snapshot to the end of this list.
|
// snapshot to the end of this list.
|
||||||
compact->existing_snapshots.push_back(versions_->LastSequence());
|
compact->existing_snapshots.push_back(versions_->LastSequence());
|
||||||
@ -1680,6 +1682,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
|||||||
kMaxSequenceNumber;
|
kMaxSequenceNumber;
|
||||||
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
|
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
|
||||||
std::string compaction_filter_value;
|
std::string compaction_filter_value;
|
||||||
|
std::vector<char> delete_key; // for compaction filter
|
||||||
MergeHelper merge(user_comparator(), options_.merge_operator,
|
MergeHelper merge(user_comparator(), options_.merge_operator,
|
||||||
options_.info_log.get(),
|
options_.info_log.get(),
|
||||||
false /* internal key corruption is expected */);
|
false /* internal key corruption is expected */);
|
||||||
@ -1727,6 +1730,41 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
|||||||
has_current_user_key = true;
|
has_current_user_key = true;
|
||||||
last_sequence_for_key = kMaxSequenceNumber;
|
last_sequence_for_key = kMaxSequenceNumber;
|
||||||
visible_in_snapshot = kMaxSequenceNumber;
|
visible_in_snapshot = kMaxSequenceNumber;
|
||||||
|
|
||||||
|
// apply the compaction filter to the first occurrence of the user key
|
||||||
|
if (options_.compaction_filter &&
|
||||||
|
ikey.type == kTypeValue &&
|
||||||
|
(visible_at_tip || ikey.sequence > latest_snapshot)) {
|
||||||
|
// If the user has specified a compaction filter and the sequence
|
||||||
|
// number is greater than any external snapshot, then invoke the
|
||||||
|
// filter.
|
||||||
|
// If the return value of the compaction filter is true, replace
|
||||||
|
// the entry with a delete marker.
|
||||||
|
bool value_changed = false;
|
||||||
|
compaction_filter_value.clear();
|
||||||
|
bool to_delete =
|
||||||
|
options_.compaction_filter->Filter(compact->compaction->level(),
|
||||||
|
ikey.user_key, value,
|
||||||
|
&compaction_filter_value,
|
||||||
|
&value_changed);
|
||||||
|
if (to_delete) {
|
||||||
|
// make a copy of the original key
|
||||||
|
delete_key.assign(key.data(), key.data() + key.size());
|
||||||
|
// convert it to a delete
|
||||||
|
UpdateInternalKey(&delete_key[0], delete_key.size(),
|
||||||
|
ikey.sequence, kTypeDeletion);
|
||||||
|
// anchor the key again
|
||||||
|
key = Slice(&delete_key[0], delete_key.size());
|
||||||
|
// needed because ikey is backed by key
|
||||||
|
ParseInternalKey(key, &ikey);
|
||||||
|
// no value associated with delete
|
||||||
|
value.clear();
|
||||||
|
RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER);
|
||||||
|
} else if (value_changed) {
|
||||||
|
value = compaction_filter_value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are no snapshots, then this kv affect visibility at tip.
|
// If there are no snapshots, then this kv affect visibility at tip.
|
||||||
@ -1773,31 +1811,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
|||||||
key = merge.key();
|
key = merge.key();
|
||||||
ParseInternalKey(key, &ikey);
|
ParseInternalKey(key, &ikey);
|
||||||
value = merge.value();
|
value = merge.value();
|
||||||
} else if (options_.compaction_filter &&
|
|
||||||
ikey.type != kTypeDeletion &&
|
|
||||||
visible_at_tip) {
|
|
||||||
// If the user has specified a compaction filter and there are no
|
|
||||||
// outstanding external snapshots, then invoke the filter.
|
|
||||||
// If the return value of the compaction filter is true,
|
|
||||||
// drop this key from the output.
|
|
||||||
assert(ikey.type == kTypeValue);
|
|
||||||
assert(!drop);
|
|
||||||
bool value_changed = false;
|
|
||||||
compaction_filter_value.clear();
|
|
||||||
drop = options_.compaction_filter->Filter(compact->compaction->level(),
|
|
||||||
ikey.user_key, value,
|
|
||||||
&compaction_filter_value,
|
|
||||||
&value_changed);
|
|
||||||
// Another example of statistics update without holding the lock
|
|
||||||
// TODO: clean it up
|
|
||||||
if (drop) {
|
|
||||||
RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER);
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the application wants to change the value, then do so here.
|
|
||||||
if (value_changed) {
|
|
||||||
value = compaction_filter_value;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
last_sequence_for_key = ikey.sequence;
|
last_sequence_for_key = ikey.sequence;
|
||||||
|
Loading…
Reference in New Issue
Block a user