rocksdb/utilities/transactions/optimistic_transaction_db_impl.h
wolfkdy 1ab1231acf parallel occ (#6240)
Summary:
This is a continuation of https://github.com/facebook/rocksdb/pull/5320/files
I open a new mr for these purposes, half a year has past since the old mr is posted so it's almost impossible to fulfill some points below on the old mr, especially 5)
1) add validation modes for optimistic txns
2) modify unittests to test both modes
3) make format
4) refine hash functor
5) push to master
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6240

Differential Revision: D19301296

fbshipit-source-id: 5b5b3cbd39558f43947f7d2dec6cd31a06386edb
2020-01-07 14:20:38 -08:00

72 lines
2.3 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <mutex>
#include <vector>
#include <algorithm>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
namespace rocksdb {
class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
public:
explicit OptimisticTransactionDBImpl(
DB* db, const OptimisticTransactionDBOptions& occ_options,
bool take_ownership = true)
: OptimisticTransactionDB(db),
db_owner_(take_ownership),
validate_policy_(occ_options.validate_policy) {
if (validate_policy_ == OccValidationPolicy::kValidateParallel) {
uint32_t bucket_size = std::max(16u, occ_options.occ_lock_buckets);
bucketed_locks_.reserve(bucket_size);
for (size_t i = 0; i < bucket_size; ++i) {
bucketed_locks_.emplace_back(
std::unique_ptr<std::mutex>(new std::mutex));
}
}
}
~OptimisticTransactionDBImpl() {
// Prevent this stackable from destroying
// base db
if (!db_owner_) {
db_ = nullptr;
}
}
Transaction* BeginTransaction(const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options,
Transaction* old_txn) override;
size_t GetLockBucketsSize() const { return bucketed_locks_.size(); }
OccValidationPolicy GetValidatePolicy() const { return validate_policy_; }
std::unique_lock<std::mutex> LockBucket(size_t idx);
private:
// NOTE: used in validation phase. Each key is hashed into some
// bucket. We then take the lock in the hash value order to avoid deadlock.
std::vector<std::unique_ptr<std::mutex>> bucketed_locks_;
bool db_owner_;
const OccValidationPolicy validate_policy_;
void ReinitializeTransaction(Transaction* txn,
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options =
OptimisticTransactionOptions());
};
} // namespace rocksdb
#endif // ROCKSDB_LITE