Transactions: Release Locks when rolling back to a savepoint
Summary: Transaction::RollbackToSavePoint() will now release any locks that were taken since the previous SavePoint. To do this cleanly, I moved tracked_keys_ management into TransactionBase. Test Plan: New Transaction test. Reviewers: igor, rven, sdong Reviewed By: sdong Subscribers: dhruba, spetrunia, leveldb Differential Revision: https://reviews.facebook.net/D46761
This commit is contained in:
parent
9f3a66a936
commit
a3fc49bfdd
@ -38,7 +38,6 @@ OptimisticTransactionImpl::~OptimisticTransactionImpl() {
|
||||
|
||||
void OptimisticTransactionImpl::Clear() {
|
||||
TransactionBaseImpl::Clear();
|
||||
tracked_keys_.clear();
|
||||
}
|
||||
|
||||
Status OptimisticTransactionImpl::Commit() {
|
||||
@ -83,18 +82,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||
|
||||
std::string key_str = key.ToString();
|
||||
|
||||
auto iter = tracked_keys_[cfh_id].find(key_str);
|
||||
if (iter == tracked_keys_[cfh_id].end()) {
|
||||
// key not yet seen, store it.
|
||||
tracked_keys_[cfh_id].insert({std::move(key_str), seq});
|
||||
} else {
|
||||
SequenceNumber old_seq = iter->second;
|
||||
if (seq < old_seq) {
|
||||
// Snapshot has changed since we last saw this key, need to
|
||||
// store the earliest seen sequence number.
|
||||
tracked_keys_[cfh_id][key_str] = seq;
|
||||
}
|
||||
}
|
||||
TrackKey(cfh_id, key_str, seq);
|
||||
|
||||
// Always return OK. Confilct checking will happen at commit time.
|
||||
return Status::OK();
|
||||
@ -113,19 +101,7 @@ Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
|
||||
assert(dynamic_cast<DBImpl*>(db) != nullptr);
|
||||
auto db_impl = reinterpret_cast<DBImpl*>(db);
|
||||
|
||||
return TransactionUtil::CheckKeysForConflicts(db_impl, &tracked_keys_);
|
||||
}
|
||||
|
||||
uint64_t OptimisticTransactionImpl::GetNumKeys() const {
|
||||
uint64_t count = 0;
|
||||
|
||||
// sum up locked keys in all column families
|
||||
for (const auto& key_map_iter : tracked_keys_) {
|
||||
const auto& keys = key_map_iter.second;
|
||||
count += keys.size();
|
||||
}
|
||||
|
||||
return count;
|
||||
return TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys());
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -38,10 +38,6 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
|
||||
|
||||
void Rollback() override;
|
||||
|
||||
uint64_t GetNumKeys() const override;
|
||||
|
||||
const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; }
|
||||
|
||||
protected:
|
||||
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
|
||||
bool untracked = false) override;
|
||||
@ -49,12 +45,6 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
|
||||
private:
|
||||
OptimisticTransactionDB* const txn_db_;
|
||||
|
||||
// Map of Column Family IDs to keys and corresponding sequence numbers.
|
||||
// The sequence number stored for a key will be used during commit to make
|
||||
// sure this key has
|
||||
// not changed since this sequence number.
|
||||
TransactionKeyMap tracked_keys_;
|
||||
|
||||
friend class OptimisticTransactionCallback;
|
||||
|
||||
// Returns OK if it is safe to commit this transaction. Returns Status::Busy
|
||||
|
@ -28,6 +28,7 @@ TransactionBaseImpl::~TransactionBaseImpl() {}
|
||||
void TransactionBaseImpl::Clear() {
|
||||
save_points_.reset(nullptr);
|
||||
write_batch_->Clear();
|
||||
tracked_keys_.clear();
|
||||
num_puts_ = 0;
|
||||
num_deletes_ = 0;
|
||||
num_merges_ = 0;
|
||||
@ -71,12 +72,25 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
|
||||
num_deletes_ = save_point.num_deletes_;
|
||||
num_merges_ = save_point.num_merges_;
|
||||
|
||||
save_points_->pop();
|
||||
|
||||
// Rollback batch
|
||||
Status s = write_batch_->RollbackToSavePoint();
|
||||
assert(s.ok());
|
||||
|
||||
// Rollback any keys that were tracked since the last savepoint
|
||||
auto key_map = GetTrackedKeysSinceSavePoint();
|
||||
assert(key_map);
|
||||
for (auto& key_map_iter : *key_map) {
|
||||
uint32_t column_family_id = key_map_iter.first;
|
||||
auto& keys = key_map_iter.second;
|
||||
|
||||
for (auto& key_iter : keys) {
|
||||
const std::string& key = key_iter.first;
|
||||
tracked_keys_[column_family_id].erase(key);
|
||||
}
|
||||
}
|
||||
|
||||
save_points_->pop();
|
||||
|
||||
return s;
|
||||
} else {
|
||||
assert(write_batch_->RollbackToSavePoint().IsNotFound());
|
||||
@ -306,6 +320,42 @@ uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
|
||||
|
||||
uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
|
||||
|
||||
uint64_t TransactionBaseImpl::GetNumKeys() const {
|
||||
uint64_t count = 0;
|
||||
|
||||
// sum up locked keys in all column families
|
||||
for (const auto& key_map_iter : tracked_keys_) {
|
||||
const auto& keys = key_map_iter.second;
|
||||
count += keys.size();
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
|
||||
SequenceNumber seq) {
|
||||
auto iter = tracked_keys_[cfh_id].find(key);
|
||||
if (iter == tracked_keys_[cfh_id].end()) {
|
||||
tracked_keys_[cfh_id].insert({key, seq});
|
||||
|
||||
if (save_points_ != nullptr && !save_points_->empty()) {
|
||||
// Aren't tracking this key, add it.
|
||||
save_points_->top().new_keys_[cfh_id][key] = seq;
|
||||
}
|
||||
} else if (seq < iter->second) {
|
||||
// Now tracking this key with an earlier sequence number
|
||||
iter->second = seq;
|
||||
}
|
||||
}
|
||||
|
||||
const TransactionKeyMap* TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
|
||||
if (save_points_ != nullptr && !save_points_->empty()) {
|
||||
return &save_points_->top().new_keys_;
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "rocksdb/utilities/transaction.h"
|
||||
#include "rocksdb/utilities/transaction_db.h"
|
||||
#include "rocksdb/utilities/write_batch_with_index.h"
|
||||
#include "utilities/transactions/transaction_util.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -166,7 +167,19 @@ class TransactionBaseImpl : public Transaction {
|
||||
|
||||
uint64_t GetNumMerges() const override;
|
||||
|
||||
uint64_t GetNumKeys() const override;
|
||||
|
||||
// Get list of keys in this transaction that must not have any conflicts
|
||||
// with writes in other transactions.
|
||||
const TransactionKeyMap& GetTrackedKeys() const { return tracked_keys_; }
|
||||
|
||||
protected:
|
||||
// Add a key to the list of tracked keys.
|
||||
// seqno is the earliest seqno this key was involved with this transaction.
|
||||
void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno);
|
||||
|
||||
const TransactionKeyMap* GetTrackedKeysSinceSavePoint();
|
||||
|
||||
DB* const db_;
|
||||
|
||||
const WriteOptions write_options_;
|
||||
@ -194,6 +207,9 @@ class TransactionBaseImpl : public Transaction {
|
||||
uint64_t num_deletes_;
|
||||
uint64_t num_merges_;
|
||||
|
||||
// Record all keys tracked since the last savepoint
|
||||
TransactionKeyMap new_keys_;
|
||||
|
||||
SavePoint(std::shared_ptr<ManagedSnapshot> snapshot, uint64_t num_puts,
|
||||
uint64_t num_deletes, uint64_t num_merges)
|
||||
: snapshot_(snapshot),
|
||||
@ -202,11 +218,18 @@ class TransactionBaseImpl : public Transaction {
|
||||
num_merges_(num_merges) {}
|
||||
};
|
||||
|
||||
private:
|
||||
// Stack of the Snapshot saved at each save point. Saved snapshots may be
|
||||
// nullptr if there was no snapshot at the time SetSavePoint() was called.
|
||||
std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint>> save_points_;
|
||||
|
||||
private:
|
||||
// Map from column_family_id to map of keys that are involved in this
|
||||
// transaction.
|
||||
// Pessimistic Transactions will do conflict checking before adding a key
|
||||
// by calling TrackKey().
|
||||
// Optimistic Transactions will wait till commit time to do conflict checking.
|
||||
TransactionKeyMap tracked_keys_;
|
||||
|
||||
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
|
||||
bool untracked = false);
|
||||
};
|
||||
|
@ -141,7 +141,8 @@ Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id,
|
||||
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv());
|
||||
}
|
||||
|
||||
void TransactionDBImpl::UnLock(TransactionImpl* txn, TransactionKeyMap* keys) {
|
||||
void TransactionDBImpl::UnLock(TransactionImpl* txn,
|
||||
const TransactionKeyMap* keys) {
|
||||
lock_mgr_.UnLock(txn, keys, GetEnv());
|
||||
}
|
||||
|
||||
|
@ -54,7 +54,7 @@ class TransactionDBImpl : public TransactionDB {
|
||||
|
||||
Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key);
|
||||
|
||||
void UnLock(TransactionImpl* txn, TransactionKeyMap* keys);
|
||||
void UnLock(TransactionImpl* txn, const TransactionKeyMap* keys);
|
||||
void UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key);
|
||||
|
||||
void AddColumnFamily(const ColumnFamilyHandle* handle);
|
||||
|
@ -58,14 +58,12 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
|
||||
}
|
||||
|
||||
TransactionImpl::~TransactionImpl() {
|
||||
txn_db_impl_->UnLock(this, &tracked_keys_);
|
||||
txn_db_impl_->UnLock(this, &GetTrackedKeys());
|
||||
}
|
||||
|
||||
void TransactionImpl::Clear() {
|
||||
txn_db_impl_->UnLock(this, &GetTrackedKeys());
|
||||
TransactionBaseImpl::Clear();
|
||||
|
||||
txn_db_impl_->UnLock(this, &tracked_keys_);
|
||||
tracked_keys_.clear();
|
||||
}
|
||||
|
||||
bool TransactionImpl::IsExpired() const {
|
||||
@ -126,6 +124,16 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) {
|
||||
|
||||
void TransactionImpl::Rollback() { Clear(); }
|
||||
|
||||
Status TransactionImpl::RollbackToSavePoint() {
|
||||
// Unlock any keys locked since last transaction
|
||||
auto keys = GetTrackedKeysSinceSavePoint();
|
||||
if (keys) {
|
||||
txn_db_impl_->UnLock(this, keys);
|
||||
}
|
||||
|
||||
return TransactionBaseImpl::RollbackToSavePoint();
|
||||
}
|
||||
|
||||
// Lock all keys in this batch.
|
||||
// On success, caller should unlock keys_to_unlock
|
||||
Status TransactionImpl::LockBatch(WriteBatch* batch,
|
||||
@ -221,19 +229,16 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||
bool check_snapshot = !untracked;
|
||||
|
||||
// lock this key if this transactions hasn't already locked it
|
||||
auto iter = tracked_keys_[cfh_id].find(key_str);
|
||||
if (iter == tracked_keys_[cfh_id].end()) {
|
||||
SequenceNumber tracked_seqno = kMaxSequenceNumber;
|
||||
auto tracked_keys = GetTrackedKeys();
|
||||
auto iter = tracked_keys[cfh_id].find(key_str);
|
||||
if (iter == tracked_keys[cfh_id].end()) {
|
||||
previously_locked = false;
|
||||
|
||||
s = txn_db_impl_->TryLock(this, cfh_id, key_str);
|
||||
|
||||
if (s.ok()) {
|
||||
// Record that we've locked this key
|
||||
auto result = tracked_keys_[cfh_id].insert({key_str, kMaxSequenceNumber});
|
||||
iter = result.first;
|
||||
}
|
||||
} else {
|
||||
previously_locked = true;
|
||||
tracked_seqno = iter->second;
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
@ -244,17 +249,17 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||
// key has not been modified after. This is useful if this same
|
||||
// transaction
|
||||
// later tries to lock this key again.
|
||||
if (iter->second == kMaxSequenceNumber) {
|
||||
if (tracked_seqno == kMaxSequenceNumber) {
|
||||
// Since we haven't checked a snapshot, we only know this key has not
|
||||
// been modified since after we locked it.
|
||||
iter->second = db_->GetLatestSequenceNumber();
|
||||
tracked_seqno = db_->GetLatestSequenceNumber();
|
||||
}
|
||||
} else {
|
||||
// If the key has been previous validated at a sequence number earlier
|
||||
// than the curent snapshot's sequence number, we already know it has not
|
||||
// been modified.
|
||||
SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber();
|
||||
bool already_validated = iter->second <= seq;
|
||||
bool already_validated = tracked_seqno <= seq;
|
||||
|
||||
if (!already_validated) {
|
||||
s = CheckKeySequence(column_family, key);
|
||||
@ -262,19 +267,23 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||
if (s.ok()) {
|
||||
// Record that there have been no writes to this key after this
|
||||
// sequence.
|
||||
iter->second = seq;
|
||||
tracked_seqno = seq;
|
||||
} else {
|
||||
// Failed to validate key
|
||||
if (!previously_locked) {
|
||||
// Unlock key we just locked
|
||||
txn_db_impl_->UnLock(this, cfh_id, key.ToString());
|
||||
tracked_keys_[cfh_id].erase(iter);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
// Let base class know we've conflict checked this key.
|
||||
TrackKey(cfh_id, key_str, tracked_seqno);
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
@ -298,18 +307,6 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family,
|
||||
return result;
|
||||
}
|
||||
|
||||
uint64_t TransactionImpl::GetNumKeys() const {
|
||||
uint64_t count = 0;
|
||||
|
||||
// sum up locked keys in all column families
|
||||
for (const auto& key_map_iter : tracked_keys_) {
|
||||
const auto& keys = key_map_iter.second;
|
||||
count += keys.size();
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -44,7 +44,7 @@ class TransactionImpl : public TransactionBaseImpl {
|
||||
|
||||
void Rollback() override;
|
||||
|
||||
uint64_t GetNumKeys() const override;
|
||||
Status RollbackToSavePoint() override;
|
||||
|
||||
// Generate a new unique transaction identifier
|
||||
static TransactionID GenTxnID();
|
||||
@ -86,12 +86,6 @@ class TransactionImpl : public TransactionBaseImpl {
|
||||
// Timeout in microseconds when locking a key or -1 if there is no timeout.
|
||||
int64_t lock_timeout_;
|
||||
|
||||
// Map from column_family_id to map of keys to Sequence Numbers. Stores keys
|
||||
// that have been locked.
|
||||
// The key is known to not have been modified after the Sequence Number
|
||||
// stored.
|
||||
TransactionKeyMap tracked_keys_;
|
||||
|
||||
void Clear() override;
|
||||
|
||||
Status CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key);
|
||||
|
@ -1488,6 +1488,105 @@ TEST_F(TransactionTest, SavepointTest) {
|
||||
delete txn;
|
||||
}
|
||||
|
||||
TEST_F(TransactionTest, SavepointTest2) {
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options, snapshot_read_options;
|
||||
TransactionOptions txn_options;
|
||||
string value;
|
||||
Status s;
|
||||
|
||||
txn_options.lock_timeout = 1; // 1 ms
|
||||
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn1);
|
||||
|
||||
s = txn1->Put("A", "");
|
||||
ASSERT_OK(s);
|
||||
|
||||
txn1->SetSavePoint(); // 1
|
||||
|
||||
s = txn1->Put("A", "a");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn1->Put("C", "c");
|
||||
ASSERT_OK(s);
|
||||
|
||||
txn1->SetSavePoint(); // 2
|
||||
|
||||
s = txn1->Put("A", "a");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put("B", "b");
|
||||
ASSERT_OK(s);
|
||||
|
||||
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 2
|
||||
|
||||
// Verify that "A" and "C" is still locked while "B" is not
|
||||
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn2);
|
||||
|
||||
s = txn2->Put("A", "a2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("C", "c2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("B", "b2");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn1->Put("A", "aa");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put("B", "bb");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
|
||||
s = txn2->Commit();
|
||||
ASSERT_OK(s);
|
||||
delete txn2;
|
||||
|
||||
s = txn1->Put("A", "aaa");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put("B", "bbb");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put("C", "ccc");
|
||||
ASSERT_OK(s);
|
||||
|
||||
txn1->SetSavePoint(); // 3
|
||||
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 3
|
||||
|
||||
// Verify that "A", "B", "C" are still locked
|
||||
txn2 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn2);
|
||||
|
||||
s = txn2->Put("A", "a2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("B", "b2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("C", "c2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
|
||||
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1
|
||||
|
||||
// Verify that only "A" is locked
|
||||
s = txn2->Put("A", "a3");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("B", "b3");
|
||||
ASSERT_OK(s);
|
||||
s = txn2->Put("C", "c3po");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn1->Commit();
|
||||
ASSERT_OK(s);
|
||||
delete txn1;
|
||||
|
||||
// Verify "A" "C" "B" are no longer locked
|
||||
s = txn2->Put("A", "a4");
|
||||
ASSERT_OK(s);
|
||||
s = txn2->Put("B", "b4");
|
||||
ASSERT_OK(s);
|
||||
s = txn2->Put("C", "c4");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn2->Commit();
|
||||
ASSERT_OK(s);
|
||||
delete txn2;
|
||||
}
|
||||
|
||||
TEST_F(TransactionTest, TimeoutTest) {
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
|
@ -100,11 +100,11 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
||||
return result;
|
||||
}
|
||||
|
||||
Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
|
||||
TransactionKeyMap* key_map) {
|
||||
Status TransactionUtil::CheckKeysForConflicts(
|
||||
DBImpl* db_impl, const TransactionKeyMap& key_map) {
|
||||
Status result;
|
||||
|
||||
for (auto& key_map_iter : *key_map) {
|
||||
for (auto& key_map_iter : key_map) {
|
||||
uint32_t cf_id = key_map_iter.first;
|
||||
const auto& keys = key_map_iter.second;
|
||||
|
||||
|
@ -46,7 +46,8 @@ class TransactionUtil {
|
||||
//
|
||||
// REQUIRED: this function should only be called on the write thread or if the
|
||||
// mutex is held.
|
||||
static Status CheckKeysForConflicts(DBImpl* db_impl, TransactionKeyMap* keys);
|
||||
static Status CheckKeysForConflicts(DBImpl* db_impl,
|
||||
const TransactionKeyMap& keys);
|
||||
|
||||
private:
|
||||
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
||||
|
Loading…
Reference in New Issue
Block a user