diff --git a/HISTORY.md b/HISTORY.md index 82a73bf77..a8d59829b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,9 @@ ### New Features * DB::GetLiveFilesStorageInfo is ready for production use. +### Public API changes +* Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during the prior prepare phase. This prevents mixing SingleDeletes and Deletes for the same key, which can lead to undefined behavior for write-prepared transactions. + ## 7.2.0 (04/15/2022) ### Bug Fixes * Fixed bug which caused rocksdb failure in the situation when rocksdb was accessible using UNC path diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 78dcf3ac1..eaf2bc128 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -222,6 +222,20 @@ struct TransactionDBOptions { // pending writes into the database. A value of 0 or less means no limit. int64_t default_write_batch_flush_threshold = 0; + // This option is valid only for write-prepared/write-unprepared. Transaction + // will rely on this callback to determine if a key should be rolled back + // with Delete or SingleDelete when necessary. If the callback returns true, + // then SingleDelete should be used. If the callback is not callable or the + // callback returns false, then a Delete is used. + // The application should ensure thread-safety of this callback. + // The callback should not throw because RocksDB is not exception-safe. + // The callback may be removed if we allow mixing Delete and SingleDelete in + // the future. 
+ std::function + rollback_deletion_type_callback; + private: // 128 entries // Should the default value change, please also update wp_snapshot_cache_bits diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index f1f65e17a..5a90bab88 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -3970,6 +3970,62 @@ TEST_P(WritePreparedTransactionTest, AtomicCommit) { } } +TEST_P(WritePreparedTransactionTest, BasicRollbackDeletionTypeCb) { + options.level0_file_num_compaction_trigger = 2; + // Always use SingleDelete to rollback Put. + txn_db_options.rollback_deletion_type_callback = + [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; }; + + const auto write_to_db = [&]() { + assert(db); + std::unique_ptr txn0( + db->BeginTransaction(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Put("a", "v0")); + ASSERT_OK(txn0->Prepare()); + + // Generate sst1: [PUT('a')] + ASSERT_OK(db->Flush(FlushOptions())); + + { + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = options.num_levels - 1; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + } + + ASSERT_OK(txn0->Rollback()); + txn0.reset(); + + ASSERT_OK(db->Put(WriteOptions(), "a", "v1")); + + ASSERT_OK(db->SingleDelete(WriteOptions(), "a")); + // Generate another SST with a SD to cover the oldest PUT('a') + ASSERT_OK(db->Flush(FlushOptions())); + + auto* dbimpl = static_cast_with_check(db->GetRootDB()); + assert(dbimpl); + ASSERT_OK(dbimpl->TEST_WaitForCompact()); + + { + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + } + + { + std::string value; + const Status s = 
db->Get(ReadOptions(), "a", &value); + ASSERT_TRUE(s.IsNotFound()); + } + }; + + // Destroy and reopen + ASSERT_OK(ReOpen()); + write_to_db(); +} + // Test that we can change write policy from WriteCommitted to WritePrepared // after a clean shutdown (which would empty the WAL) TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 67f49b2cd..97a964a23 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -263,6 +263,10 @@ Status WritePreparedTxn::CommitInternal() { Status WritePreparedTxn::RollbackInternal() { ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, "RollbackInternal prepare_seq: %" PRIu64, GetId()); + + assert(db_impl_); + assert(wpt_db_); + WriteBatch rollback_batch; assert(GetId() != kMaxSequenceNumber); assert(GetId() > 0); @@ -273,8 +277,9 @@ Status WritePreparedTxn::RollbackInternal() { // to prevent callback's seq to be overrriden inside DBImpk::Get roptions.snapshot = wpt_db_->GetMaxSnapshot(); struct RollbackWriteBatchBuilder : public WriteBatch::Handler { - DBImpl* db_; - WritePreparedTxnReadCallback callback; + DBImpl* const db_; + WritePreparedTxnDB* const wpt_db_; + WritePreparedTxnReadCallback callback_; WriteBatch* rollback_batch_; std::map& comparators_; std::map& handles_; @@ -282,14 +287,16 @@ Status WritePreparedTxn::RollbackInternal() { std::map keys_; bool rollback_merge_operands_; ReadOptions roptions_; + RollbackWriteBatchBuilder( DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, WriteBatch* dst_batch, std::map& comparators, std::map& handles, - bool rollback_merge_operands, ReadOptions _roptions) + bool rollback_merge_operands, const ReadOptions& _roptions) : db_(db), - callback(wpt_db, snap_seq), // disable min_uncommitted optimization + wpt_db_(wpt_db), + callback_(wpt_db, snap_seq), // disable min_uncommitted optimization 
rollback_batch_(dst_batch), comparators_(comparators), handles_(handles), @@ -304,8 +311,8 @@ Status WritePreparedTxn::RollbackInternal() { keys_[cf] = CFKeys(SetComparator(cmp)); } auto it = cf_keys.insert(key); - if (it.second == - false) { // second is false if a element already existed. + // second is false if a element already existed. + if (it.second == false) { return s; } @@ -316,7 +323,7 @@ Status WritePreparedTxn::RollbackInternal() { get_impl_options.column_family = cf_handle; get_impl_options.value = &pinnable_val; get_impl_options.value_found = ¬_used; - get_impl_options.callback = &callback; + get_impl_options.callback = &callback_; s = db_->GetImpl(roptions_, key, get_impl_options); assert(s.ok() || s.IsNotFound()); if (s.ok()) { @@ -325,7 +332,11 @@ Status WritePreparedTxn::RollbackInternal() { } else if (s.IsNotFound()) { // There has been no readable value before txn. By adding a delete we // make sure that there will be none afterwards either. - s = rollback_batch_->Delete(cf_handle, key); + if (wpt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) { + s = rollback_batch_->SingleDelete(cf_handle, key); + } else { + s = rollback_batch_->Delete(cf_handle, key); + } assert(s.ok()); } else { // Unexpected status. Return it to the user. diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 992a99cc1..3b6985737 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -394,7 +394,7 @@ Status WritePreparedTxnDB::NewIterators( return Status::OK(); } -void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { +void WritePreparedTxnDB::Init(const TransactionDBOptions& txn_db_opts) { // Adcance max_evicted_seq_ no more than 100 times before the cache wraps // around. 
INC_STEP_FOR_MAX_EVICTED = @@ -404,6 +404,8 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { commit_cache_ = std::unique_ptr[]>( new std::atomic[COMMIT_CACHE_SIZE] {}); dummy_max_snapshot_.number_ = kMaxSequenceNumber; + rollback_deletion_type_callback_ = + txn_db_opts.rollback_deletion_type_callback; } void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max, @@ -433,7 +435,7 @@ void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max, delayed_prepared_.insert(to_be_popped); ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64 - " new_max=%" PRIu64, + " new_max=%" PRIu64 ")", static_cast(delayed_prepared_.size()), to_be_popped, new_max); delayed_prepared_empty_.store(false, std::memory_order_release); diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 623118d78..502fea56a 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -465,6 +465,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Get a dummy snapshot that refers to kMaxSequenceNumber Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; } + bool ShouldRollbackWithSingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) { + return rollback_deletion_type_callback_ + ? 
rollback_deletion_type_callback_(this, column_family, key) + : false; + } + + std::function + rollback_deletion_type_callback_; + private: friend class AddPreparedCallback; friend class PreparedHeap_BasicsTest_Test; @@ -504,7 +514,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WriteUnpreparedTxnDB; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; - void Init(const TransactionDBOptions& /* unused */); + void Init(const TransactionDBOptions& txn_db_opts); void WPRecordTick(uint32_t ticker_type) const { RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);