From 3f263ef53695d8add505bd9a7032532059a1faef Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 16 May 2022 15:44:59 -0700 Subject: [PATCH] Add a temporary option for user to opt-out enforcement of SingleDelete contract (#9983) Summary: PR https://github.com/facebook/rocksdb/issues/9888 started to enforce the contract of single delete described in https://github.com/facebook/rocksdb/wiki/Single-Delete. For some of existing use cases, it is desirable to have a transition during which compaction will not fail if the contract is violated. Therefore, we add a temporary option `enforce_single_del_contracts` to allow application to opt out from this new strict behavior. Once transition completes, the flag can be set to `true` again. In a future release, the option will be removed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9983 Test Plan: make check Reviewed By: ajkr Differential Revision: D36333672 Pulled By: riversand963 fbshipit-source-id: dcb703ea0ed08076a1422f1bfb9914afe3c2caa2 --- HISTORY.md | 1 + db/builder.cc | 1 + db/compaction/compaction_iterator.cc | 20 ++++++++++++++------ db/compaction/compaction_iterator.h | 5 ++++- db/compaction/compaction_iterator_test.cc | 3 ++- db/compaction/compaction_job.cc | 6 +++--- db/compaction/compaction_job_test.cc | 15 +++++++++++++++ db/flush_job.cc | 1 + include/rocksdb/options.h | 13 +++++++++++++ options/db_options.cc | 9 ++++++++- options/db_options.h | 1 + 11 files changed, 63 insertions(+), 12 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 48026af2f..cb4f18e79 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,7 @@ * CompactionFilter::Decision has a new value: kRemoveWithSingleDelete. If CompactionFilter returns this decision, then CompactionIterator will use `SingleDelete` to mark a key as removed. * Renamed CompactionFilter::Decision::kRemoveWithSingleDelete to kPurge since the latter sounds more general and hides the implementation details of how compaction iterator handles keys. * Added ability to specify functions for Prepare and Validate to OptionsTypeInfo. Added methods to OptionTypeInfo to set the functions via an API. These methods are intended for RocksDB plugin developers for configuration management. +* Added a new immutable db options, enforce_single_del_contracts. If set to false (default is true), compaction will NOT fail due to a single delete followed by a delete for the same key. The purpose of this temporay option is to help existing use cases migrate. ### Bug Fixes * RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue. diff --git a/db/builder.cc b/db/builder.cc index 00d78de77..9af93a736 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -196,6 +196,7 @@ Status BuildTable( ShouldReportDetailedTime(env, ioptions.stats), true /* internal key corruption is not ok */, range_del_agg.get(), blob_file_builder.get(), ioptions.allow_data_in_errors, + ioptions.enforce_single_del_contracts, /*compaction=*/nullptr, compaction_filter.get(), /*shutting_down=*/nullptr, /*manual_compaction_paused=*/nullptr, diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 574c02ae6..327f1f121 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -28,7 +28,8 @@ CompactionIterator::CompactionIterator( Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, - const Compaction* compaction, const CompactionFilter* compaction_filter, + bool enforce_single_del_contracts, const Compaction* compaction, + const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const std::atomic* manual_compaction_paused, const std::atomic* manual_compaction_canceled, @@ -38,7 +39,7 @@ CompactionIterator::CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, report_detailed_time, expect_valid_internal_key, range_del_agg, - blob_file_builder, allow_data_in_errors, + blob_file_builder, allow_data_in_errors, enforce_single_del_contracts, std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), compaction_filter, shutting_down, manual_compaction_paused, @@ -52,6 +53,7 @@ CompactionIterator::CompactionIterator( Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + bool enforce_single_del_contracts, std::unique_ptr compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, @@ -80,6 +82,7 @@ CompactionIterator::CompactionIterator( manual_compaction_canceled_(manual_compaction_canceled), info_log_(info_log), allow_data_in_errors_(allow_data_in_errors), + enforce_single_del_contracts_(enforce_single_del_contracts), timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0), full_history_ts_low_(full_history_ts_low), current_user_key_sequence_(0), @@ -657,10 +660,15 @@ void CompactionIterator::NextFromInput() { "TransactionDBOptions::rollback_deletion_type_callback is " "configured properly. Mixing SD and DEL can lead to " "undefined behaviors"; - ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str()); - valid_ = false; - status_ = Status::Corruption(oss.str()); - return; + ++iter_stats_.num_record_drop_obsolete; + ++iter_stats_.num_single_del_mismatch; + if (enforce_single_del_contracts_) { + ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str()); + valid_ = false; + status_ = Status::Corruption(oss.str()); + return; + } + ROCKS_LOG_WARN(info_log_, "%s", oss.str().c_str()); } else if (!is_timestamp_eligible_for_gc) { // We cannot drop the SingleDelete as timestamp is enabled, and // timestamp of this key is greater than or equal to diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index d93e43642..dc994aa5f 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -176,7 +176,7 @@ class CompactionIterator { Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, - const Compaction* compaction = nullptr, + bool enforce_single_del_contracts, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const std::atomic* manual_compaction_paused = nullptr, @@ -193,6 +193,7 @@ class CompactionIterator { Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + bool enforce_single_del_contracts, std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -332,6 +333,8 @@ class CompactionIterator { bool allow_data_in_errors_; + const bool enforce_single_del_contracts_; + // Comes from comparator. const size_t timestamp_size_; diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index d9d7ed121..79f598c62 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -279,7 +279,8 @@ class CompactionIteratorTest : public testing::TestWithParam { snapshot_checker_.get(), Env::Default(), false /* report_detailed_time */, false, range_del_agg_.get(), nullptr /* blob_file_builder */, true /*allow_data_in_errors*/, - std::move(compaction), filter, &shutting_down_, + true /*enforce_single_del_contracts*/, std::move(compaction), filter, + &shutting_down_, /*manual_compaction_paused=*/nullptr, /*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr, full_history_ts_low)); diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index cf0f78b57..ed4687906 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1479,9 +1479,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, blob_file_builder.get(), db_options_.allow_data_in_errors, - sub_compact->compaction, compaction_filter, shutting_down_, - manual_compaction_paused_, manual_compaction_canceled_, - db_options_.info_log, full_history_ts_low)); + db_options_.enforce_single_del_contracts, sub_compact->compaction, + compaction_filter, shutting_down_, manual_compaction_paused_, + manual_compaction_canceled_, db_options_.info_log, full_history_ts_low)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 72b9bd273..f9a2a2855 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -1107,6 +1107,21 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) { /* expected_oldest_blob_file_number */ 19); } +TEST_F(CompactionJobTest, NoEnforceSingleDeleteContract) { + db_options_.enforce_single_del_contracts = false; + NewDB(); + + auto file = + mock::MakeMockFile({{KeyStr("a", 4U, kTypeSingleDeletion), ""}, + {KeyStr("a", 3U, kTypeDeletion), "dontcare"}}); + AddMockFile(file); + SetLastSequence(4U); + + auto expected_results = mock::MakeMockFile(); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results); +} + TEST_F(CompactionJobTest, InputSerialization) { // Setup a random CompactionServiceInput CompactionServiceInput input; diff --git a/db/flush_job.cc b/db/flush_job.cc index 66e198f74..adb98fed5 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -464,6 +464,7 @@ Status FlushJob::MemPurge() { env, ShouldReportDetailedTime(env, ioptions->stats), true /* internal key corruption is not ok */, range_del_agg.get(), nullptr, ioptions->allow_data_in_errors, + ioptions->enforce_single_del_contracts, /*compaction=*/nullptr, compaction_filter.get(), /*shutting_down=*/nullptr, /*manual_compaction_paused=*/nullptr, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a554fc737..f0a8bff54 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1337,6 +1337,19 @@ struct DBOptions { // // Default: kNonVolatileBlockTier CacheTier lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier; + + // If set to false, when compaction or flush sees a SingleDelete followed by + // a Delete for the same user key, compaction job will not fail. + // Otherwise, compaction job will fail. + // This is a temporary option to help existing use cases migrate, and + // will be removed in a future release. + // Warning: do not set to false unless you are trying to migrate existing + // data in which the contract of single delete + // (https://github.com/facebook/rocksdb/wiki/Single-Delete) is not enforced, + // thus has Delete mixed with SingleDelete for the same user key. Violation + // of the contract leads to undefined behaviors with high possibility of data + // inconsistency, e.g. deleted old data become visible again, etc. + bool enforce_single_del_contracts = true; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/db_options.cc b/options/db_options.cc index 6b647d913..1e8de20de 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -552,6 +552,10 @@ static std::unordered_map OptionTypeInfo::Enum( offsetof(struct ImmutableDBOptions, lowest_used_cache_tier), &cache_tier_string_map, OptionTypeFlags::kNone)}, + {"enforce_single_del_contracts", + {offsetof(struct ImmutableDBOptions, enforce_single_del_contracts), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, }; const std::string OptionsHelper::kDBOptionsName = "DBOptions"; @@ -750,7 +754,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) db_host_id(options.db_host_id), checksum_handoff_file_types(options.checksum_handoff_file_types), lowest_used_cache_tier(options.lowest_used_cache_tier), - compaction_service(options.compaction_service) { + compaction_service(options.compaction_service), + enforce_single_del_contracts(options.enforce_single_del_contracts) { fs = env->GetFileSystem(); clock = env->GetSystemClock().get(); logger = info_log.get(); @@ -921,6 +926,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { allow_data_in_errors); ROCKS_LOG_HEADER(log, " Options.db_host_id: %s", db_host_id.c_str()); + ROCKS_LOG_HEADER(log, " Options.enforce_single_del_contracts: %s", + enforce_single_del_contracts ? "true" : "false"); } bool ImmutableDBOptions::IsWalDirSameAsDBPath() const { diff --git a/options/db_options.h b/options/db_options.h index a00399e67..0462a9c96 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -105,6 +105,7 @@ struct ImmutableDBOptions { Statistics* stats; Logger* logger; std::shared_ptr compaction_service; + bool enforce_single_del_contracts; bool IsWalDirSameAsDBPath() const; bool IsWalDirSameAsDBPath(const std::string& path) const;