Add rollback_deletion_type_callback to TxnDBOptions (#9873)

Summary:
This PR does not affect write-committed.

Add a member, `rollback_deletion_type_callback` to TransactionDBOptions
so that a write-prepared transaction, when rolling back, can call this
callback to decide if a `Delete` or `SingleDelete` should be used to
cancel a prior `Put` written to the database during prepare phase.

The purpose of this PR is to prevent mixing `Delete` and `SingleDelete`
for the same key, causing undefined behaviors. Without this PR, the
following can happen:

```
// The application always issues SingleDelete when deleting keys.

txn1->Put('a');
txn1->Prepare(); // writes to memtable and potentially gets flushed/compacted to Lmax
txn1->Rollback();  // inserts DELETE('a')

txn2->Put('a');
txn2->Commit();  // writes to memtable and potentially gets flushed/compacted
```

In the database, we may have
```
L0:   [PUT('a', s=100)]
L1:   [DELETE('a', s=90)]
Lmax: [PUT('a', s=0)]
```

If a compaction compacts L0 and L1, then we have
```
L1:    [PUT('a', s=100)]
Lmax:  [PUT('a', s=0)]
```

If a future transaction issues a SingleDelete, we have
```
L0:    [SD('a', s=110)]
L1:    [PUT('a', s=100)]
Lmax:  [PUT('a', s=0)]
```

Then, a compaction including L0, L1 and Lmax leads to
```
Lmax:  [PUT('a', s=0)]
```

which is incorrect.

Similar bugs reported and addressed in
https://github.com/cockroachdb/pebble/issues/1255. Based on our team's
current priority, we have decided to take this approach for now. We may
come back and revisit in the future.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9873

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D35762170

Pulled By: riversand963

fbshipit-source-id: b28d56eefc786b53c9844b9ef4a7807acdd82c8d
This commit is contained in:
Yanqin Jin 2022-04-20 18:57:32 -07:00 committed by Facebook GitHub Bot
parent 1bac873fcf
commit d13825e586
6 changed files with 107 additions and 11 deletions

View File

@ -3,6 +3,9 @@
### New Features ### New Features
* DB::GetLiveFilesStorageInfo is ready for production use. * DB::GetLiveFilesStorageInfo is ready for production use.
### Public API changes
* Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.
## 7.2.0 (04/15/2022) ## 7.2.0 (04/15/2022)
### Bug Fixes ### Bug Fixes
* Fixed bug which caused rocksdb failure in the situation when rocksdb was accessible using UNC path * Fixed bug which caused rocksdb failure in the situation when rocksdb was accessible using UNC path

View File

@ -222,6 +222,20 @@ struct TransactionDBOptions {
// pending writes into the database. A value of 0 or less means no limit. // pending writes into the database. A value of 0 or less means no limit.
int64_t default_write_batch_flush_threshold = 0; int64_t default_write_batch_flush_threshold = 0;
// This option is valid only for write-prepared/write-unprepared. Transaction
// will rely on this callback to determine if a key should be rolled back
// with Delete or SingleDelete when necessary. If the callback returns true,
// then SingleDelete should be used. If the callback is not callable or the
// callback returns false, then a Delete is used.
// The application should ensure thread-safety of this callback.
// The callback should not throw because RocksDB is not exception-safe.
// The callback may be removed if we allow mixing Delete and SingleDelete in
// the future.
std::function<bool(TransactionDB* /*db*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/)>
rollback_deletion_type_callback;
private: private:
// 128 entries // 128 entries
// Should the default value change, please also update wp_snapshot_cache_bits // Should the default value change, please also update wp_snapshot_cache_bits

View File

@ -3970,6 +3970,62 @@ TEST_P(WritePreparedTransactionTest, AtomicCommit) {
} }
} }
TEST_P(WritePreparedTransactionTest, BasicRollbackDeletionTypeCb) {
options.level0_file_num_compaction_trigger = 2;
// Always use SingleDelete to rollback Put.
txn_db_options.rollback_deletion_type_callback =
[](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
const auto write_to_db = [&]() {
assert(db);
std::unique_ptr<Transaction> txn0(
db->BeginTransaction(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn0->SetName("txn0"));
ASSERT_OK(txn0->Put("a", "v0"));
ASSERT_OK(txn0->Prepare());
// Generate sst1: [PUT('a')]
ASSERT_OK(db->Flush(FlushOptions()));
{
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = options.num_levels - 1;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
}
ASSERT_OK(txn0->Rollback());
txn0.reset();
ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
ASSERT_OK(db->SingleDelete(WriteOptions(), "a"));
// Generate another SST with a SD to cover the oldest PUT('a')
ASSERT_OK(db->Flush(FlushOptions()));
auto* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
assert(dbimpl);
ASSERT_OK(dbimpl->TEST_WaitForCompact());
{
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
}
{
std::string value;
const Status s = db->Get(ReadOptions(), "a", &value);
ASSERT_TRUE(s.IsNotFound());
}
};
// Destroy and reopen
ASSERT_OK(ReOpen());
write_to_db();
}
// Test that we can change write policy from WriteCommitted to WritePrepared // Test that we can change write policy from WriteCommitted to WritePrepared
// after a clean shutdown (which would empty the WAL) // after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) { TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {

View File

@ -263,6 +263,10 @@ Status WritePreparedTxn::CommitInternal() {
Status WritePreparedTxn::RollbackInternal() { Status WritePreparedTxn::RollbackInternal() {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"RollbackInternal prepare_seq: %" PRIu64, GetId()); "RollbackInternal prepare_seq: %" PRIu64, GetId());
assert(db_impl_);
assert(wpt_db_);
WriteBatch rollback_batch; WriteBatch rollback_batch;
assert(GetId() != kMaxSequenceNumber); assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0); assert(GetId() > 0);
@ -273,8 +277,9 @@ Status WritePreparedTxn::RollbackInternal() {
// to prevent callback's seq to be overrriden inside DBImpk::Get // to prevent callback's seq to be overrriden inside DBImpk::Get
roptions.snapshot = wpt_db_->GetMaxSnapshot(); roptions.snapshot = wpt_db_->GetMaxSnapshot();
struct RollbackWriteBatchBuilder : public WriteBatch::Handler { struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
DBImpl* db_; DBImpl* const db_;
WritePreparedTxnReadCallback callback; WritePreparedTxnDB* const wpt_db_;
WritePreparedTxnReadCallback callback_;
WriteBatch* rollback_batch_; WriteBatch* rollback_batch_;
std::map<uint32_t, const Comparator*>& comparators_; std::map<uint32_t, const Comparator*>& comparators_;
std::map<uint32_t, ColumnFamilyHandle*>& handles_; std::map<uint32_t, ColumnFamilyHandle*>& handles_;
@ -282,14 +287,16 @@ Status WritePreparedTxn::RollbackInternal() {
std::map<uint32_t, CFKeys> keys_; std::map<uint32_t, CFKeys> keys_;
bool rollback_merge_operands_; bool rollback_merge_operands_;
ReadOptions roptions_; ReadOptions roptions_;
RollbackWriteBatchBuilder( RollbackWriteBatchBuilder(
DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
WriteBatch* dst_batch, WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators, std::map<uint32_t, const Comparator*>& comparators,
std::map<uint32_t, ColumnFamilyHandle*>& handles, std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands, ReadOptions _roptions) bool rollback_merge_operands, const ReadOptions& _roptions)
: db_(db), : db_(db),
callback(wpt_db, snap_seq), // disable min_uncommitted optimization wpt_db_(wpt_db),
callback_(wpt_db, snap_seq), // disable min_uncommitted optimization
rollback_batch_(dst_batch), rollback_batch_(dst_batch),
comparators_(comparators), comparators_(comparators),
handles_(handles), handles_(handles),
@ -304,8 +311,8 @@ Status WritePreparedTxn::RollbackInternal() {
keys_[cf] = CFKeys(SetComparator(cmp)); keys_[cf] = CFKeys(SetComparator(cmp));
} }
auto it = cf_keys.insert(key); auto it = cf_keys.insert(key);
if (it.second == // second is false if a element already existed.
false) { // second is false if a element already existed. if (it.second == false) {
return s; return s;
} }
@ -316,7 +323,7 @@ Status WritePreparedTxn::RollbackInternal() {
get_impl_options.column_family = cf_handle; get_impl_options.column_family = cf_handle;
get_impl_options.value = &pinnable_val; get_impl_options.value = &pinnable_val;
get_impl_options.value_found = &not_used; get_impl_options.value_found = &not_used;
get_impl_options.callback = &callback; get_impl_options.callback = &callback_;
s = db_->GetImpl(roptions_, key, get_impl_options); s = db_->GetImpl(roptions_, key, get_impl_options);
assert(s.ok() || s.IsNotFound()); assert(s.ok() || s.IsNotFound());
if (s.ok()) { if (s.ok()) {
@ -325,7 +332,11 @@ Status WritePreparedTxn::RollbackInternal() {
} else if (s.IsNotFound()) { } else if (s.IsNotFound()) {
// There has been no readable value before txn. By adding a delete we // There has been no readable value before txn. By adding a delete we
// make sure that there will be none afterwards either. // make sure that there will be none afterwards either.
if (wpt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
s = rollback_batch_->SingleDelete(cf_handle, key);
} else {
s = rollback_batch_->Delete(cf_handle, key); s = rollback_batch_->Delete(cf_handle, key);
}
assert(s.ok()); assert(s.ok());
} else { } else {
// Unexpected status. Return it to the user. // Unexpected status. Return it to the user.

View File

@ -394,7 +394,7 @@ Status WritePreparedTxnDB::NewIterators(
return Status::OK(); return Status::OK();
} }
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { void WritePreparedTxnDB::Init(const TransactionDBOptions& txn_db_opts) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps // Adcance max_evicted_seq_ no more than 100 times before the cache wraps
// around. // around.
INC_STEP_FOR_MAX_EVICTED = INC_STEP_FOR_MAX_EVICTED =
@ -404,6 +404,8 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>( commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {}); new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
dummy_max_snapshot_.number_ = kMaxSequenceNumber; dummy_max_snapshot_.number_ = kMaxSequenceNumber;
rollback_deletion_type_callback_ =
txn_db_opts.rollback_deletion_type_callback;
} }
void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max, void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
@ -433,7 +435,7 @@ void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
delayed_prepared_.insert(to_be_popped); delayed_prepared_.insert(to_be_popped);
ROCKS_LOG_WARN(info_log_, ROCKS_LOG_WARN(info_log_,
"prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64 "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
" new_max=%" PRIu64, " new_max=%" PRIu64 ")",
static_cast<uint64_t>(delayed_prepared_.size()), static_cast<uint64_t>(delayed_prepared_.size()),
to_be_popped, new_max); to_be_popped, new_max);
delayed_prepared_empty_.store(false, std::memory_order_release); delayed_prepared_empty_.store(false, std::memory_order_release);

View File

@ -465,6 +465,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Get a dummy snapshot that refers to kMaxSequenceNumber // Get a dummy snapshot that refers to kMaxSequenceNumber
Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; } Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
bool ShouldRollbackWithSingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) {
return rollback_deletion_type_callback_
? rollback_deletion_type_callback_(this, column_family, key)
: false;
}
std::function<bool(TransactionDB*, ColumnFamilyHandle*, const Slice&)>
rollback_deletion_type_callback_;
private: private:
friend class AddPreparedCallback; friend class AddPreparedCallback;
friend class PreparedHeap_BasicsTest_Test; friend class PreparedHeap_BasicsTest_Test;
@ -504,7 +514,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WriteUnpreparedTxnDB; friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
void Init(const TransactionDBOptions& /* unused */); void Init(const TransactionDBOptions& txn_db_opts);
void WPRecordTick(uint32_t ticker_type) const { void WPRecordTick(uint32_t ticker_type) const {
RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type); RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);