WritePrepared: add private options to TransactionDBOptions (#4966)

Summary:
WritePreparedTransactionDB operates with more options which should not be configurable to avoid complicating it for the users. For testing purposes however we need to change the default value of this parameters. This patch makes these parameters private fields in TransactionDBOptions so that the existing ::Open API could use them seamlessly without however exposing them to the users.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4966

Differential Revision: D14015986

Pulled By: maysamyabandeh

fbshipit-source-id: 13037efa7dfdd6f73ec7a19414b66571e044c633
This commit is contained in:
Maysam Yabandeh 2019-02-11 14:18:51 -08:00 committed by Facebook Github Bot
parent 2d049ab7e8
commit 9144d1f186
3 changed files with 53 additions and 70 deletions

View File

@ -93,6 +93,15 @@ struct TransactionDBOptions {
// logic in myrocks. This hack of simply not rolling back merge operands works // logic in myrocks. This hack of simply not rolling back merge operands works
// for the special way that myrocks uses this operands. // for the special way that myrocks uses this operands.
bool rollback_merge_operands = false; bool rollback_merge_operands = false;
private:
// 128 entries
size_t wp_snapshot_cache_bits = static_cast<size_t>(7);
// 8m entry, 64MB size
size_t wp_commit_cache_bits = static_cast<size_t>(23);
friend class WritePreparedTxnDB;
friend class WritePreparedTransactionTestBase;
}; };
struct TransactionOptions { struct TransactionOptions {

View File

@ -324,13 +324,6 @@ class WritePreparedTxnDBMock : public WritePreparedTxnDB {
public: public:
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt) WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
: WritePreparedTxnDB(db_impl, opt) {} : WritePreparedTxnDB(db_impl, opt) {}
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
size_t snapshot_cache_size)
: WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {}
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
size_t snapshot_cache_size, size_t commit_cache_size)
: WritePreparedTxnDB(db_impl, opt, snapshot_cache_size,
commit_cache_size) {}
void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) { void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
snapshots_ = snapshots; snapshots_ = snapshots;
} }
@ -353,39 +346,14 @@ class WritePreparedTransactionTestBase : public TransactionTestBase {
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}; : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};
protected: protected:
// TODO(myabandeh): Avoid duplicating PessimisticTransaction::Open logic here. void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
void DestroyAndReopenWithExtraOptions(size_t snapshot_cache_bits, size_t commit_cache_bits) {
size_t commit_cache_bits) { txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
delete db; txn_db_options.wp_commit_cache_bits = commit_cache_bits;
db = nullptr; }
DestroyDB(dbname, options); void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
options.create_if_missing = true;
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
std::vector<ColumnFamilyHandle*> handles;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<size_t> compaction_enabled_cf_indices;
TransactionDB::PrepareWrap(&options, &column_families,
&compaction_enabled_cf_indices);
DB* base_db = nullptr;
ASSERT_OK(DBImpl::Open(options, dbname, column_families, &handles, &base_db,
true /*use_seq_per_batch*/,
false /*use_batch_for_txn*/));
// The following is equivalent of WrapDB().
txn_db_options.write_policy = WRITE_PREPARED;
auto* wp_db = new WritePreparedTxnDB(
base_db, txn_db_options, snapshot_cache_bits, commit_cache_bits);
wp_db->UpdateCFComparatorMap(handles);
ASSERT_OK(wp_db->Initialize(compaction_enabled_cf_indices, handles));
ASSERT_EQ(1, handles.size());
delete handles[0];
db = wp_db;
} }
// If expect_update is set, check if it actually updated old_commit_map_. If // If expect_update is set, check if it actually updated old_commit_map_. If
// it did not and yet suggested not to check the next snapshot, do the // it did not and yet suggested not to check the next snapshot, do the
// opposite to check if it was not a bad suggestion. // opposite to check if it was not a bad suggestion.
@ -843,8 +811,9 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
const size_t snapshot_cache_bits = 0; const size_t snapshot_cache_bits = 0;
const size_t commit_cache_bits = 0; const size_t commit_cache_bits = 0;
DBImpl* mock_db = new DBImpl(options, dbname); DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock( UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits)); std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options));
SequenceNumber seq = 0; SequenceNumber seq = 0;
// Take the first snapshot that overlaps with two txn // Take the first snapshot that overlaps with two txn
@ -928,8 +897,9 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
// the snapshots lists changed. // the snapshots lists changed.
assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size()); assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
DBImpl* mock_db = new DBImpl(options, dbname); DBImpl* mock_db = new DBImpl(options, dbname);
UpdateTransactionDBOptions(snapshot_cache_bits);
std::unique_ptr<WritePreparedTxnDBMock> wp_db( std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits)); new WritePreparedTxnDBMock(mock_db, txn_db_options));
SequenceNumber version = 1000l; SequenceNumber version = 1000l;
ASSERT_EQ(0, wp_db->snapshots_total_); ASSERT_EQ(0, wp_db->snapshots_total_);
wp_db->UpdateSnapshots(snapshots, version); wp_db->UpdateSnapshots(snapshots, version);
@ -1023,8 +993,9 @@ TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccessTest) {
// Choose the cache size so that the new snapshot list could replace all the // Choose the cache size so that the new snapshot list could replace all the
// existing items in the cache and also have some overflow. // existing items in the cache and also have some overflow.
DBImpl* mock_db = new DBImpl(options, dbname); DBImpl* mock_db = new DBImpl(options, dbname);
UpdateTransactionDBOptions(snapshot_cache_bits);
std::unique_ptr<WritePreparedTxnDBMock> wp_db( std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits)); new WritePreparedTxnDBMock(mock_db, txn_db_options));
const size_t extra = 2; const size_t extra = 2;
size_t loop_id = 0; size_t loop_id = 0;
// Add up to extra items that do not fit into the cache // Add up to extra items that do not fit into the cache
@ -1197,7 +1168,8 @@ TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) { TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
WriteOptions woptions; WriteOptions woptions;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
@ -1244,7 +1216,8 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) { TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
WriteOptions woptions; WriteOptions woptions;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// Insert something to increase seq // Insert something to increase seq
@ -1807,8 +1780,9 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
// The set of commit seq numbers to be excluded from IsInSnapshot queries // The set of commit seq numbers to be excluded from IsInSnapshot queries
std::set<uint64_t> commit_seqs; std::set<uint64_t> commit_seqs;
DBImpl* mock_db = new DBImpl(options, dbname); DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock( UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits)); std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options));
// We continue until max advances a bit beyond the snapshot. // We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction // do prepare for a transaction
@ -2232,7 +2206,8 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // disable commit cache const size_t commit_cache_bits = 0; // disable commit cache
for (bool has_recent_prepare : {true, false}) { for (bool has_recent_prepare : {true, false}) {
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
auto* transaction = auto* transaction =
@ -2280,7 +2255,8 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) { TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache const size_t commit_cache_bits = 0; // minimum commit cache
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
auto* transaction = auto* transaction =
@ -2328,7 +2304,8 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) { TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache const size_t commit_cache_bits = 0; // minimum commit cache
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
ASSERT_OK(db->Put(WriteOptions(), "key1", "value2")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
@ -2377,7 +2354,8 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) { TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 1; // commit cache size = 2 const size_t commit_cache_bits = 1; // commit cache size = 2
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
// Add a dummy key to evict v2 commit cache, but keep v1 commit cache. // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
// It also advance max_evicted_seq and can trigger old_commit_map cleanup. // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
@ -2427,7 +2405,8 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) { TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache const size_t commit_cache_bits = 0; // minimum commit cache
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
auto* transaction = auto* transaction =
@ -2706,7 +2685,8 @@ TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfOldPrepared) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 3; // 8 entries const size_t commit_cache_bits = 3; // 8 entries
for (auto split_read : {true, false}) { for (auto split_read : {true, false}) {
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
// Fill up the commit cache // Fill up the commit cache
std::string init_value("value1"); std::string init_value("value1");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -2777,7 +2757,8 @@ TEST_P(WritePreparedTransactionTest, CommitOfOldPrepared) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
for (const size_t commit_cache_bits : {0, 2, 3}) { for (const size_t commit_cache_bits : {0, 2, 3}) {
for (const size_t sub_batch_cnt : {1, 2, 3}) { for (const size_t sub_batch_cnt : {1, 2, 3}) {
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
std::atomic<const Snapshot*> snap = {nullptr}; std::atomic<const Snapshot*> snap = {nullptr};
std::atomic<SequenceNumber> exp_prepare = {0}; std::atomic<SequenceNumber> exp_prepare = {0};
// Value is synchronized via snap // Value is synchronized via snap

View File

@ -43,27 +43,23 @@ namespace rocksdb {
// mechanisms to tell such data apart from committed data. // mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB { class WritePreparedTxnDB : public PessimisticTransactionDB {
public: public:
explicit WritePreparedTxnDB( explicit WritePreparedTxnDB(DB* db,
DB* db, const TransactionDBOptions& txn_db_options, const TransactionDBOptions& txn_db_options)
size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
: PessimisticTransactionDB(db, txn_db_options), : PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_BITS(snapshot_cache_bits), SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)), SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
COMMIT_CACHE_BITS(commit_cache_bits), COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)), COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) { FORMAT(COMMIT_CACHE_BITS) {
Init(txn_db_options); Init(txn_db_options);
} }
explicit WritePreparedTxnDB( explicit WritePreparedTxnDB(StackableDB* db,
StackableDB* db, const TransactionDBOptions& txn_db_options, const TransactionDBOptions& txn_db_options)
size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
: PessimisticTransactionDB(db, txn_db_options), : PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_BITS(snapshot_cache_bits), SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)), SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
COMMIT_CACHE_BITS(commit_cache_bits), COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)), COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) { FORMAT(COMMIT_CACHE_BITS) {
Init(txn_db_options); Init(txn_db_options);
@ -619,8 +615,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// The list sorted in ascending order. Thread-safety for writes is provided // The list sorted in ascending order. Thread-safety for writes is provided
// with snapshots_mutex_ and concurrent reads are safe due to std::atomic for // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
// each entry. In x86_64 architecture such reads are compiled to simple read // each entry. In x86_64 architecture such reads are compiled to simple read
// instructions. 128 entries // instructions.
static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
const size_t SNAPSHOT_CACHE_BITS; const size_t SNAPSHOT_CACHE_BITS;
const size_t SNAPSHOT_CACHE_SIZE; const size_t SNAPSHOT_CACHE_SIZE;
std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_; std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
@ -638,8 +633,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// A heap of prepared transactions. Thread-safety is provided with // A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_. // prepared_mutex_.
PreparedHeap prepared_txns_; PreparedHeap prepared_txns_;
// 8m entry, 64MB size
static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(23);
const size_t COMMIT_CACHE_BITS; const size_t COMMIT_CACHE_BITS;
const size_t COMMIT_CACHE_SIZE; const size_t COMMIT_CACHE_SIZE;
const CommitEntry64bFormat FORMAT; const CommitEntry64bFormat FORMAT;