Deprecate CompactionFilter::IgnoreSnapshots() = false (#4954)
Summary: We found that the behavior of CompactionFilter::IgnoreSnapshots() = false isn't what we have expected. We thought that snapshot will always be preserved. However, we just realized that, if no snapshot is created while compaction starts, and a snapshot is created after that, the data seen from the snapshot can successfully be dropped by the compaction. This creates a strange behavior to the feature, which is hard to explain. Like what is documented in code comment, this feature is not very useful with snapshot anyway. The decision is to deprecate the feature. We keep the function to avoid to break users code. However, we will fail compactions if false is returned. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4954 Differential Revision: D13981900 Pulled By: siying fbshipit-source-id: 2db8c2c3865acd86a28dca625945d1481b1d1e36
This commit is contained in:
parent
cf3a671733
commit
f48758e939
@ -9,6 +9,7 @@
|
|||||||
* Add support for block checksums verification for external SST files before ingestion.
|
* Add support for block checksums verification for external SST files before ingestion.
|
||||||
|
|
||||||
### Public API Change
|
### Public API Change
|
||||||
|
* Disallow CompactionFilter::IgnoreSnapshots() = false, because it is not very useful and the behavior is confusing. The filter will filter everything if there is no snapshot declared by the time the compaction starts. However, users can define a snapshot after the compaction starts and before it finishes and this new snapshot won't be repeatable, because after the compaction finishes, some keys may be dropped.
|
||||||
* CompactionPri = kMinOverlappingRatio also uses compensated file size, which boosts file with lots of tombstones to be compacted first.
|
* CompactionPri = kMinOverlappingRatio also uses compensated file size, which boosts file with lots of tombstones to be compacted first.
|
||||||
* Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate.
|
* Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate.
|
||||||
* `TableProperties::num_entries` and `TableProperties::num_deletions` now also account for number of range tombstones.
|
* `TableProperties::num_entries` and `TableProperties::num_deletions` now also account for number of range tombstones.
|
||||||
|
2
db/c.cc
2
db/c.cc
@ -2866,7 +2866,7 @@ rocksdb_compactionfilter_t* rocksdb_compactionfilter_create(
|
|||||||
result->state_ = state;
|
result->state_ = state;
|
||||||
result->destructor_ = destructor;
|
result->destructor_ = destructor;
|
||||||
result->filter_ = filter;
|
result->filter_ = filter;
|
||||||
result->ignore_snapshots_ = false;
|
result->ignore_snapshots_ = true;
|
||||||
result->name_ = name;
|
result->name_ = name;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,6 @@ CompactionIterator::CompactionIterator(
|
|||||||
compaction_filter_(compaction_filter),
|
compaction_filter_(compaction_filter),
|
||||||
shutting_down_(shutting_down),
|
shutting_down_(shutting_down),
|
||||||
preserve_deletes_seqnum_(preserve_deletes_seqnum),
|
preserve_deletes_seqnum_(preserve_deletes_seqnum),
|
||||||
ignore_snapshots_(false),
|
|
||||||
current_user_key_sequence_(0),
|
current_user_key_sequence_(0),
|
||||||
current_user_key_snapshot_(0),
|
current_user_key_snapshot_(0),
|
||||||
merge_out_iter_(merge_helper_),
|
merge_out_iter_(merge_helper_),
|
||||||
@ -102,13 +101,6 @@ CompactionIterator::CompactionIterator(
|
|||||||
assert(snapshots_->at(i - 1) < snapshots_->at(i));
|
assert(snapshots_->at(i - 1) < snapshots_->at(i));
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if (compaction_filter_ != nullptr) {
|
|
||||||
if (compaction_filter_->IgnoreSnapshots()) {
|
|
||||||
ignore_snapshots_ = true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ignore_snapshots_ = false;
|
|
||||||
}
|
|
||||||
input_->SetPinnedItersMgr(&pinned_iters_mgr_);
|
input_->SetPinnedItersMgr(&pinned_iters_mgr_);
|
||||||
TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
|
TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
|
||||||
}
|
}
|
||||||
@ -180,9 +172,7 @@ void CompactionIterator::Next() {
|
|||||||
void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
|
void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
|
||||||
Slice* skip_until) {
|
Slice* skip_until) {
|
||||||
if (compaction_filter_ != nullptr &&
|
if (compaction_filter_ != nullptr &&
|
||||||
(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) &&
|
(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
|
||||||
(visible_at_tip_ || ignore_snapshots_ ||
|
|
||||||
DEFINITELY_NOT_IN_SNAPSHOT(ikey_.sequence, latest_snapshot_))) {
|
|
||||||
// If the user has specified a compaction filter and the sequence
|
// If the user has specified a compaction filter and the sequence
|
||||||
// number is greater than any external snapshot, then invoke the
|
// number is greater than any external snapshot, then invoke the
|
||||||
// filter. If the return value of the compaction filter is true,
|
// filter. If the return value of the compaction filter is true,
|
||||||
|
@ -168,8 +168,6 @@ class CompactionIterator {
|
|||||||
SequenceNumber earliest_snapshot_;
|
SequenceNumber earliest_snapshot_;
|
||||||
SequenceNumber latest_snapshot_;
|
SequenceNumber latest_snapshot_;
|
||||||
|
|
||||||
bool ignore_snapshots_;
|
|
||||||
|
|
||||||
// State
|
// State
|
||||||
//
|
//
|
||||||
// Points to a copy of the current compaction iterator output (current_key_)
|
// Points to a copy of the current compaction iterator output (current_key_)
|
||||||
|
@ -816,6 +816,24 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
|
uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
|
||||||
|
|
||||||
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
|
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
|
||||||
|
|
||||||
|
// Create compaction filter and fail the compaction if
|
||||||
|
// IgnoreSnapshots() = false because it is not supported anymore
|
||||||
|
const CompactionFilter* compaction_filter =
|
||||||
|
cfd->ioptions()->compaction_filter;
|
||||||
|
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
|
||||||
|
if (compaction_filter == nullptr) {
|
||||||
|
compaction_filter_from_factory =
|
||||||
|
sub_compact->compaction->CreateCompactionFilter();
|
||||||
|
compaction_filter = compaction_filter_from_factory.get();
|
||||||
|
}
|
||||||
|
if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
|
||||||
|
sub_compact->status = Status::NotSupported(
|
||||||
|
"CompactionFilter::IgnoreSnapshots() = false is not supported "
|
||||||
|
"anymore.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
|
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
|
||||||
existing_snapshots_);
|
existing_snapshots_);
|
||||||
|
|
||||||
@ -883,13 +901,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto compaction_filter = cfd->ioptions()->compaction_filter;
|
|
||||||
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
|
|
||||||
if (compaction_filter == nullptr) {
|
|
||||||
compaction_filter_from_factory =
|
|
||||||
sub_compact->compaction->CreateCompactionFilter();
|
|
||||||
compaction_filter = compaction_filter_from_factory.get();
|
|
||||||
}
|
|
||||||
MergeHelper merge(
|
MergeHelper merge(
|
||||||
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
|
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
|
||||||
compaction_filter, db_options_.info_log.get(),
|
compaction_filter, db_options_.info_log.get(),
|
||||||
|
@ -699,44 +699,7 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
// Compaction filters should only be applied to records that are newer than the
|
// Compaction filters aplies to all records, regardless snapshots.
|
||||||
// latest snapshot. This test inserts records and applies a delete filter.
|
|
||||||
TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) {
|
|
||||||
Options options = CurrentOptions();
|
|
||||||
options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
|
|
||||||
options.disable_auto_compactions = true;
|
|
||||||
options.create_if_missing = true;
|
|
||||||
DestroyAndReopen(options);
|
|
||||||
|
|
||||||
// Put some data.
|
|
||||||
const Snapshot* snapshot = nullptr;
|
|
||||||
for (int table = 0; table < 4; ++table) {
|
|
||||||
for (int i = 0; i < 10; ++i) {
|
|
||||||
Put(ToString(table * 100 + i), "val");
|
|
||||||
}
|
|
||||||
Flush();
|
|
||||||
|
|
||||||
if (table == 0) {
|
|
||||||
snapshot = db_->GetSnapshot();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert(snapshot != nullptr);
|
|
||||||
|
|
||||||
cfilter_count = 0;
|
|
||||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
|
||||||
// The filter should delete 10 records.
|
|
||||||
ASSERT_EQ(30U, cfilter_count);
|
|
||||||
|
|
||||||
// Release the snapshot and compact again -> now all records should be
|
|
||||||
// removed.
|
|
||||||
db_->ReleaseSnapshot(snapshot);
|
|
||||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
|
||||||
ASSERT_EQ(0U, CountLiveFiles());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compaction filters should only be applied to records that are newer than the
|
|
||||||
// latest snapshot. However, if the compaction filter asks to ignore snapshots
|
|
||||||
// records newer than the snapshot will also be processed
|
|
||||||
TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
|
TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
|
||||||
std::string five = ToString(5);
|
std::string five = ToString(5);
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
@ -874,6 +837,38 @@ TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) {
|
|||||||
EXPECT_EQ("v50", val);
|
EXPECT_EQ("v50", val);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TestNotSupportedFilter : public CompactionFilter {
|
||||||
|
public:
|
||||||
|
bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
|
||||||
|
std::string* /*new_value*/,
|
||||||
|
bool* /*value_changed*/) const override {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override { return "NotSupported"; }
|
||||||
|
bool IgnoreSnapshots() const override { return false; }
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalse) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.compaction_filter = new TestNotSupportedFilter();
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
Put("a", "v10");
|
||||||
|
Put("z", "v20");
|
||||||
|
Flush();
|
||||||
|
|
||||||
|
Put("a", "v10");
|
||||||
|
Put("z", "v20");
|
||||||
|
Flush();
|
||||||
|
|
||||||
|
// Comapction should fail because IgnoreSnapshots() = false
|
||||||
|
EXPECT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
|
||||||
|
.IsNotSupported());
|
||||||
|
|
||||||
|
delete options.compaction_filter;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -75,14 +75,11 @@ class CompactionFilter {
|
|||||||
// to modify the existing_value and pass it back through new_value.
|
// to modify the existing_value and pass it back through new_value.
|
||||||
// value_changed needs to be set to true in this case.
|
// value_changed needs to be set to true in this case.
|
||||||
//
|
//
|
||||||
// If you use snapshot feature of RocksDB (i.e. call GetSnapshot() API on a
|
// Note that RocksDB snapshots (i.e. call GetSnapshot() API on a
|
||||||
// DB* object), CompactionFilter might not be very useful for you. Due to
|
// DB* object) will not guarantee to preserve the state of the DB with
|
||||||
// guarantees we need to maintain, compaction process will not call Filter()
|
// CompactionFilter. Data seen from a snapshot might disppear after a
|
||||||
// on any keys that were written before the latest snapshot. In other words,
|
// compaction finishes. If you use snapshots, think twice about whether you
|
||||||
// compaction will only call Filter() on keys written after your most recent
|
// want to use compaction filter and whether you are using it in a safe way.
|
||||||
// call to GetSnapshot(). In most cases, Filter() will not be called very
|
|
||||||
// often. This is something we're fixing. See the discussion at:
|
|
||||||
// https://www.facebook.com/groups/mysqlonrocksdb/permalink/999723240091865/
|
|
||||||
//
|
//
|
||||||
// If multithreaded compaction is being used *and* a single CompactionFilter
|
// If multithreaded compaction is being used *and* a single CompactionFilter
|
||||||
// instance was supplied via Options::compaction_filter, this method may be
|
// instance was supplied via Options::compaction_filter, this method may be
|
||||||
@ -135,9 +132,9 @@ class CompactionFilter {
|
|||||||
//
|
//
|
||||||
// Caveats:
|
// Caveats:
|
||||||
// - The keys are skipped even if there are snapshots containing them,
|
// - The keys are skipped even if there are snapshots containing them,
|
||||||
// as if IgnoreSnapshots() was true; i.e. values removed
|
// i.e. values removed by kRemoveAndSkipUntil can disappear from a
|
||||||
// by kRemoveAndSkipUntil can disappear from a snapshot - beware
|
// snapshot - beware if you're using TransactionDB or
|
||||||
// if you're using TransactionDB or DB::GetSnapshot().
|
// DB::GetSnapshot().
|
||||||
// - If value for a key was overwritten or merged into (multiple Put()s
|
// - If value for a key was overwritten or merged into (multiple Put()s
|
||||||
// or Merge()s), and compaction filter skips this key with
|
// or Merge()s), and compaction filter skips this key with
|
||||||
// kRemoveAndSkipUntil, it's possible that it will remove only
|
// kRemoveAndSkipUntil, it's possible that it will remove only
|
||||||
@ -176,15 +173,12 @@ class CompactionFilter {
|
|||||||
return Decision::kKeep;
|
return Decision::kKeep;
|
||||||
}
|
}
|
||||||
|
|
||||||
// By default, compaction will only call Filter() on keys written after the
|
// This function is deprecated. Snapshots will always be ignored for
|
||||||
// most recent call to GetSnapshot(). However, if the compaction filter
|
// compaction filters, because we realized that not ignoring snapshots doesn't
|
||||||
// overrides IgnoreSnapshots to make it return true, the compaction filter
|
// provide the gurantee we initially thought it would provide. Repeatable
|
||||||
// will be called even if the keys were written before the last snapshot.
|
// reads will not be guaranteed anyway. If you override the function and
|
||||||
// This behavior is to be used only when we want to delete a set of keys
|
// returns false, we will fail the compaction.
|
||||||
// irrespective of snapshots. In particular, care should be taken
|
virtual bool IgnoreSnapshots() const { return true; }
|
||||||
// to understand that the values of these keys will change even if we are
|
|
||||||
// using a snapshot.
|
|
||||||
virtual bool IgnoreSnapshots() const { return false; }
|
|
||||||
|
|
||||||
// Returns a name that identifies this compaction filter.
|
// Returns a name that identifies this compaction filter.
|
||||||
// The name will be printed to LOG file on start up for diagnosis.
|
// The name will be printed to LOG file on start up for diagnosis.
|
||||||
|
@ -99,7 +99,7 @@ struct RocksLuaCompactionFilterOptions {
|
|||||||
bool ignore_value = false;
|
bool ignore_value = false;
|
||||||
|
|
||||||
// A boolean flag to determine whether to ignore snapshots.
|
// A boolean flag to determine whether to ignore snapshots.
|
||||||
bool ignore_snapshots = false;
|
bool ignore_snapshots = true;
|
||||||
|
|
||||||
// When specified a non-null pointer, the first "error_limit_per_filter"
|
// When specified a non-null pointer, the first "error_limit_per_filter"
|
||||||
// errors of each CompactionFilter that is lua related will be included
|
// errors of each CompactionFilter that is lua related will be included
|
||||||
|
Loading…
Reference in New Issue
Block a user