diff --git a/HISTORY.md b/HISTORY.md index 2055e6d74..baf61150a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * Fixed a bug where manual flush would block forever even though flush options had wait=false. * Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen. * Fixed a bug in async_io path where incorrect length of data is read by FilePrefetchBuffer if data is consumed from two populated buffers and request for more data is sent. +* Fixed a CompactionFilter bug. Compaction filter used to use `Delete` to remove keys, even if the keys should be removed with `SingleDelete`. Mixing `Delete` and `SingleDelete` may cause undefined behavior. ### New Features * DB::GetLiveFilesStorageInfo is ready for production use. @@ -12,6 +13,7 @@ ### Public API changes * Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions. * EXPERIMENTAL: Add new API AbortIO in file_system to abort the read requests submitted asynchronously. +* CompactionFilter::Decision has a new value: kRemoveWithSingleDelete. If CompactionFilter returns this decision, then CompactionIterator will use `SingleDelete` to mark a key as removed. ### Bug Fixes * RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue. diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 9f5bc4bc8..be7b94997 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -307,6 +307,14 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, // no value associated with delete value_.clear(); iter_stats_.num_record_drop_user++; + } else if (filter == CompactionFilter::Decision::kRemoveWithSingleDelete) { + // convert the current key to a single delete; key_ is pointing into + // current_key_ at this point, so updating current_key_ updates key() + ikey_.type = kTypeSingleDeletion; + current_key_.UpdateInternalKey(ikey_.sequence, kTypeSingleDeletion); + // no value associated with single delete + value_.clear(); + iter_stats_.num_record_drop_user++; } else if (filter == CompactionFilter::Decision::kChangeValue) { if (ikey_.type == kTypeBlobIndex) { // value transfer from blob file to inlined data diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc index abf8b6c8f..5a3f27d12 100644 --- a/db/db_compaction_filter_test.cc +++ b/db/db_compaction_filter_test.cc @@ -968,6 +968,71 @@ TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalseRecovery) { ASSERT_TRUE(TryReopen(options).IsNotSupported()); } +TEST_F(DBTestCompactionFilter, DropKeyWithSingleDelete) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + + Reopen(options); + + ASSERT_OK(Put("a", "v0")); + ASSERT_OK(Put("b", "v0")); + const Snapshot* snapshot = db_->GetSnapshot(); + + ASSERT_OK(SingleDelete("b")); + ASSERT_OK(Flush()); + + { + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = options.num_levels - 1; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } + + db_->ReleaseSnapshot(snapshot); + Close(); + + class DeleteFilterV2 : public CompactionFilter { + public: + Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/, + const Slice& /*existing_value*/, + std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (key.starts_with("b")) { + return Decision::kRemoveWithSingleDelete; + } + return Decision::kRemove; + } + + const char* Name() const override { return "DeleteFilterV2"; } + } delete_filter_v2; + + options.compaction_filter = &delete_filter_v2; + options.level0_file_num_compaction_trigger = 2; + Reopen(options); + + ASSERT_OK(Put("b", "v1")); + ASSERT_OK(Put("x", "v1")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("r", "v1")); + ASSERT_OK(Put("z", "v1")); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + Close(); + + options.compaction_filter = nullptr; + Reopen(options); + ASSERT_OK(SingleDelete("b")); + ASSERT_OK(Flush()); + { + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db_stress_tool/db_stress_compaction_filter.h b/db_stress_tool/db_stress_compaction_filter.h index f55062e0a..c967622db 100644 --- a/db_stress_tool/db_stress_compaction_filter.h +++ b/db_stress_tool/db_stress_compaction_filter.h @@ -46,11 +46,13 @@ class DbStressCompactionFilter : public CompactionFilter { // Reaching here means we acquired the lock. bool key_exists = state_->Exists(cf_id_, key_num); + const bool allow_overwrite = state_->AllowsOverwrite(key_num); key_mutex->Unlock(); if (!key_exists) { - return Decision::kRemove; + return allow_overwrite ? Decision::kRemove + : Decision::kRemoveWithSingleDelete; } return Decision::kKeep; } diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index 5cc5ca8dc..a56454abe 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -58,8 +58,8 @@ void ThreadBody(void* v) { bool RunStressTest(StressTest* stress) { SystemClock* clock = db_stress_env->GetSystemClock().get(); - stress->InitDb(); SharedState shared(db_stress_env, stress); + stress->InitDb(&shared); stress->FinishInitDb(&shared); #ifndef NDEBUG diff --git a/db_stress_tool/db_stress_shared_state.cc b/db_stress_tool/db_stress_shared_state.cc index bc1363504..078cc2fef 100644 --- a/db_stress_tool/db_stress_shared_state.cc +++ b/db_stress_tool/db_stress_shared_state.cc @@ -12,8 +12,6 @@ #include "db_stress_tool/db_stress_shared_state.h" namespace ROCKSDB_NAMESPACE { -const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; -const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; #if defined(ROCKSDB_SUPPORT_THREAD_LOCAL) #if defined(OS_SOLARIS) __thread bool SharedState::ignore_read_error; diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index e7eb4aade..b05144548 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -45,9 +45,9 @@ class SharedState { public: // indicates a key may have any value (or not be present) as an operation on // it is incomplete. - static const uint32_t UNKNOWN_SENTINEL; + static constexpr uint32_t UNKNOWN_SENTINEL = 0xfffffffe; // indicates a key should definitely be deleted - static const uint32_t DELETION_SENTINEL; + static constexpr uint32_t DELETION_SENTINEL = 0xffffffff; // Errors when reading filter blocks are ignored, so we use a thread // local variable updated via sync points to keep track of errors injected @@ -81,36 +81,9 @@ class SharedState { stress_test_(stress_test), verification_failure_(false), should_stop_test_(false), - no_overwrite_ids_(FLAGS_column_families), + no_overwrite_ids_(GenerateNoOverwriteIds()), expected_state_manager_(nullptr), printing_verification_results_(false) { - // Pick random keys in each column family that will not experience - // overwrite - - fprintf(stdout, "Choosing random keys with no overwrite\n"); - Random64 rnd(seed_); - // Start with the identity permutation. Subsequent iterations of - // for loop below will start with perm of previous for loop - int64_t* permutation = new int64_t[max_key_]; - for (int64_t i = 0; i < max_key_; i++) { - permutation[i] = i; - } - // Now do the Knuth shuffle - int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100; - // Only need to figure out first num_no_overwrite_keys of permutation - no_overwrite_ids_.reserve(num_no_overwrite_keys); - for (int64_t i = 0; i < num_no_overwrite_keys; i++) { - int64_t rand_index = i + rnd.Next() % (max_key_ - i); - // Swap i and rand_index; - int64_t temp = permutation[i]; - permutation[i] = permutation[rand_index]; - permutation[rand_index] = temp; - // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of - // permutation - no_overwrite_ids_.insert(permutation[i]); - } - delete[] permutation; - Status status; // TODO: We should introduce a way to explicitly disable verification // during shutdown. When that is disabled and FLAGS_expected_values_dir @@ -293,7 +266,7 @@ class SharedState { pending); } - bool AllowsOverwrite(int64_t key) { + bool AllowsOverwrite(int64_t key) const { return no_overwrite_ids_.find(key) == no_overwrite_ids_.end(); } @@ -335,6 +308,36 @@ class SharedState { ignore_read_error = true; } + // Pick random keys in each column family that will not experience overwrite. + std::unordered_set GenerateNoOverwriteIds() const { + fprintf(stdout, "Choosing random keys with no overwrite\n"); + // Start with the identity permutation. Subsequent iterations of + // for loop below will start with perm of previous for loop + std::vector permutation(max_key_); + for (int64_t i = 0; i < max_key_; ++i) { + permutation[i] = i; + } + // Now do the Knuth shuffle + const int64_t num_no_overwrite_keys = + (max_key_ * FLAGS_nooverwritepercent) / 100; + // Only need to figure out first num_no_overwrite_keys of permutation + std::unordered_set ret; + ret.reserve(num_no_overwrite_keys); + Random64 rnd(seed_); + for (int64_t i = 0; i < num_no_overwrite_keys; i++) { + assert(i < max_key_); + int64_t rand_index = i + rnd.Next() % (max_key_ - i); + // Swap i and rand_index; + int64_t temp = permutation[i]; + permutation[i] = permutation[rand_index]; + permutation[rand_index] = temp; + // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of + // permutation + ret.insert(permutation[i]); + } + return ret; + } + port::Mutex mu_; port::CondVar cv_; const uint32_t seed_; @@ -355,7 +358,7 @@ class SharedState { std::atomic should_stop_test_; // Keys that should not be overwritten - std::unordered_set no_overwrite_ids_; + const std::unordered_set no_overwrite_ids_; std::unique_ptr expected_state_manager_; // Cannot store `port::Mutex` directly in vector since it is not copyable diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 443b825bc..37835bb21 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -281,12 +281,12 @@ bool StressTest::BuildOptionsTable() { return true; } -void StressTest::InitDb() { +void StressTest::InitDb(SharedState* shared) { uint64_t now = clock_->NowMicros(); fprintf(stdout, "%s Initializing db_stress\n", clock_->TimeToString(now / 1000000).c_str()); PrintEnv(); - Open(); + Open(shared); BuildOptionsTable(); } @@ -568,7 +568,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, fprintf(stdout, "%s Reopening database in read-only\n", clock_->TimeToString(now / 1000000).c_str()); // Reopen as read-only, can ignore all options related to updates - Open(); + Open(shared); } else { fprintf(stderr, "Failed to preload db"); exit(1); @@ -2302,10 +2302,12 @@ void StressTest::PrintEnv() const { fprintf(stdout, "------------------------------------------------\n"); } -void StressTest::Open() { +void StressTest::Open(SharedState* shared) { assert(db_ == nullptr); #ifndef ROCKSDB_LITE assert(txn_db_ == nullptr); +#else + (void)shared; #endif if (FLAGS_options_file.empty()) { BlockBasedTableOptions block_based_options; @@ -2752,7 +2754,7 @@ void StressTest::Open() { static_cast(FLAGS_wp_snapshot_cache_bits); txn_db_options.wp_commit_cache_bits = static_cast(FLAGS_wp_commit_cache_bits); - PrepareTxnDbOptions(txn_db_options); + PrepareTxnDbOptions(shared, txn_db_options); s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, cf_descriptors, &column_families_, &txn_db_); if (!s.ok()) { @@ -2912,7 +2914,7 @@ void StressTest::Reopen(ThreadState* thread) { auto now = clock_->NowMicros(); fprintf(stdout, "%s Reopening database for the %dth time\n", clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_); - Open(); + Open(thread->shared); if ((FLAGS_sync_fault_injection || FLAGS_disable_wal) && IsStateTracked()) { Status s = thread->shared->SaveAtAndAfter(db_); diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index ee37eb101..2307e1261 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -30,7 +30,7 @@ class StressTest { bool BuildOptionsTable(); - void InitDb(); + void InitDb(SharedState*); // The initialization work is split into two parts to avoid a circular // dependency with `SharedState`. virtual void FinishInitDb(SharedState*); @@ -219,7 +219,7 @@ class StressTest { void PrintEnv() const; - void Open(); + void Open(SharedState* shared); void Reopen(ThreadState* thread); @@ -228,7 +228,8 @@ class StressTest { virtual void RegisterAdditionalListeners() {} #ifndef ROCKSDB_LITE - virtual void PrepareTxnDbOptions(TransactionDBOptions& /*txn_db_opts*/) {} + virtual void PrepareTxnDbOptions(SharedState* /*shared*/, + TransactionDBOptions& /*txn_db_opts*/) {} #endif std::shared_ptr cache_; diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 619a119cb..e29a30980 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -533,7 +533,7 @@ void MultiOpsTxnsStressTest::RegisterAdditionalListeners() { #ifndef ROCKSDB_LITE void MultiOpsTxnsStressTest::PrepareTxnDbOptions( - TransactionDBOptions& txn_db_opts) { + SharedState* /*shared*/, TransactionDBOptions& txn_db_opts) { // MultiOpsTxnStressTest uses SingleDelete to delete secondary keys, thus we // register this callback to let TxnDb know that when rolling back // a transaction, use only SingleDelete to cancel prior Put from the same diff --git a/db_stress_tool/multi_ops_txns_stress.h b/db_stress_tool/multi_ops_txns_stress.h index a7db1c69e..d711f2357 100644 --- a/db_stress_tool/multi_ops_txns_stress.h +++ b/db_stress_tool/multi_ops_txns_stress.h @@ -265,7 +265,8 @@ class MultiOpsTxnsStressTest : public StressTest { void RegisterAdditionalListeners() override; #ifndef ROCKSDB_LITE - void PrepareTxnDbOptions(TransactionDBOptions& txn_db_opts) override; + void PrepareTxnDbOptions(SharedState* /*shared*/, + TransactionDBOptions& txn_db_opts) override; #endif // !ROCKSDB_LITE Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a, diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 58e32016d..75579d0ce 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -12,6 +12,7 @@ #ifndef NDEBUG #include "utilities/fault_injection_fs.h" #endif // NDEBUG +#include "rocksdb/utilities/transaction_db.h" namespace ROCKSDB_NAMESPACE { class NonBatchedOpsStressTest : public StressTest { @@ -930,6 +931,21 @@ class NonBatchedOpsStressTest : public StressTest { } return true; } + +#ifndef ROCKSDB_LITE + void PrepareTxnDbOptions(SharedState* shared, + TransactionDBOptions& txn_db_opts) override { + txn_db_opts.rollback_deletion_type_callback = + [shared](TransactionDB*, ColumnFamilyHandle*, const Slice& key) { + assert(shared); + uint64_t key_num = 0; + bool ok = GetIntVal(key.ToString(), &key_num); + assert(ok); + (void)ok; + return !shared->AllowsOverwrite(key_num); + }; + } +#endif // ROCKSDB_LITE }; StressTest* CreateNonBatchedOpsStressTest() { diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index bdf872fd5..29f7fae8f 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -39,6 +39,7 @@ class CompactionFilter : public Customizable { enum class Decision { kKeep, kRemove, + kRemoveWithSingleDelete, kChangeValue, kRemoveAndSkipUntil, kChangeBlobIndex, // used internally by BlobDB. diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 696448989..53aa95ee0 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -384,6 +384,8 @@ multiops_txn_default_params = { "flush_one_in": 1000, "key_spaces_path": setup_multiops_txn_key_spaces_file(), "rollback_one_in": 4, + # Re-enable once we have a compaction for MultiOpsTxnStressTest + "enable_compaction_filter": 0, } multiops_wc_txn_params = {