Fix PopSavePoint to merge info into the previous savepoint (#5628)
Summary: Transaction::RollbackToSavePoint undos the modification made since the SavePoint beginning, and also unlocks the corresponding keys, which are tracked in the last SavePoint. Currently ::PopSavePoint simply discard these tracked keys, leaving them locked in the lock manager. This breaks a subsequent ::RollbackToSavePoint behavior as it loses track of such keys, and thus cannot unlock them. The patch fixes ::PopSavePoint by passing on the track key information to the previous SavePoint. Fixes https://github.com/facebook/rocksdb/issues/5618 Pull Request resolved: https://github.com/facebook/rocksdb/pull/5628 Differential Revision: D16505325 Pulled By: lth fbshipit-source-id: 2bc3b30963ab4d36d996d1f66543c93abf358980
This commit is contained in:
parent
74782cec32
commit
230b909da8
@ -192,7 +192,48 @@ Status TransactionBaseImpl::PopSavePoint() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
assert(!save_points_->empty());
|
assert(!save_points_->empty());
|
||||||
save_points_->pop();
|
// If there is another savepoint A below the current savepoint B, then A needs
|
||||||
|
// to inherit tracked_keys in B so that if we rollback to savepoint A, we
|
||||||
|
// remember to unlock keys in B. If there is no other savepoint below, then we
|
||||||
|
// can safely discard savepoint info.
|
||||||
|
if (save_points_->size() == 1) {
|
||||||
|
save_points_->pop();
|
||||||
|
} else {
|
||||||
|
TransactionBaseImpl::SavePoint top;
|
||||||
|
std::swap(top, save_points_->top());
|
||||||
|
save_points_->pop();
|
||||||
|
|
||||||
|
const TransactionKeyMap& curr_cf_key_map = top.new_keys_;
|
||||||
|
TransactionKeyMap& prev_cf_key_map = save_points_->top().new_keys_;
|
||||||
|
|
||||||
|
for (const auto& curr_cf_key_iter : curr_cf_key_map) {
|
||||||
|
uint32_t column_family_id = curr_cf_key_iter.first;
|
||||||
|
const std::unordered_map<std::string, TransactionKeyMapInfo>& curr_keys =
|
||||||
|
curr_cf_key_iter.second;
|
||||||
|
|
||||||
|
// If cfid was not previously tracked, just copy everything over.
|
||||||
|
auto prev_keys_iter = prev_cf_key_map.find(column_family_id);
|
||||||
|
if (prev_keys_iter == prev_cf_key_map.end()) {
|
||||||
|
prev_cf_key_map.emplace(curr_cf_key_iter);
|
||||||
|
} else {
|
||||||
|
std::unordered_map<std::string, TransactionKeyMapInfo>& prev_keys =
|
||||||
|
prev_keys_iter->second;
|
||||||
|
for (const auto& key_iter : curr_keys) {
|
||||||
|
const std::string& key = key_iter.first;
|
||||||
|
const TransactionKeyMapInfo& info = key_iter.second;
|
||||||
|
// If key was not previously tracked, just copy the whole struct over.
|
||||||
|
// Otherwise, some merging needs to occur.
|
||||||
|
auto prev_info = prev_keys.find(key);
|
||||||
|
if (prev_info == prev_keys.end()) {
|
||||||
|
prev_keys.emplace(key_iter);
|
||||||
|
} else {
|
||||||
|
prev_info->second.Merge(info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return write_batch_.PopSavePoint();
|
return write_batch_.PopSavePoint();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,11 +294,11 @@ class TransactionBaseImpl : public Transaction {
|
|||||||
|
|
||||||
struct SavePoint {
|
struct SavePoint {
|
||||||
std::shared_ptr<const Snapshot> snapshot_;
|
std::shared_ptr<const Snapshot> snapshot_;
|
||||||
bool snapshot_needed_;
|
bool snapshot_needed_ = false;
|
||||||
std::shared_ptr<TransactionNotifier> snapshot_notifier_;
|
std::shared_ptr<TransactionNotifier> snapshot_notifier_;
|
||||||
uint64_t num_puts_;
|
uint64_t num_puts_ = 0;
|
||||||
uint64_t num_deletes_;
|
uint64_t num_deletes_ = 0;
|
||||||
uint64_t num_merges_;
|
uint64_t num_merges_ = 0;
|
||||||
|
|
||||||
// Record all keys tracked since the last savepoint
|
// Record all keys tracked since the last savepoint
|
||||||
TransactionKeyMap new_keys_;
|
TransactionKeyMap new_keys_;
|
||||||
@ -312,6 +312,8 @@ class TransactionBaseImpl : public Transaction {
|
|||||||
num_puts_(num_puts),
|
num_puts_(num_puts),
|
||||||
num_deletes_(num_deletes),
|
num_deletes_(num_deletes),
|
||||||
num_merges_(num_merges) {}
|
num_merges_(num_merges) {}
|
||||||
|
|
||||||
|
SavePoint() = default;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Records writes pending in this transaction
|
// Records writes pending in this transaction
|
||||||
|
@ -4030,6 +4030,58 @@ TEST_P(TransactionTest, SavepointTest3) {
|
|||||||
ASSERT_TRUE(s.IsNotFound());
|
ASSERT_TRUE(s.IsNotFound());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(TransactionTest, SavepointTest4) {
|
||||||
|
WriteOptions write_options;
|
||||||
|
ReadOptions read_options;
|
||||||
|
TransactionOptions txn_options;
|
||||||
|
Status s;
|
||||||
|
|
||||||
|
txn_options.lock_timeout = 1; // 1 ms
|
||||||
|
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
|
||||||
|
ASSERT_TRUE(txn1);
|
||||||
|
|
||||||
|
txn1->SetSavePoint(); // 1
|
||||||
|
s = txn1->Put("A", "a");
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
txn1->SetSavePoint(); // 2
|
||||||
|
s = txn1->Put("B", "b");
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
s = txn1->PopSavePoint(); // Remove 2
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
// Verify that A/B still exists.
|
||||||
|
std::string value;
|
||||||
|
ASSERT_OK(txn1->Get(read_options, "A", &value));
|
||||||
|
ASSERT_EQ("a", value);
|
||||||
|
|
||||||
|
ASSERT_OK(txn1->Get(read_options, "B", &value));
|
||||||
|
ASSERT_EQ("b", value);
|
||||||
|
|
||||||
|
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1
|
||||||
|
|
||||||
|
// Verify that everything was rolled back.
|
||||||
|
s = txn1->Get(read_options, "A", &value);
|
||||||
|
ASSERT_TRUE(s.IsNotFound());
|
||||||
|
|
||||||
|
s = txn1->Get(read_options, "B", &value);
|
||||||
|
ASSERT_TRUE(s.IsNotFound());
|
||||||
|
|
||||||
|
// Nothing should be locked
|
||||||
|
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
||||||
|
ASSERT_TRUE(txn2);
|
||||||
|
|
||||||
|
s = txn2->Put("A", "");
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
s = txn2->Put("B", "");
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
delete txn2;
|
||||||
|
delete txn1;
|
||||||
|
}
|
||||||
|
|
||||||
TEST_P(TransactionTest, UndoGetForUpdateTest) {
|
TEST_P(TransactionTest, UndoGetForUpdateTest) {
|
||||||
WriteOptions write_options;
|
WriteOptions write_options;
|
||||||
ReadOptions read_options;
|
ReadOptions read_options;
|
||||||
|
@ -31,6 +31,14 @@ struct TransactionKeyMapInfo {
|
|||||||
|
|
||||||
explicit TransactionKeyMapInfo(SequenceNumber seq_no)
|
explicit TransactionKeyMapInfo(SequenceNumber seq_no)
|
||||||
: seq(seq_no), num_writes(0), num_reads(0), exclusive(false) {}
|
: seq(seq_no), num_writes(0), num_reads(0), exclusive(false) {}
|
||||||
|
|
||||||
|
// Used in PopSavePoint to collapse two savepoints together.
|
||||||
|
void Merge(const TransactionKeyMapInfo& info) {
|
||||||
|
assert(seq <= info.seq);
|
||||||
|
num_reads += info.num_reads;
|
||||||
|
num_writes += info.num_writes;
|
||||||
|
exclusive |= info.exclusive;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
using TransactionKeyMap =
|
using TransactionKeyMap =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user