WritePrepared Txn: rollback_merge_operands hack

Summary:
This is a hack that serves as a temporary fix for a MyRocks problem with rolling back merge operands. MyRocks uses merge operands without the protection of locks, which violates the assumption behind the rollback algorithm. MyRocks is OK with merge operands not being rolled back, as that would just create a gap in the autoincrement column. The hack adds an option that disables the rollback of merge operands by default; the option is enabled only in the unit tests so that they keep passing.
Closes https://github.com/facebook/rocksdb/pull/3711

Differential Revision: D7597177

Pulled By: maysamyabandeh

fbshipit-source-id: 544be0f666c7e7abb7f651ec8b23124e05056728
This commit is contained in:
Maysam Yabandeh 2018-04-12 11:52:15 -07:00
parent 88fe4ef1f1
commit 74fc31ec92
4 changed files with 23 additions and 4 deletions

View File

@ -4,11 +4,13 @@
* Add a BlockBasedTableOption to align uncompressed data blocks on the smaller of block size or page size boundary, to reduce flash reads by avoiding reads spanning 4K pages. * Add a BlockBasedTableOption to align uncompressed data blocks on the smaller of block size or page size boundary, to reduce flash reads by avoiding reads spanning 4K pages.
### New Features ### New Features
* TransactionDBOptions::write_policy can be configured to enable WritePrepared 2PC transactions. Read more about them in the wiki.
### Bug Fixes ### Bug Fixes
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob. * Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
* Fix WAL corruption caused by race condition between user write thread and FlushWAL when two_write_queue is not set. * Fix WAL corruption caused by race condition between user write thread and FlushWAL when two_write_queue is not set.
* Fix memory leak when pin_l0_filter_and_index_blocks_in_cache is used with partitioned filters * Fix memory leak when pin_l0_filter_and_index_blocks_in_cache is used with partitioned filters
* Disable rollback of merge operands in WritePrepared transactions to work around an issue in MyRocks. It can be re-enabled by setting TransactionDBOptions::rollback_merge_operands to true.
### Java API Changes ### Java API Changes
* Add `BlockBasedTableConfig.setBlockCache` to allow sharing a block cache across DB instances. * Add `BlockBasedTableConfig.setBlockCache` to allow sharing a block cache across DB instances.

View File

@ -85,6 +85,14 @@ struct TransactionDBOptions {
// before the commit phase. The DB then needs to provide the mechanisms to // before the commit phase. The DB then needs to provide the mechanisms to
// tell apart committed from uncommitted data. // tell apart committed from uncommitted data.
TxnDBWritePolicy write_policy = TxnDBWritePolicy::WRITE_COMMITTED; TxnDBWritePolicy write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
// TODO(myabandeh): remove this option
// Note: this is a temporary option, added as a hot fix for the rollback of
// WritePrepared txns in MyRocks. MyRocks uses merge operands for the autoinc
// column id without, however, obtaining locks. This breaks the assumption
// behind the rollback logic of WritePrepared txns. The hack of simply not
// rolling back merge operands works for the special way that MyRocks uses
// these operands.
bool rollback_merge_operands = false;
}; };
struct TransactionOptions { struct TransactionOptions {

View File

@ -66,6 +66,7 @@ class TransactionTestBase : public ::testing::Test {
txn_db_options.transaction_lock_timeout = 0; txn_db_options.transaction_lock_timeout = 0;
txn_db_options.default_lock_timeout = 0; txn_db_options.default_lock_timeout = 0;
txn_db_options.write_policy = write_policy; txn_db_options.write_policy = write_policy;
txn_db_options.rollback_merge_operands = true;
Status s; Status s;
if (use_stackable_db == false) { if (use_stackable_db == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db); s = TransactionDB::Open(options, txn_db_options, dbname, &db);

View File

@ -218,15 +218,18 @@ Status WritePreparedTxn::RollbackInternal() {
std::map<uint32_t, const Comparator*>& comparators_; std::map<uint32_t, const Comparator*>& comparators_;
using CFKeys = std::set<Slice, SetComparator>; using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_; std::map<uint32_t, CFKeys> keys_;
bool rollback_merge_operands_;
RollbackWriteBatchBuilder( RollbackWriteBatchBuilder(
DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
WriteBatch* dst_batch, WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators) std::map<uint32_t, const Comparator*>& comparators,
bool rollback_merge_operands)
: db_(db), : db_(db),
callback(wpt_db, snap_seq, callback(wpt_db, snap_seq,
0), // 0 disables min_uncommitted optimization 0), // 0 disables min_uncommitted optimization
rollback_batch_(dst_batch), rollback_batch_(dst_batch),
comparators_(comparators) {} comparators_(comparators),
rollback_merge_operands_(rollback_merge_operands) {}
Status Rollback(uint32_t cf, const Slice& key) { Status Rollback(uint32_t cf, const Slice& key) {
Status s; Status s;
@ -275,7 +278,11 @@ Status WritePreparedTxn::RollbackInternal() {
Status MergeCF(uint32_t cf, const Slice& key, Status MergeCF(uint32_t cf, const Slice& key,
const Slice& /*val*/) override { const Slice& /*val*/) override {
if (rollback_merge_operands_) {
return Rollback(cf, key); return Rollback(cf, key);
} else {
return Status::OK();
}
} }
Status MarkNoop(bool) override { return Status::OK(); } Status MarkNoop(bool) override { return Status::OK(); }
@ -289,7 +296,8 @@ Status WritePreparedTxn::RollbackInternal() {
protected: protected:
virtual bool WriteAfterCommit() const override { return false; } virtual bool WriteAfterCommit() const override { return false; }
} rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch, } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
*wpt_db_->GetCFComparatorMap()); *wpt_db_->GetCFComparatorMap(),
wpt_db_->txn_db_options_.rollback_merge_operands);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler); auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
assert(s.ok()); assert(s.ok());
if (!s.ok()) { if (!s.ok()) {