Have a way for compaction filter to ignore snapshots
Summary: Provide an API for compaction filter to specify that it needs to be applied even if there are snapshots. Test Plan: DBTestCompactionFilter.CompactionFilterIgnoreSnapshot Reviewers: yhchiang, IslamAbdelRahman, sdong, anthony Reviewed By: anthony Subscribers: yoshinorim, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D51087
This commit is contained in:
parent
88e0527724
commit
81be49c755
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
### New Features
|
### New Features
|
||||||
|
* CompactionFilter has new member function called IgnoreSnapshots which allows CompactionFilter to be called even if there are snapshots later than the key.
|
||||||
* RocksDB will now persist options under the same directory as the RocksDB database on successful DB::Open, CreateColumnFamily, DropColumnFamily, and SetOptions.
|
* RocksDB will now persist options under the same directory as the RocksDB database on successful DB::Open, CreateColumnFamily, DropColumnFamily, and SetOptions.
|
||||||
* Introduce LoadLatestOptions() in rocksdb/utilities/options_util.h. This function can construct the latest DBOptions / ColumnFamilyOptions used by the specified RocksDB intance.
|
* Introduce LoadLatestOptions() in rocksdb/utilities/options_util.h. This function can construct the latest DBOptions / ColumnFamilyOptions used by the specified RocksDB intance.
|
||||||
* Introduce CheckOptionsCompatibility() in rocksdb/utilities/options_util.h. This function checks whether the input set of options is able to open the specified DB successfully.
|
* Introduce CheckOptionsCompatibility() in rocksdb/utilities/options_util.h. This function checks whether the input set of options is able to open the specified DB successfully.
|
||||||
|
@ -42,6 +42,11 @@ CompactionIterator::CompactionIterator(
|
|||||||
earliest_snapshot_ = snapshots_->at(0);
|
earliest_snapshot_ = snapshots_->at(0);
|
||||||
latest_snapshot_ = snapshots_->back();
|
latest_snapshot_ = snapshots_->back();
|
||||||
}
|
}
|
||||||
|
if (compaction_filter_ != nullptr && compaction_filter_->IgnoreSnapshots()) {
|
||||||
|
ignore_snapshots_ = true;
|
||||||
|
} else {
|
||||||
|
ignore_snapshots_ = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CompactionIterator::ResetRecordCounts() {
|
void CompactionIterator::ResetRecordCounts() {
|
||||||
@ -140,7 +145,8 @@ void CompactionIterator::NextFromInput() {
|
|||||||
current_user_key_snapshot_ = 0;
|
current_user_key_snapshot_ = 0;
|
||||||
// apply the compaction filter to the first occurrence of the user key
|
// apply the compaction filter to the first occurrence of the user key
|
||||||
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
|
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
|
||||||
(visible_at_tip_ || ikey_.sequence > latest_snapshot_)) {
|
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
|
||||||
|
ignore_snapshots_)) {
|
||||||
// If the user has specified a compaction filter and the sequence
|
// If the user has specified a compaction filter and the sequence
|
||||||
// number is greater than any external snapshot, then invoke the
|
// number is greater than any external snapshot, then invoke the
|
||||||
// filter. If the return value of the compaction filter is true,
|
// filter. If the return value of the compaction filter is true,
|
||||||
@ -170,6 +176,9 @@ void CompactionIterator::NextFromInput() {
|
|||||||
} else {
|
} else {
|
||||||
// Update the current key to reflect the new sequence number/type without
|
// Update the current key to reflect the new sequence number/type without
|
||||||
// copying the user key.
|
// copying the user key.
|
||||||
|
// TODO(rven): Compaction filter does not process keys in this path
|
||||||
|
// Need to have the compaction filter process multiple versions
|
||||||
|
// if we have versions on both sides of a snapshot
|
||||||
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
|
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
|
||||||
key_ = current_key_.GetKey();
|
key_ = current_key_.GetKey();
|
||||||
ikey_.user_key = current_key_.GetUserKey();
|
ikey_.user_key = current_key_.GetUserKey();
|
||||||
|
@ -98,6 +98,7 @@ class CompactionIterator {
|
|||||||
SequenceNumber visible_at_tip_;
|
SequenceNumber visible_at_tip_;
|
||||||
SequenceNumber earliest_snapshot_;
|
SequenceNumber earliest_snapshot_;
|
||||||
SequenceNumber latest_snapshot_;
|
SequenceNumber latest_snapshot_;
|
||||||
|
bool ignore_snapshots_;
|
||||||
|
|
||||||
// State
|
// State
|
||||||
//
|
//
|
||||||
|
@ -47,6 +47,24 @@ class DeleteFilter : public CompactionFilter {
|
|||||||
virtual const char* Name() const override { return "DeleteFilter"; }
|
virtual const char* Name() const override { return "DeleteFilter"; }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class DeleteISFilter : public CompactionFilter {
|
||||||
|
public:
|
||||||
|
virtual bool Filter(int level, const Slice& key, const Slice& value,
|
||||||
|
std::string* new_value,
|
||||||
|
bool* value_changed) const override {
|
||||||
|
cfilter_count++;
|
||||||
|
int i = std::stoi(key.ToString());
|
||||||
|
if (i > 5 && i <= 105) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual bool IgnoreSnapshots() const override { return true; }
|
||||||
|
|
||||||
|
virtual const char* Name() const override { return "DeleteFilter"; }
|
||||||
|
};
|
||||||
|
|
||||||
class DelayFilter : public CompactionFilter {
|
class DelayFilter : public CompactionFilter {
|
||||||
public:
|
public:
|
||||||
explicit DelayFilter(DBTestBase* d) : db_test(d) {}
|
explicit DelayFilter(DBTestBase* d) : db_test(d) {}
|
||||||
@ -141,6 +159,21 @@ class DeleteFilterFactory : public CompactionFilterFactory {
|
|||||||
virtual const char* Name() const override { return "DeleteFilterFactory"; }
|
virtual const char* Name() const override { return "DeleteFilterFactory"; }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Delete Filter Factory which ignores snapshots
|
||||||
|
class DeleteISFilterFactory : public CompactionFilterFactory {
|
||||||
|
public:
|
||||||
|
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
||||||
|
const CompactionFilter::Context& context) override {
|
||||||
|
if (context.is_manual_compaction) {
|
||||||
|
return std::unique_ptr<CompactionFilter>(new DeleteISFilter());
|
||||||
|
} else {
|
||||||
|
return std::unique_ptr<CompactionFilter>(nullptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override { return "DeleteFilterFactory"; }
|
||||||
|
};
|
||||||
|
|
||||||
class DelayFilterFactory : public CompactionFilterFactory {
|
class DelayFilterFactory : public CompactionFilterFactory {
|
||||||
public:
|
public:
|
||||||
explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
|
explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
|
||||||
@ -620,6 +653,68 @@ TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) {
|
|||||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||||
ASSERT_EQ(0U, CountLiveFiles());
|
ASSERT_EQ(0U, CountLiveFiles());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compaction filters should only be applied to records that are newer than the
|
||||||
|
// latest snapshot. However, if the compaction filter asks to ignore snapshots
|
||||||
|
// records newer than the snapshot will also be processed
|
||||||
|
TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
|
||||||
|
std::string five = ToString(5);
|
||||||
|
Options options;
|
||||||
|
options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>();
|
||||||
|
options.disable_auto_compactions = true;
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options = CurrentOptions(options);
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
// Put some data.
|
||||||
|
const Snapshot* snapshot = nullptr;
|
||||||
|
for (int table = 0; table < 4; ++table) {
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
Put(ToString(table * 100 + i), "val");
|
||||||
|
}
|
||||||
|
Flush();
|
||||||
|
|
||||||
|
if (table == 0) {
|
||||||
|
snapshot = db_->GetSnapshot();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert(snapshot != nullptr);
|
||||||
|
|
||||||
|
cfilter_count = 0;
|
||||||
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||||
|
// The filter should delete 40 records.
|
||||||
|
ASSERT_EQ(40U, cfilter_count);
|
||||||
|
|
||||||
|
{
|
||||||
|
// Scan the entire database as of the snapshot to ensure
|
||||||
|
// that nothing is left
|
||||||
|
ReadOptions read_options;
|
||||||
|
read_options.snapshot = snapshot;
|
||||||
|
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
|
||||||
|
iter->SeekToFirst();
|
||||||
|
int count = 0;
|
||||||
|
while (iter->Valid()) {
|
||||||
|
count++;
|
||||||
|
iter->Next();
|
||||||
|
}
|
||||||
|
ASSERT_EQ(count, 6);
|
||||||
|
read_options.snapshot = 0;
|
||||||
|
std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options));
|
||||||
|
iter1->SeekToFirst();
|
||||||
|
count = 0;
|
||||||
|
while (iter1->Valid()) {
|
||||||
|
count++;
|
||||||
|
iter1->Next();
|
||||||
|
}
|
||||||
|
// We have deleted 10 keys from 40 using the compaction filter
|
||||||
|
// Keys 6-9 before the snapshot and 100-105 after the snapshot
|
||||||
|
ASSERT_EQ(count, 30);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release the snapshot and compact again -> now all records should be
|
||||||
|
// removed.
|
||||||
|
db_->ReleaseSnapshot(snapshot);
|
||||||
|
}
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -98,6 +98,16 @@ class CompactionFilter {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// By default, compaction will only call Filter() on keys written after the
|
||||||
|
// most recent call to GetSnapshot(). However, if the compaction filter
|
||||||
|
// overrides IgnoreSnapshots to make it return false, the compaction filter
|
||||||
|
// will be called even if the keys were written before the last snapshot.
|
||||||
|
// This behavior is to be used only when we want to delete a set of keys
|
||||||
|
// irrespective of snapshots. In particular, care should be taken
|
||||||
|
// to understand that the values of thesekeys will change even if we are
|
||||||
|
// using a snapshot.
|
||||||
|
virtual bool IgnoreSnapshots() const { return false; }
|
||||||
|
|
||||||
// Returns a name that identifies this compaction filter.
|
// Returns a name that identifies this compaction filter.
|
||||||
// The name will be printed to LOG file on start up for diagnosis.
|
// The name will be printed to LOG file on start up for diagnosis.
|
||||||
virtual const char* Name() const = 0;
|
virtual const char* Name() const = 0;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user