timestamp ordering txn first time commit

This commit is contained in:
wolfkdy 2020-02-13 04:48:53 +08:00
parent 3e49249d30
commit fbea0ff238
7 changed files with 2884 additions and 0 deletions

View File

@ -0,0 +1,96 @@
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
namespace rocksdb {
class Iterator;
class TransactionDB;
class WriteBatchWithIndex;
using TransactionName = std::string;
using TransactionID = uint64_t;
//TimeStamp in rocksdb
using RocksTimeStamp = uint64_t;
//TimeStamp Ordering Transaction
class TOTransaction {
public:
virtual ~TOTransaction() {}
//set commit timestamp for transaction, if the application set the commit timestamp twice, an error will be returned
virtual Status SetCommitTimeStamp(const RocksTimeStamp& timestamp) = 0;
//set read timestamp for transaction, if the application set the commit timestamp twice, an error will be returned
virtual Status SetReadTimeStamp(const RocksTimeStamp& timestamp, const uint32_t& round) = 0;
virtual Status GetReadTimeStamp(RocksTimeStamp* timestamp) const = 0;
virtual Status Commit() = 0;
virtual Status Rollback() = 0;
virtual Status Get(ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) = 0;
virtual Status Get(ReadOptions& options, const Slice& key,
std::string* value) = 0;
virtual Iterator* GetIterator(ReadOptions& read_options) = 0;
virtual Iterator* GetIterator(ReadOptions& read_options,
ColumnFamilyHandle* column_family) = 0;
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
virtual Status Put(const Slice& key, const Slice& value) = 0;
virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0;
virtual Status Delete(const Slice& key) = 0;
virtual WriteBatchWithIndex* GetWriteBatch() = 0;
virtual Status SetName(const TransactionName& name) = 0;
virtual TransactionName GetName() const { return name_; }
virtual TransactionID GetID() const { return 0; }
enum TOTransactionState {
kStarted = 0,
kPrepared = 1,
kCommitted = 2,
kRollback = 3,
};
TOTransactionState GetState() const { return txn_state_; }
void SetState(TOTransactionState state) { txn_state_ = state; }
protected:
explicit TOTransaction(const DB* /*db*/) {}
TOTransaction() : txn_state_(kStarted) {}
TransactionName name_;
std::atomic<TOTransactionState> txn_state_;
uint64_t id_ = 0;
virtual void SetId(uint64_t id) {
assert(id_ == 0);
id_ = id;
}
};
} // namespace rocksdb
#endif

View File

@ -0,0 +1,103 @@
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <utility>
#include <vector>
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/totransaction.h"
#include "rocksdb/utilities/stackable_db.h"
namespace rocksdb {
//TimeStamp Ordering Transaction DB Options
#define DEFAULT_NUM_STRIPES 32
struct TOTransactionStat {
size_t max_conflict_bytes;
size_t cur_conflict_bytes;
size_t uk_num;
size_t ck_num;
size_t alive_txns_num;
size_t read_q_num;
size_t commit_q_num;
uint64_t oldest_ts;
uint64_t min_read_ts;
uint64_t max_commit_ts;
uint64_t committed_max_txnid;
uint64_t min_uncommit_ts;
uint64_t update_max_commit_ts_times;
uint64_t update_max_commit_ts_retries;
uint64_t txn_commits;
uint64_t txn_aborts;
uint64_t commit_without_ts_times;
uint64_t read_without_ts_times;
uint64_t read_with_ts_times;
uint64_t read_q_walk_len_sum;
uint64_t read_q_walk_times;
uint64_t commit_q_walk_len_sum;
uint64_t commit_q_walk_times;
};
struct TOTransactionDBOptions {
size_t num_stripes = DEFAULT_NUM_STRIPES;
size_t max_conflict_check_bytes_size = 200*1024*1024;
};
enum TimeStampType {
kOldest = 0,
kStable = 1,
kAllCommitted = 2,
kTimeStampMax,
};
//TimeStamp Ordering Transaction Options
struct TOTransactionOptions {
size_t max_write_batch_size = 1000;
};
//TimeStamp Ordering Transaction Options
struct TOTxnOptions {
size_t max_write_batch_size = 1000;
const Snapshot* txn_snapshot = nullptr;
Logger* log_ = nullptr;
};
class TOTransactionDB : public StackableDB {
public:
static Status Open(const Options& options,
const TOTransactionDBOptions& txn_db_options,
const std::string& dbname, TOTransactionDB** dbptr);
static Status Open(const DBOptions& db_options,
const TOTransactionDBOptions& txn_db_options,
const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
TOTransactionDB** dbptr);
// The lifecycle of returned pointer should be managed by the application level
virtual TOTransaction* BeginTransaction(
const WriteOptions& write_options,
const TOTransactionOptions& txn_options) = 0;
virtual Status SetTimeStamp(const TimeStampType& ts_type, const RocksTimeStamp& ts) = 0;
virtual Status QueryTimeStamp(const TimeStampType& ts_type, RocksTimeStamp* timestamp) = 0;
virtual Status Stat(TOTransactionStat* stat) = 0;
//virtual Status Close();
protected:
//std::shared_ptr<Logger> info_log_ = nullptr;
// To Create an ToTransactionDB, call Open()
explicit TOTransactionDB(DB* db) : StackableDB(db) {}
};
} // namespace rocksdb
#endif

View File

@ -0,0 +1,777 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/totransaction_db_impl.h"
#include "util/string_util.h"
#include "util/logging.h"
namespace rocksdb {
Status TOTransactionDB::Open(const Options& options,
const TOTransactionDBOptions& txn_db_options,
const std::string& dbname, TOTransactionDB** dbptr) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<ColumnFamilyHandle*> handles;
Status s = Open(db_options, txn_db_options, dbname, column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
// I can delete the handle since DBImpl is always holding a reference to
// default column family
delete handles[0];
}
return s;
}
Status TOTransactionDB::Open(const DBOptions& db_options,
const TOTransactionDBOptions& txn_db_options,
const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
TOTransactionDB** dbptr) {
Status s;
DB* db = nullptr;
if(db_options.open_read_replica){
ROCKS_LOG_INFO(db_options.info_log, "##### TOTDB open on readonly mode #####");
s = DB::OpenForReadOnly(db_options, dbname, column_families, handles,
&db, false);
if (s.ok()) {
auto v = new TOTransactionDBImpl(db, txn_db_options, true);
v->StartBackgroundCleanThread();
*dbptr = v;
}
}else{
ROCKS_LOG_INFO(db_options.info_log, "##### TOTDB open on normal mode #####");
s = DB::Open(db_options, dbname, column_families, handles, &db);
if (s.ok()) {
auto v = new TOTransactionDBImpl(db, txn_db_options, false);
v->StartBackgroundCleanThread();
*dbptr = v;
}
}
ROCKS_LOG_DEBUG(db_options.info_log, "##### TOTDB open success #####");
return s;
}
Status TOTransactionDBImpl::UnCommittedKeys::RemoveKeyInLock(const Slice& key,
const size_t& stripe_num,
std::atomic<int64_t>* mem_usage) {
UnCommittedLockMapStripe* stripe = lock_map_stripes_.at(stripe_num);
auto iter = stripe->uncommitted_keys_map_.find(key.ToString());
assert(iter != stripe->uncommitted_keys_map_.end());
auto ccbytes = mem_usage->fetch_sub(
iter->first.size() + sizeof(iter->second),
std::memory_order_relaxed);
assert(ccbytes >= 0);
(void)ccbytes;
stripe->uncommitted_keys_map_.erase(iter);
return Status::OK();
}
Status TOTransactionDBImpl::UnCommittedKeys::CheckKeyAndAddInLock(const Slice& key,
const TransactionID& txn_id,
const size_t& stripe_num,
const size_t& max_mem_usage,
std::atomic<int64_t>* mem_usage) {
UnCommittedLockMapStripe* stripe = lock_map_stripes_.at(stripe_num);
auto iter = stripe->uncommitted_keys_map_.find(key.ToString());
if (iter != stripe->uncommitted_keys_map_.end()) {
// Check whether the key is modified by the same txn
if (iter->second != txn_id) {
ROCKS_LOG_WARN(info_log_, "TOTDB WriteConflict another txn id(%llu) is modifying key(%s) \n",
iter->second, key.ToString());
return Status::Busy();
} else {
return Status::OK();
}
}
auto addedSize =
mem_usage->load(std::memory_order_relaxed) + key.size() + sizeof(txn_id);
if (addedSize > max_mem_usage) {
ROCKS_LOG_WARN(info_log_, "TOTDB WriteConflict mem usage(%ll) is greater than limit(%llu) \n",
addedSize, max_mem_usage);
return Status::Busy();
}
mem_usage->fetch_add(key.size() + sizeof(txn_id), std::memory_order_relaxed);
stripe->uncommitted_keys_map_.insert({key.ToString(), txn_id});
return Status::OK();
}
size_t TOTransactionDBImpl::UnCommittedKeys::CountInLock() const {
size_t res = 0;
for (size_t i = 0; i < lock_map_stripes_.size(); ++i) {
res += lock_map_stripes_[i]->uncommitted_keys_map_.size();
}
return res;
}
Status TOTransactionDBImpl::CommittedKeys::AddKeyInLock(const Slice& key,
const TransactionID& commit_txn_id,
const RocksTimeStamp& commit_ts,
const size_t& stripe_num,
std::atomic<int64_t>* mem_usage) {
CommittedLockMapStripe* stripe = lock_map_stripes_.at(stripe_num);
if (stripe->committed_keys_map_.find(key.ToString()) == stripe->committed_keys_map_.end()) {
mem_usage->fetch_add(key.size() + sizeof(commit_ts) + sizeof(commit_txn_id));
}
stripe->committed_keys_map_[key.ToString()] = {commit_txn_id, commit_ts};
return Status::OK();
}
Status TOTransactionDBImpl::CommittedKeys::RemoveKeyInLock(const Slice& key,
const TransactionID& txn_id,
const size_t& stripe_num,
std::atomic<int64_t>* mem_usage) {
CommittedLockMapStripe* stripe = lock_map_stripes_.at(stripe_num);
auto iter = stripe->committed_keys_map_.find(key.ToString());
assert(iter != stripe->committed_keys_map_.end());
if (iter->second.first <= txn_id) {
auto ccbytes = mem_usage->fetch_sub(
key.size() + sizeof(iter->second.first) + sizeof(iter->second.second),
std::memory_order_relaxed);
assert(ccbytes >= 0);
(void)ccbytes;
stripe->committed_keys_map_.erase(iter);
}
return Status::OK();
}
Status TOTransactionDBImpl::CommittedKeys::CheckKeyInLock(const Slice& key,
const TransactionID& txn_id,
const RocksTimeStamp& timestamp,
const size_t& stripe_num) {
// padding to avoid false sharing
CommittedLockMapStripe* stripe = lock_map_stripes_.at(stripe_num);
auto iter = stripe->committed_keys_map_.find(key.ToString());
if (iter != stripe->committed_keys_map_.end()) {
// Check whether some txn commits the key after txn_id began
const auto& back = iter->second;
if (back.first > txn_id) {
ROCKS_LOG_WARN(info_log_, "TOTDB WriteConflict a committed txn commit_id(%llu) greater than my txnid(%llu) \n",
back.first, txn_id);
return Status::Busy();
}
// Find the latest committed txn for this key and its commit ts
if (back.second > timestamp) {
ROCKS_LOG_WARN(info_log_, "TOTDB WriteConflict a committed txn commit_ts(%llu) greater than my read_ts(%llu) \n",
back.second, timestamp);
return Status::Busy();
}
}
return Status::OK();
}
size_t TOTransactionDBImpl::CommittedKeys::CountInLock() const {
size_t res = 0;
for (size_t i = 0; i < lock_map_stripes_.size(); ++i) {
res += lock_map_stripes_[i]->committed_keys_map_.size();
}
return res;
}
Status TOTransactionDBImpl::AddToActiveTxns(const std::shared_ptr<ATN>& active_txn) {
active_txns_.insert({active_txn->txn_id_, active_txn});
return Status::OK();
}
TOTransaction* TOTransactionDBImpl::BeginTransaction(const WriteOptions& write_options,
const TOTransactionOptions& txn_options) {
std::shared_ptr<ATN> newActiveTxnNode = nullptr;
TOTransaction* newTransaction = nullptr;
{
std::lock_guard<std::mutex> lock(active_txns_mutex_);
TOTxnOptions totxn_option;
totxn_option.max_write_batch_size = txn_options.max_write_batch_size;
totxn_option.txn_snapshot = dbimpl_->GetSnapshot();
totxn_option.log_ = info_log_;
newActiveTxnNode = std::shared_ptr<ATN>(new ATN);
newTransaction = new TOTransactionImpl(this, write_options, totxn_option, newActiveTxnNode);
// Add the transaction to active txns
newActiveTxnNode->txn_id_ = newTransaction->GetID();
newActiveTxnNode->txn_snapshot = totxn_option.txn_snapshot;
AddToActiveTxns(newActiveTxnNode);
}
ROCKS_LOG_DEBUG(info_log_, "TOTDB begin a txn id(%llu) snapshot(%llu) \n", newActiveTxnNode->txn_id_,
newActiveTxnNode->txn_snapshot->GetSequenceNumber());
return newTransaction;
}
Status TOTransactionDBImpl::CheckWriteConflict(ColumnFamilyHandle* column_family,
const Slice& key,
const TransactionID& txn_id,
const RocksTimeStamp& readts) {
//if first check the uc key and ck busy ,it will need remove uc key ,so we check commit key first
auto stripe_num = GetStripe(key.ToString());
assert(keys_mutex_.size() > stripe_num);
std::lock_guard<std::mutex> lock(*keys_mutex_[stripe_num]);
// Check whether some txn commits the key after current txn started
// Check whether the commit ts of latest committed txn for key is less than my read ts
Status s = committed_keys_.CheckKeyInLock(key, txn_id, readts, stripe_num);
if (!s.ok()) {
ROCKS_LOG_DEBUG(info_log_, "TOTDB txn id(%llu) key(%s) conflict ck \n", txn_id, key.ToString(true));
return s;
}
// Check whether the key is in uncommitted keys
// if not, add the key to uncommitted keys
s = uncommitted_keys_.CheckKeyAndAddInLock(key, txn_id, stripe_num,
max_conflict_bytes_, &current_conflict_bytes_);
if (!s.ok()) {
ROCKS_LOG_DEBUG(info_log_, "TOTDB txn id(%llu) key(%s) conflict uk \n", txn_id, key.ToString(true));
return s;
}
return Status::OK();
}
void TOTransactionDBImpl::CleanCommittedKeys() {
TransactionID txn = 0;
RocksTimeStamp ts = 0;
while(clean_job_.IsRunning()) {
if (clean_job_.NeedToClean(&txn, &ts)) {
for (size_t i = 0; i < num_stripes_; i++) {
std::lock_guard<std::mutex> lock(*keys_mutex_[i]);
CommittedLockMapStripe* stripe = committed_keys_.lock_map_stripes_.at(i);
auto map_iter = stripe->committed_keys_map_.begin();
while (map_iter != stripe->committed_keys_map_.end()) {
auto history = map_iter->second;
if (history.first <= txn && (history.second < ts || history.second == 0)) {
auto ccbytes = current_conflict_bytes_.fetch_sub(
map_iter->first.size() + sizeof(history.first) + sizeof(history.second), std::memory_order_relaxed);
assert(ccbytes >= 0);
(void)ccbytes;
map_iter = stripe->committed_keys_map_.erase(map_iter);
} else {
map_iter ++;
}
}
}
clean_job_.FinishClean(txn, ts);
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
// End run clean thread
}
void TOTransactionDBImpl::StartBackgroundCleanThread() {
clean_thread_ = std::thread([this] { CleanCommittedKeys(); });
}
void TOTransactionDBImpl::AdvanceTS(RocksTimeStamp* pMaxToCleanTs) {
RocksTimeStamp maxToCleanTs = 0;
{
ReadLock rl(&ts_meta_mutex_);
if (oldest_ts_ != nullptr) {
maxToCleanTs = *oldest_ts_;
}
}
{
ReadLock rl(&read_ts_mutex_);
for (auto it = read_q_.begin(); it != read_q_.end();) {
if (it->second->state_.load() == TOTransaction::kStarted) {
assert(it->second->read_ts_set_);
maxToCleanTs = std::min(maxToCleanTs, it->second->read_ts_);
break;
}
it++;
}
}
*pMaxToCleanTs = maxToCleanTs;
return;
}
Status TOTransactionDBImpl::AddReadQueue(const std::shared_ptr<ATN>& core,
const RocksTimeStamp& ts,
const uint32_t& round) {
// the puzzle is the critical-area length of ts_meta_mutex_
// between AddReadQueue and AddCommitQueue
RocksTimeStamp realTs = ts;
ReadLock rl(&ts_meta_mutex_);
if (oldest_ts_ != nullptr) {
if (realTs < *oldest_ts_) {
if (round == 0) {
return Status::InvalidArgument("read-ts smaller than oldest ts");
}
realTs = *oldest_ts_;
} else {
// otherwise, realTs >= *oldest_ts_
}
}
// take care of the critical area, read_ts_mutex_ is within
// ts_meta_mutex_
WriteLock wl(&read_ts_mutex_);
assert(!core->read_ts_set_);
assert(core->state_.load() == TOTransaction::kStarted);
// we have to clean commited/aboarted txns, right?
// we just start from the beginning and clean until the first active txn
// This is only a strategy and has nothing to do with the correctness
// You may design other strategies.
for (auto it = read_q_.begin(); it != read_q_.end();) {
if (it->second->state_.load() == TOTransaction::kStarted) {
break;
}
// TODO: add walk stat
it = read_q_.erase(it);
}
core->read_ts_set_ = true;
core->read_ts_ = realTs;
read_q_.insert({{realTs, core->txn_id_}, core});
return Status::OK();
}
Status TOTransactionDBImpl::AddCommitQueue(const std::shared_ptr<ATN>& core,
const RocksTimeStamp& ts) {
// we have no need to protect this action within
// ts_meta_mutex_, because mongodb primary can guarantee AddCommitQueue
// is within the hlc-allocator critical-area. And oldestTs is forked from
// allcommitted-ts, so there is no way that pending-add ts is smaller
// than oldestTs. But a check without ts_meta_mutex_ is not costive
// and at some degree can guarantee corectness.
{
ReadLock rl(&ts_meta_mutex_);
// see the comments above, we have no need to hold the rl too long.
if (oldest_ts_ != nullptr && ts < *oldest_ts_) {
return Status::InvalidArgument("commit-ts smaller than oldest ts");
}
}
if (core->commit_ts_set_) {
if (ts < core->first_commit_ts_) {
return Status::InvalidArgument("commit-ts smaller than first-commit-ts");
}
if (ts < core->commit_ts_) {
// in wt3.0, there is no restriction like this one.
return Status::InvalidArgument("commit-ts must be monotonic in a txn");
}
core->commit_ts_ = ts;
return Status::OK();
}
{
// if we dont have a commit_ts, the ts set this time
// shall be the first-commit-timestamp. And the commit-ts(es) set in
// the following times will always be greater than first-commit-ts.
// we publish first-commit-ts into the commit-queue, first-commit-ts
// is fixed, never changed during a transaction.
WriteLock wl(&commit_ts_mutex_);
assert(!core->commit_ts_set_);
assert(core->state_.load() == TOTransaction::kStarted);
// we have to clean commited/aboarted txns, right?
// we just start from the beginning and clean until the first active txn
// This is only a strategy and has nothing to do with the correctness
// You may design other strategies.
for (auto it = commit_q_.begin(); it != commit_q_.end();) {
if (it->second->state_.load() == TOTransaction::kStarted) {
break;
}
// TODO: add walk stat
it = commit_q_.erase(it);
}
assert(!core->commit_ts_set_);
core->commit_ts_set_ = true;
core->first_commit_ts_ = ts;
core->commit_ts_ = ts;
commit_q_.insert({{ts, core->txn_id_}, core});
}
return Status::OK();
}
Status TOTransactionDBImpl::CommitTransaction(std::shared_ptr<ATN> core,
const std::set<std::string>& written_keys) {
TransactionID maxToCleanTxnId = 0;
RocksTimeStamp maxToCleanTs = 0;
bool needClean = false;
ROCKS_LOG_DEBUG(info_log_,
"TOTDB start to commit txn id(%llu) commit ts(%llu)\n",
core->txn_id_,
core->commit_ts_);
// Update Active Txns
{
std::lock_guard<std::mutex> lock(active_txns_mutex_);
auto iter = active_txns_.find(core->txn_id_);
assert(iter != active_txns_.end());
assert(iter->second->state_.load() == TOTransaction::kStarted);
iter->second->state_.store(TOTransaction::kCommitted);
dbimpl_->ReleaseSnapshot(iter->second->txn_snapshot);
core->commit_txn_id_ = TOTransactionImpl::GenTxnID();
iter = active_txns_.erase(iter);
if (core->txn_id_ > committed_max_txnid_) {
committed_max_txnid_ = core->txn_id_;
}
if (iter == active_txns_.begin()) {
needClean = true;
if (!active_txns_.empty()) {
maxToCleanTxnId = active_txns_.begin()->first - 1;
} else {
maxToCleanTxnId = committed_max_txnid_;
}
}
}
//it's okey to publish commit_ts a little later
if (core->commit_ts_set_) {
auto prev = committed_max_ts_.load(std::memory_order_relaxed);
while (core->commit_ts_ > prev) {
update_max_commit_ts_times_.fetch_add(1, std::memory_order_relaxed);
if (committed_max_ts_.compare_exchange_strong(prev, core->commit_ts_)) {
has_commit_ts_.store(true);
break;
}
update_max_commit_ts_retries_.fetch_add(1, std::memory_order_relaxed);
prev = committed_max_ts_.load(std::memory_order_relaxed);
}
} else {
commit_without_ts_times_.fetch_add(1, std::memory_order_relaxed);
}
AdvanceTS(&maxToCleanTs);
// txnid_ts_keys_map[{commit_txn_id, commit_ts}] = std::list<KeyModifyHistory::iterator>();
// Move Uncommited keys for this txn to committed keys
std::map<size_t, std::set<std::string>> stripe_keys_map;
auto keys_iter = written_keys.begin();
while (keys_iter != written_keys.end()) {
auto stripe_num = GetStripe(*keys_iter);
if (stripe_keys_map.find(stripe_num) == stripe_keys_map.end()) {
stripe_keys_map[stripe_num] = {};
}
stripe_keys_map[stripe_num].insert(std::move(*keys_iter));
keys_iter++;
}
auto stripe_keys_iter = stripe_keys_map.begin();
while (stripe_keys_iter != stripe_keys_map.end()) {
std::lock_guard<std::mutex> lock(*keys_mutex_[stripe_keys_iter->first]);
// the key in one txn insert to the CK with the max commit ts
for (auto & key : stripe_keys_iter->second) {
committed_keys_.AddKeyInLock(key, core->commit_txn_id_,
core->commit_ts_, stripe_keys_iter->first, &current_conflict_bytes_);
uncommitted_keys_.RemoveKeyInLock(key, stripe_keys_iter->first, &current_conflict_bytes_);
}
stripe_keys_iter++;
}
ROCKS_LOG_DEBUG(info_log_, "TOTDB end commit txn id(%llu) cid(%llu) commit ts(%llu)\n",
core->txn_id_, core->commit_txn_id_, core->commit_ts_);
// Clean committed keys
if (needClean) {
// Clean committed keys async
// Clean keys whose commited txnid <= maxToCleanTxnId
// and committed ts < maxToCleanTs
ROCKS_LOG_DEBUG(info_log_, "TOTDB going to clean txnid(%llu) ts(%llu) \n",
maxToCleanTxnId, maxToCleanTs);
clean_job_.SetCleanInfo(maxToCleanTxnId, maxToCleanTs);
}
txn_commits_.fetch_add(1, std::memory_order_relaxed);
if (core->read_ts_set_) {
read_with_ts_times_.fetch_add(1, std::memory_order_relaxed);
} else {
read_without_ts_times_.fetch_add(1, std::memory_order_relaxed);
}
return Status::OK();
}
Status TOTransactionDBImpl::RollbackTransaction(std::shared_ptr<ATN> core,
const std::set<std::string>& written_keys) {
ROCKS_LOG_DEBUG(info_log_, "TOTDB start to rollback txn id(%llu) \n", core->txn_id_);
// Remove txn for active txns
bool needClean = false;
TransactionID maxToCleanTxnId = 0;
RocksTimeStamp maxToCleanTs = 0;
{
std::lock_guard<std::mutex> lock(active_txns_mutex_);
auto iter = active_txns_.find(core->txn_id_);
assert(iter != active_txns_.end());
assert(iter->second->state_.load() == TOTransaction::kStarted);
iter->second->state_.store(TOTransaction::kRollback);
dbimpl_->ReleaseSnapshot(iter->second->txn_snapshot);
iter = active_txns_.erase(iter);
if (iter == active_txns_.begin()) {
needClean = true;
if (!active_txns_.empty()) {
maxToCleanTxnId = active_txns_.begin()->first - 1;
} else {
maxToCleanTxnId = committed_max_txnid_;
}
}
}
// Calculation the min clean ts between oldest and the read ts
AdvanceTS(&maxToCleanTs);
// Remove written keys from uncommitted keys
std::map<size_t, std::set<std::string>> stripe_keys_map;
auto keys_iter = written_keys.begin();
while (keys_iter != written_keys.end()) {
std::set<std::string> stripe_keys;
auto stripe_num = GetStripe(*keys_iter);
if (stripe_keys_map.find(stripe_num) == stripe_keys_map.end()) {
stripe_keys_map[stripe_num] = {};
}
stripe_keys_map[stripe_num].insert(std::move(*keys_iter));
keys_iter++;
}
auto stripe_keys_iter = stripe_keys_map.begin();
while (stripe_keys_iter != stripe_keys_map.end()) {
std::lock_guard<std::mutex> lock(*keys_mutex_[stripe_keys_iter->first]);
for (auto & key : stripe_keys_iter->second) {
uncommitted_keys_.RemoveKeyInLock(key, stripe_keys_iter->first, &current_conflict_bytes_);
}
stripe_keys_iter++;
}
ROCKS_LOG_DEBUG(info_log_, "TOTDB end rollback txn id(%llu) \n", core->txn_id_);
if (needClean) {
ROCKS_LOG_DEBUG(info_log_, "TOTDB going to clean txnid(%llu) ts(%llu) \n",
maxToCleanTxnId, maxToCleanTs);
clean_job_.SetCleanInfo(maxToCleanTxnId, maxToCleanTs);
}
txn_aborts_.fetch_add(1, std::memory_order_relaxed);
if (core->read_ts_set_) {
read_with_ts_times_.fetch_add(1, std::memory_order_relaxed);
} else {
read_without_ts_times_.fetch_add(1, std::memory_order_relaxed);
}
return Status::OK();
}
Status TOTransactionDBImpl::SetTimeStamp(const TimeStampType& ts_type,
const RocksTimeStamp& ts) {
if (ts_type == kAllCommitted) {
// TODO:
// NOTE; actually, it should be called kCommittedTimestamp
// and kCommittedTimestamp can be set backwards in wt.
// But currently, we dont have this need.
return Status::InvalidArgument("kAllCommittedTs can not set");
}
if (ts_type == kOldest) {
// NOTE: here we must take lock, every txn's readTs-setting
// has to be in the same critical area within the set of kOldest
{
WriteLock wl(&ts_meta_mutex_);
if (oldest_ts_ != nullptr && *oldest_ts_ > ts) {
return Status::InvalidArgument("oldestTs can not travel back");
}
oldest_ts_.reset(new RocksTimeStamp(ts));
}
auto pin_ts = ts;
{
ReadLock rl(&read_ts_mutex_);
uint64_t walk_cnt = 0;
for (auto it = read_q_.begin(); it != read_q_.end();) {
if (it->second->state_.load() == TOTransaction::kStarted) {
assert(it->second->read_ts_set_);
pin_ts = std::min(pin_ts, it->second->read_ts_);
break;
}
it++;
walk_cnt++;
}
read_q_walk_len_sum_.fetch_add(read_q_.size(), std::memory_order_relaxed);
read_q_walk_times_.fetch_add(walk_cnt, std::memory_order_relaxed);
}
dbimpl_->AdvancePinTs(pin_ts);
ROCKS_LOG_DEBUG(info_log_, "TOTDB set TS type(%d) value(%llu)\n", ts_type, ts);
return Status::OK();
}
return Status::InvalidArgument("invalid ts type");
}
Status TOTransactionDBImpl::QueryTimeStamp(const TimeStampType& ts_type,
RocksTimeStamp* timestamp) {
if (ts_type == kAllCommitted) {
if (!has_commit_ts_.load(/* seq_cst */)) {
return Status::NotFound("not found");
}
auto tmp = committed_max_ts_.load(std::memory_order_relaxed);
ReadLock rl(&commit_ts_mutex_);
uint64_t walk_cnt = 0;
for (auto it = commit_q_.begin(); it != commit_q_.end(); ++it) {
if (it->second->state_.load(std::memory_order_relaxed) != TOTransaction::kStarted) {
walk_cnt++;
continue;
}
assert(it->second->commit_ts_set_);
assert(it->second->first_commit_ts_ > 0);
assert(it->second->commit_ts_ >= it->second->first_commit_ts_);
tmp = std::min(tmp, it->second->first_commit_ts_-1);
break;
}
commit_q_walk_len_sum_.fetch_add(commit_q_.size(), std::memory_order_relaxed);
commit_q_walk_times_.fetch_add(walk_cnt, std::memory_order_relaxed);
*timestamp = tmp;
return Status::OK();
}
if (ts_type == kOldest) {
// NOTE: query oldest is not a frequent thing, so I just
// take the rlock
ReadLock rl(&ts_meta_mutex_);
if (oldest_ts_ == nullptr) {
return Status::NotFound("not found");
}
*timestamp = *oldest_ts_;
ROCKS_LOG_DEBUG(info_log_, "TOTDB query TS type(%llu) value(%llu) \n", ts_type, *timestamp);
return Status::OK();
}
return Status::InvalidArgument("invalid ts_type");
}
Status TOTransactionDBImpl::Stat(TOTransactionStat* stat) {
if (stat == nullptr) {
return Status::InvalidArgument("can not accept null as input");
}
memset(stat, 0, sizeof(TOTransactionStat));
stat->max_conflict_bytes = max_conflict_bytes_;
stat->cur_conflict_bytes = current_conflict_bytes_.load(std::memory_order_relaxed);
{
std::vector<std::unique_lock<std::mutex>> lks;
for (size_t i = 0; i < num_stripes_; i++) {
lks.emplace_back(std::unique_lock<std::mutex>(*keys_mutex_[i]));
}
stat->uk_num = uncommitted_keys_.CountInLock();
stat->ck_num = committed_keys_.CountInLock();
}
{
std::lock_guard<std::mutex> lock(active_txns_mutex_);
stat->alive_txns_num = active_txns_.size();
}
{
ReadLock rl(&read_ts_mutex_);
stat->read_q_num = read_q_.size();
for (auto it = read_q_.begin(); it != read_q_.end(); it++) {
assert(it->second->read_ts_set_);
if (it->second->state_.load(std::memory_order_relaxed) == TOTransaction::kStarted) {
stat->min_read_ts = it->second->read_ts_;
break;
}
}
}
{
ReadLock rl(&commit_ts_mutex_);
stat->commit_q_num = commit_q_.size();
for (auto it = commit_q_.begin(); it != commit_q_.end(); it++) {
assert(it->second->commit_ts_set_);
if (it->second->state_.load(std::memory_order_relaxed) == TOTransaction::kStarted) {
stat->min_uncommit_ts = it->second->commit_ts_;
break;
}
}
}
{
ReadLock rl(&ts_meta_mutex_);
stat->oldest_ts = oldest_ts_ == nullptr ? 0 : *oldest_ts_;
}
stat->max_commit_ts = has_commit_ts_.load() ? committed_max_ts_.load() : 0;
stat->update_max_commit_ts_times = update_max_commit_ts_times_.load(std::memory_order_relaxed);
stat->update_max_commit_ts_retries = update_max_commit_ts_retries_.load(std::memory_order_relaxed);
stat->committed_max_txnid = committed_max_txnid_;
stat->txn_commits = txn_commits_.load(std::memory_order_relaxed);
stat->txn_aborts = txn_aborts_.load(std::memory_order_relaxed);
stat->commit_without_ts_times = commit_without_ts_times_.load(std::memory_order_relaxed);
stat->read_without_ts_times = read_without_ts_times_.load(std::memory_order_relaxed);
stat->read_with_ts_times = read_with_ts_times_.load(std::memory_order_relaxed);
stat->read_q_walk_len_sum = read_q_walk_len_sum_.load(std::memory_order_relaxed);
stat->read_q_walk_times = read_q_walk_times_.load(std::memory_order_relaxed);
stat->commit_q_walk_len_sum = commit_q_walk_len_sum_.load(std::memory_order_relaxed);
stat->commit_q_walk_times = commit_q_walk_times_.load(std::memory_order_relaxed);
return Status::OK();
}
Status TOTransactionDBImpl::BackgroundCleanJob::SetCleanInfo(const TransactionID& txn_id,
const RocksTimeStamp& time_stamp) {
std::lock_guard<std::mutex> lock(thread_mutex_);
txnid_ = txn_id;
ts_ = time_stamp;
return Status::OK();
}
bool TOTransactionDBImpl::BackgroundCleanJob::IsRunning() {
std::lock_guard<std::mutex> lock(thread_mutex_);
return thread_state_ == kRunning;
}
bool TOTransactionDBImpl::BackgroundCleanJob::NeedToClean(TransactionID* txn_id,
RocksTimeStamp* time_stamp) {
std::lock_guard<std::mutex> lock(thread_mutex_);
if (thread_state_ != kRunning) {
return false;
}
*txn_id = txnid_;
*time_stamp = ts_;
return (txnid_ != 0);
}
void TOTransactionDBImpl::BackgroundCleanJob::FinishClean(const TransactionID& txn_id,
const RocksTimeStamp& time_stamp) {
std::lock_guard<std::mutex> lock(thread_mutex_);
if (txn_id == txnid_ && ts_ == time_stamp) {
txnid_ = 0;
ts_ = 0;
}
}
void TOTransactionDBImpl::BackgroundCleanJob::StopThread() {
std::lock_guard<std::mutex> lock(thread_mutex_);
thread_state_ = kStopped;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,319 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include <array>
#include "rocksdb/db.h"
#include "db/db_impl.h"
#include "util/autovector.h"
#include "util/mutexlock.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/totransaction_db.h"
#include "util/murmurhash.h"
#include "utilities/transactions/totransaction_impl.h"
namespace rocksdb {
class TOTransactionDBImpl : public TOTransactionDB {
public:
TOTransactionDBImpl(DB* db, const TOTransactionDBOptions& txn_db_options, bool read_only)
: TOTransactionDB(db),
dbimpl_(reinterpret_cast<DBImpl*>(db)),
read_only_(read_only),
txn_db_options_(txn_db_options),
num_stripes_(DEFAULT_NUM_STRIPES),
committed_max_txnid_(0),
current_conflict_bytes_(0),
max_conflict_bytes_(1.1*txn_db_options.max_conflict_check_bytes_size),
txn_commits_(0),
txn_aborts_(0),
committed_max_ts_(0),
has_commit_ts_(false),
update_max_commit_ts_times_(0),
update_max_commit_ts_retries_(0),
commit_without_ts_times_(0),
read_without_ts_times_(0),
read_with_ts_times_(0),
read_q_walk_times_(0),
read_q_walk_len_sum_(0),
commit_q_walk_times_(0),
commit_q_walk_len_sum_(0),
oldest_ts_(nullptr){
if (max_conflict_bytes_ == 0) {
// we preserve at least 100MB for conflict check
max_conflict_bytes_ = 100*1024*1024;
}
info_log_ = dbimpl_->GetDBOptions().info_log.get();
uncommitted_keys_.SetLogger(info_log_);
committed_keys_.SetLogger(info_log_);
active_txns_.clear();
// Init default num_stripes
num_stripes_ = (txn_db_options.num_stripes > 0)
? txn_db_options.num_stripes
: DEFAULT_NUM_STRIPES;
uncommitted_keys_.lock_map_stripes_.reserve(num_stripes_);
for (size_t i = 0; i < num_stripes_; i++) {
UnCommittedLockMapStripe* stripe = new UnCommittedLockMapStripe();
uncommitted_keys_.lock_map_stripes_.push_back(stripe);
}
committed_keys_.lock_map_stripes_.reserve(num_stripes_);
for (size_t i = 0; i < num_stripes_; i++) {
CommittedLockMapStripe* stripe = new CommittedLockMapStripe();
committed_keys_.lock_map_stripes_.push_back(stripe);
}
keys_mutex_.reserve(num_stripes_);
for (size_t i = 0; i < num_stripes_; i++) {
std::mutex* key_mutex = new std::mutex();
keys_mutex_.push_back(key_mutex);
}
}
~TOTransactionDBImpl() {
// Clean resources
clean_job_.StopThread();
clean_thread_.join();
{
for (auto& it : uncommitted_keys_.lock_map_stripes_) {
delete it;
}
uncommitted_keys_.lock_map_stripes_.clear();
for (auto& it : committed_keys_.lock_map_stripes_) {
delete it;
}
committed_keys_.lock_map_stripes_.clear();
for (auto& it : keys_mutex_) {
delete it;
}
keys_mutex_.clear();
}
std::lock_guard<std::mutex> lock(active_txns_mutex_);
active_txns_.clear();
}
void StartBackgroundCleanThread();
void SetMaxConflictBytes(uint64_t bytes) {
max_conflict_bytes_ = bytes;
}
virtual TOTransaction* BeginTransaction(const WriteOptions& write_options,
const TOTransactionOptions& txn_options) override;
using ATN = TOTransactionImpl::ActiveTxnNode;
Status CommitTransaction(std::shared_ptr<ATN> core,
const std::set<std::string>& written_keys);
Status RollbackTransaction(std::shared_ptr<ATN> core,
const std::set<std::string>& written_keys);
Status SetTimeStamp(const TimeStampType& ts_type, const RocksTimeStamp& ts) override;
Status QueryTimeStamp(const TimeStampType& ts_type, RocksTimeStamp* timestamp) override;
Status Stat(TOTransactionStat* stat) override;
Status CheckWriteConflict(ColumnFamilyHandle* column_family,
const Slice& key,
const TransactionID& txn_id,
const RocksTimeStamp& readts);
Status AddCommitQueue(const std::shared_ptr<ATN>& core,
const RocksTimeStamp& ts);
Status AddReadQueue(const std::shared_ptr<ATN>& core,
const RocksTimeStamp& ts,
const uint32_t& round);
void AdvanceTS(RocksTimeStamp* maxToCleanTs);
void CleanCommittedKeys();
bool IsReadOnly() const { return read_only_; }
// Committed key, first commit txnid, second commit ts
typedef std::pair<TransactionID, RocksTimeStamp> KeyModifyHistory;
protected:
DBImpl* dbimpl_;
bool read_only_;
const TOTransactionDBOptions txn_db_options_;
Logger* info_log_ = nullptr;
size_t num_stripes_;
TransactionID committed_max_txnid_;
std::atomic<int64_t> current_conflict_bytes_;
int64_t max_conflict_bytes_;
std::atomic<uint64_t> txn_commits_;
std::atomic<uint64_t> txn_aborts_;
class BackgroundCleanJob {
std::mutex thread_mutex_;
TransactionID txnid_;
RocksTimeStamp ts_;
enum ThreadState {
kRunning,
kStopped
};
ThreadState thread_state_;
public:
BackgroundCleanJob()
:txnid_(0),ts_(0) {
thread_state_ = kRunning;
}
~BackgroundCleanJob() {
}
Status SetCleanInfo(const TransactionID& txn_id,
const RocksTimeStamp& time_stamp);
bool IsRunning();
bool NeedToClean(TransactionID* txn_id,
RocksTimeStamp* time_stamp);
void FinishClean(const TransactionID& txn_id,
const RocksTimeStamp& time_stamp);
void StopThread();
};
private:
using TSTXN = std::pair<RocksTimeStamp, TransactionID>;
// Add txn to active txns
Status AddToActiveTxns(const std::shared_ptr<ATN>& active_txn);
void RemoveUncommittedKeysOnCleanup(const std::set<std::string>& written_keys);
// Active txns
std::mutex active_txns_mutex_;
std::map<TransactionID, std::shared_ptr<ATN>> active_txns_;
// txns sorted by {commit_ts, txnid}
port::RWMutex commit_ts_mutex_;
std::map<TSTXN, std::shared_ptr<ATN>> commit_q_;
// txns sorted by {read_ts, txnid}
port::RWMutex read_ts_mutex_;
std::map<TSTXN, std::shared_ptr<ATN>> read_q_;
struct UnCommittedLockMapStripe {
std::map<std::string, TransactionID> uncommitted_keys_map_;
};
size_t GetStripe(const std::string& key) const {
assert(num_stripes_ > 0);
static murmur_hash hash;
size_t stripe = hash(key) % num_stripes_;
return stripe;
}
// Uncommitted keys
struct UnCommittedKeys {
std::vector<UnCommittedLockMapStripe*> lock_map_stripes_;
Logger* info_log_;
public:
// Remove key from uncommitted keys
void SetLogger(Logger* info_log) {
info_log_ = info_log;
}
Status RemoveKeyInLock(const Slice& key, const size_t& stripe_num,
std::atomic<int64_t>* mem_usage);
// Check write conflict and add the key to uncommitted keys
Status CheckKeyAndAddInLock(const Slice& key,
const TransactionID& txn_id,
const size_t& stripe_num,
const size_t& max_mem_usage,
std::atomic<int64_t>* mem_usage);
size_t CountInLock() const;
};
struct CommittedLockMapStripe {
//std::mutex map_mutex_;
std::map<std::string, KeyModifyHistory> committed_keys_map_;
};
struct CommittedKeys {
std::vector<CommittedLockMapStripe*> lock_map_stripes_;
Logger* info_log_;
public:
void SetLogger(Logger* info_log) {
info_log_ = info_log;
}
// Add key to committed keys
Status AddKeyInLock(const Slice& key,
const TransactionID& commit_txn_id,
const RocksTimeStamp& commit_ts,
const size_t& stripe_num,
std::atomic<int64_t>* mem_usage);
// Remove key from committed keys
Status RemoveKeyInLock(const Slice& key,
const TransactionID& txn_id,
const size_t& stripe_num,
std::atomic<int64_t>* mem_usage);
// Check write conflict
Status CheckKeyInLock(const Slice& key,
const TransactionID& txn_id,
const RocksTimeStamp& timestamp,
const size_t& stripe_num);
size_t CountInLock() const;
};
std::vector<std::mutex*> keys_mutex_;
UnCommittedKeys uncommitted_keys_;
CommittedKeys committed_keys_;
BackgroundCleanJob clean_job_;
std::thread clean_thread_;
// NOTE(wolfkdy): commit_ts_ is not protected by ts_meta_mutex_
// remember to publish commit_ts_ before has_commit_ts_
std::atomic<RocksTimeStamp> committed_max_ts_;
std::atomic<bool> has_commit_ts_;
std::atomic<uint64_t> update_max_commit_ts_times_;
std::atomic<uint64_t> update_max_commit_ts_retries_;
std::atomic<uint64_t> commit_without_ts_times_;
std::atomic<uint64_t> read_without_ts_times_;
std::atomic<uint64_t> read_with_ts_times_;
std::atomic<uint64_t> read_q_walk_times_;
std::atomic<uint64_t> read_q_walk_len_sum_;
std::atomic<uint64_t> commit_q_walk_times_;
std::atomic<uint64_t> commit_q_walk_len_sum_;
// TODO(wolfkdy): use optional<>
port::RWMutex ts_meta_mutex_;
// protected by ts_meta_mutex_
std::unique_ptr<RocksTimeStamp> oldest_ts_;
// protected by ts_meta_mutex_
std::unique_ptr<RocksTimeStamp> pinned_ts_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,279 @@
// Copyright (c) 2019-present. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "db/column_family.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/totransaction_db.h"
#include "util/string_util.h"
#include "util/logging.h"
#include "util/cast_util.h"
#include "utilities/transactions/totransaction_db_impl.h"
#include "utilities/transactions/totransaction_impl.h"
namespace rocksdb {
struct WriteOptions;
std::atomic<TransactionID> TOTransactionImpl::txn_id_counter_(1);
TransactionID TOTransactionImpl::GenTxnID() {
return txn_id_counter_.fetch_add(1);
}
TOTransactionImpl::TOTransactionImpl(TOTransactionDB* txn_db,
const WriteOptions& write_options,
const TOTxnOptions& txn_option,
const std::shared_ptr<ActiveTxnNode>& core)
: txn_id_(0),
db_(txn_db->GetRootDB()),
write_options_(write_options),
txn_option_(txn_option),
cmp_(GetColumnFamilyUserComparator(db_->DefaultColumnFamily())),
write_batch_(cmp_, 0, true, 0),
core_(core) {
txn_db_impl_ = static_cast_with_check<TOTransactionDBImpl, TOTransactionDB>(txn_db);
assert(txn_db_impl_);
db_impl_ = static_cast_with_check<DBImpl, DB>(txn_db->GetRootDB());
Initialize();
}
void TOTransactionImpl::Initialize() {
txn_id_ = GenTxnID();
txn_state_ = kStarted;
}
TOTransactionImpl::~TOTransactionImpl() {
// Do rollback if this transaction is not committed or rolled back
if (txn_state_ < kCommitted) {
Rollback();
}
}
Status TOTransactionImpl::SetReadTimeStamp(const RocksTimeStamp& timestamp,
const uint32_t& round) {
if (txn_state_ >= kCommitted) {
return Status::NotSupported("this txn is committed or rollback");
}
if (core_->read_ts_set_) {
return Status::NotSupported("set read ts is supposed to be set only once");
}
ROCKS_LOG_DEBUG(txn_option_.log_, "TOTDB txn id(%llu) set read ts(%llu) force(%d)\n",
txn_id_, timestamp, round);
Status s = txn_db_impl_->AddReadQueue(core_, timestamp, round);
if (!s.ok()) {
return s;
}
assert(core_->read_ts_set_);
assert(core_->read_ts_ >= timestamp);
return s;
}
Status TOTransactionImpl::SetCommitTimeStamp(const RocksTimeStamp& timestamp) {
if (txn_state_ >= kCommitted) {
return Status::NotSupported("this txn is committed or rollback");
}
if (timestamp == 0) {
return Status::NotSupported("not allowed to set committs to 0");
}
if (core_->commit_ts_set_) {
if (core_->commit_ts_ > timestamp) {
return Status::NotSupported("commit ts need equal with the pre set");
}
if (core_->commit_ts_ == timestamp) {
return Status::OK();
}
}
assert((!core_->commit_ts_set_) || core_->commit_ts_ < timestamp);
// publish commit_ts to global view
auto s = txn_db_impl_->AddCommitQueue(core_, timestamp);
if (!s.ok()) {
return s;
}
assert(core_->commit_ts_set_ && (core_->first_commit_ts_ <= core_->commit_ts_));
ROCKS_LOG_DEBUG(txn_option_.log_, "TOTDB txn id(%llu) set commit ts(%llu)\n",
core_->txn_id_, timestamp);
return Status::OK();
}
Status TOTransactionImpl::GetReadTimeStamp(RocksTimeStamp* timestamp) const {
if ((!timestamp) || (!core_->read_ts_set_)) {
return Status::InvalidArgument("need set read ts, and parameter should not be null");
}
*timestamp = core_->read_ts_;
return Status::OK();
}
WriteBatchWithIndex* TOTransactionImpl::GetWriteBatch() {
return &write_batch_;
}
Status TOTransactionImpl::Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
if (txn_db_impl_->IsReadOnly()) {
return Status::NotSupported("readonly db cannot accept put");
}
if (txn_state_ >= kCommitted) {
return Status::NotSupported("this txn is already committed or rollback");
}
Status s = CheckWriteConflict(column_family, key);
if (s.ok()) {
writtenKeys_.insert(key.ToString());
GetWriteBatch()->Put(column_family, key, value);
write_options_.asif_commit_timestamps.emplace_back(core_->commit_ts_);
}
return s;
}
Status TOTransactionImpl::Put(const Slice& key, const Slice& value) {
return Put(db_->DefaultColumnFamily(), key, value);
}
Status TOTransactionImpl::Get(ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
if (txn_state_ >= kCommitted) {
return Status::NotSupported("this txn is already committed or rollback");
}
// Check the options, if read ts is set use read ts
options.read_timestamp = core_->read_ts_;
options.snapshot = txn_option_.txn_snapshot;
return write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
value);
}
Status TOTransactionImpl::Get(ReadOptions& options, const Slice& key,
std::string* value) {
return Get(options, db_->DefaultColumnFamily(), key, value);
}
Status TOTransactionImpl::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
if (txn_db_impl_->IsReadOnly()) {
return Status::NotSupported("readonly db cannot accept del");
}
if (txn_state_ >= kCommitted) {
return Status::NotSupported("this txn is already committed or rollback");
}
Status s = CheckWriteConflict(column_family, key);
if (s.ok()) {
writtenKeys_.insert(key.ToString());
GetWriteBatch()->Delete(column_family, key);
write_options_.asif_commit_timestamps.emplace_back(core_->commit_ts_);
}
return s;
}
Status TOTransactionImpl::Delete(const Slice& key) {
return Delete(db_->DefaultColumnFamily(), key);
}
Iterator* TOTransactionImpl::GetIterator(ReadOptions& read_options) {
return GetIterator(read_options, db_->DefaultColumnFamily());
}
Iterator* TOTransactionImpl::GetIterator(ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
if (txn_state_ >= kCommitted) {
return nullptr;
}
read_options.read_timestamp = core_->read_ts_;
read_options.snapshot = txn_option_.txn_snapshot;
Iterator* db_iter = db_->NewIterator(read_options, column_family);
if (db_iter == nullptr) {
return nullptr;
}
return write_batch_.NewIteratorWithBase(column_family, db_iter);
}
Status TOTransactionImpl::CheckWriteConflict(ColumnFamilyHandle* column_family,
const Slice& key) {
return txn_db_impl_->CheckWriteConflict(column_family, key, GetID(), core_->read_ts_);
}
Status TOTransactionImpl::Commit() {
if (txn_state_ >= kCommitted) {
return Status::InvalidArgument("txn already committed or rollback.");
}
assert(write_options_.asif_commit_timestamps.size()
== static_cast<size_t>(GetWriteBatch()->GetWriteBatch()->Count()));
if (core_->commit_ts_set_) {
for (size_t i = 0; i < write_options_.asif_commit_timestamps.size(); ++i) {
if (write_options_.asif_commit_timestamps[i] == 0) {
write_options_.asif_commit_timestamps[i] = core_->commit_ts_;
}
}
}
Status s;
if (GetWriteBatch()->GetWriteBatch()->Count() != 0) {
assert(!txn_db_impl_->IsReadOnly());
// NOTE(wolfkdy): It's a simple modification for readonly transaction.
// PutLogData will not increase Count. So, If in the future
// PutLogData is added into TOTransactionDB, this shortcut should be redesigned.
s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch());
}
if (s.ok()) {
txn_state_.store(kCommitted);
// Change active txn set,
// Move uncommitted keys to committed keys,
// Clean data when the committed txn is activeTxnSet's header
// TODO(wolfkdy): in fact, here we must not fail
s = txn_db_impl_->CommitTransaction(core_, writtenKeys_);
} else {
s = Status::InvalidArgument("Transaction is fail for commit.");
}
ROCKS_LOG_DEBUG(txn_option_.log_, "TOTDB txn id(%llu) committed \n", txn_id_);
return s;
}
Status TOTransactionImpl::Rollback() {
if (txn_state_ >= kCommitted) {
return Status::InvalidArgument("txn is already committed or rollback.");
}
GetWriteBatch()->Clear();
txn_state_.store(kRollback);
// Change active txn set,
// Clean uncommitted keys
Status s = txn_db_impl_->RollbackTransaction(core_, writtenKeys_);
ROCKS_LOG_DEBUG(txn_option_.log_, "TOTDB txn id(%llu) rollback \n", txn_id_);
return s;
}
Status TOTransactionImpl::SetName(const TransactionName& name) {
name_ = name;
return Status::OK();
}
}
#endif

View File

@ -0,0 +1,119 @@
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/db.h"
#include "db/db_impl.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/totransaction.h"
#include "rocksdb/utilities/totransaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
namespace rocksdb {
class TOTransactionDBImpl;
class TOTransactionImpl : public TOTransaction {
public:
struct ActiveTxnNode {
TransactionID txn_id_;
TransactionID commit_txn_id_;
bool commit_ts_set_;
RocksTimeStamp commit_ts_;
RocksTimeStamp first_commit_ts_;
bool read_ts_set_;
RocksTimeStamp read_ts_;
std::atomic<TOTransaction::TOTransactionState> state_;
const Snapshot* txn_snapshot;
public:
ActiveTxnNode()
: txn_id_(0),
commit_txn_id_(0),
commit_ts_set_(false),
commit_ts_(0),
first_commit_ts_(0),
read_ts_set_(false),
read_ts_(std::numeric_limits<uint64_t>::max()),
state_(TOTransaction::kStarted) {
}
};
TOTransactionImpl(TOTransactionDB* db,
const WriteOptions& options,
const TOTxnOptions& txn_options,
const std::shared_ptr<ActiveTxnNode>& core);
virtual ~TOTransactionImpl();
void Initialize();
virtual Status SetCommitTimeStamp(const RocksTimeStamp& timestamp) override;
virtual Status SetReadTimeStamp(const RocksTimeStamp& timestamp, const uint32_t& round) override;
virtual Status GetReadTimeStamp(RocksTimeStamp* timestamp) const override;
virtual Status Commit() override;
virtual Status Rollback() override;
virtual Status Get(ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override;
virtual Status Get(ReadOptions& options, const Slice& key,
std::string* value) override;
virtual Iterator* GetIterator(ReadOptions& read_options) override;
virtual Iterator* GetIterator(ReadOptions& read_options,
ColumnFamilyHandle* column_family) override;
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
virtual Status Put(const Slice& key, const Slice& value) override;
virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
virtual Status Delete(const Slice& key) override;
virtual Status SetName(const TransactionName& name) override;
virtual TransactionID GetID() const override { return txn_id_; };
virtual WriteBatchWithIndex* GetWriteBatch() override;
// Check write conflict. If there is no write conflict, add the key to uncommitted keys
Status CheckWriteConflict(ColumnFamilyHandle* column_family, const Slice& key);
// Generate a new unique transaction identifier
static TransactionID GenTxnID();
private:
// Used to create unique ids for transactions.
static std::atomic<TransactionID> txn_id_counter_;
// Unique ID for this transaction
TransactionID txn_id_;
// Updated keys in this transaction
std::set<std::string> writtenKeys_;
DB* db_;
DBImpl* db_impl_;
TOTransactionDBImpl* txn_db_impl_;
WriteOptions write_options_;
TOTxnOptions txn_option_;
const Comparator* cmp_;
WriteBatchWithIndex write_batch_;
std::shared_ptr<ActiveTxnNode> core_;
};
} // namespace rocksdb
#endif

File diff suppressed because it is too large Load Diff