WritePrepared: max_evicted_seq_ update during commit cache lookup (#4955)
Summary: max_evicted_seq_ could be updated in the middle of the read in ::IsInSnapshot. The code to be correct in presence of this update would be complicated. The patch simplifies it by checking the value of max_evicted_seq_ before and after looking into commit_cache_ and retries in the unlucky case that it was changed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4955 Differential Revision: D13999556 Pulled By: maysamyabandeh fbshipit-source-id: 7a1bdfa95ea8b5d8d73ddff3263ed31d7297b39c
This commit is contained in:
parent
93f7e7a450
commit
bcdc8c8b19
@ -1178,7 +1178,7 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
|
||||
rocksdb::port::Thread t1([&]() {
|
||||
for (int i = 0; i < writes; i++) {
|
||||
WriteBatch batch;
|
||||
// For duplicate keys cause 4 commit entires, each evicting an entry that
|
||||
// For duplicate keys cause 4 commit entries, each evicting an entry that
|
||||
// is not published yet, thus causing max ecited seq go higher than last
|
||||
// published.
|
||||
for (int b = 0; b < batch_cnt; b++) {
|
||||
@ -2681,79 +2681,258 @@ TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
|
||||
// in the middle to ensure correctness in spite of non-atomic execution.
|
||||
// Note: This test is limitted to the case where snapshot is larger than the
|
||||
// max_evicted_seq_.
|
||||
TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfOldPrepared) {
|
||||
TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
|
||||
const size_t snapshot_cache_bits = 7; // same as default
|
||||
const size_t commit_cache_bits = 3; // 8 entries
|
||||
for (auto split_read : {true, false}) {
|
||||
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
|
||||
ReOpen();
|
||||
// Fill up the commit cache
|
||||
std::string init_value("value1");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
|
||||
}
|
||||
// Prepare a transaction but do not commit it
|
||||
Transaction* txn =
|
||||
db->BeginTransaction(WriteOptions(), TransactionOptions());
|
||||
ASSERT_OK(txn->SetName("xid"));
|
||||
ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
|
||||
ASSERT_OK(txn->Prepare());
|
||||
// Commit a bunch of entires to advance max evicted seq and make the
|
||||
// prepared a delayed prepared
|
||||
for (int i = 0; i < 10; i++) {
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
}
|
||||
// The snapshot should not see the delayed prepared entry
|
||||
auto snap = db->GetSnapshot();
|
||||
|
||||
std::vector<bool> split_options = {false};
|
||||
if (split_read) {
|
||||
// split right after reading from the commit cache
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
|
||||
"AtomicCommitOfOldPrepared:Commit:before"},
|
||||
{"AtomicCommitOfOldPrepared:Commit:after",
|
||||
"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
|
||||
} else { // split commit
|
||||
// split right before removing from delayed_preparped_
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"WritePreparedTxnDB::RemovePrepared:pause",
|
||||
"AtomicCommitOfOldPrepared:Read:before"},
|
||||
{"AtomicCommitOfOldPrepared:Read:after",
|
||||
"WritePreparedTxnDB::RemovePrepared:resume"}});
|
||||
// Also test for break before mutex
|
||||
split_options.push_back(true);
|
||||
}
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
for (auto split_before_mutex : split_options) {
|
||||
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
|
||||
ReOpen();
|
||||
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
|
||||
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
|
||||
// Fill up the commit cache
|
||||
std::string init_value("value1");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
|
||||
}
|
||||
// Prepare a transaction but do not commit it
|
||||
Transaction* txn =
|
||||
db->BeginTransaction(WriteOptions(), TransactionOptions());
|
||||
ASSERT_OK(txn->SetName("xid"));
|
||||
ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
|
||||
ASSERT_OK(txn->Prepare());
|
||||
// Commit a bunch of entries to advance max evicted seq and make the
|
||||
// prepared a delayed prepared
|
||||
for (int i = 0; i < 10; i++) {
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
}
|
||||
// The snapshot should not see the delayed prepared entry
|
||||
auto snap = db->GetSnapshot();
|
||||
|
||||
rocksdb::port::Thread commit_thread([&]() {
|
||||
TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Commit:before");
|
||||
ASSERT_OK(txn->Commit());
|
||||
TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Commit:after");
|
||||
delete txn;
|
||||
});
|
||||
if (split_read) {
|
||||
if (split_before_mutex) {
|
||||
// split before acquiring prepare_mutex_
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
|
||||
"AtomicCommitOfDelayedPrepared:Commit:before"},
|
||||
{"AtomicCommitOfDelayedPrepared:Commit:after",
|
||||
"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
|
||||
} else {
|
||||
// split right after reading from the commit cache
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
|
||||
"AtomicCommitOfDelayedPrepared:Commit:before"},
|
||||
{"AtomicCommitOfDelayedPrepared:Commit:after",
|
||||
"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
|
||||
}
|
||||
} else { // split commit
|
||||
// split right before removing from delayed_prepared_
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"WritePreparedTxnDB::RemovePrepared:pause",
|
||||
"AtomicCommitOfDelayedPrepared:Read:before"},
|
||||
{"AtomicCommitOfDelayedPrepared:Read:after",
|
||||
"WritePreparedTxnDB::RemovePrepared:resume"}});
|
||||
}
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
rocksdb::port::Thread read_thread([&]() {
|
||||
TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Read:before");
|
||||
ReadOptions roptions;
|
||||
roptions.snapshot = snap;
|
||||
PinnableSlice value;
|
||||
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
|
||||
ASSERT_OK(s);
|
||||
// It should not see the commit of delayed prpared
|
||||
ASSERT_TRUE(value == init_value);
|
||||
TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Read:after");
|
||||
db->ReleaseSnapshot(snap);
|
||||
});
|
||||
rocksdb::port::Thread commit_thread([&]() {
|
||||
TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
|
||||
ASSERT_OK(txn->Commit());
|
||||
if (split_before_mutex) {
|
||||
// Do bunch of inserts to evict the commit entry from the cache. This
|
||||
// would prevent the 2nd look into commit cache under prepare_mutex_
|
||||
// to see the commit entry.
|
||||
auto seq = db_impl->TEST_GetLastVisibleSequence();
|
||||
size_t tries = 0;
|
||||
while (wp_db->max_evicted_seq_ < seq && tries < 50) {
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
tries++;
|
||||
};
|
||||
ASSERT_LT(tries, 50);
|
||||
}
|
||||
TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
|
||||
delete txn;
|
||||
});
|
||||
|
||||
read_thread.join();
|
||||
commit_thread.join();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
rocksdb::port::Thread read_thread([&]() {
|
||||
TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
|
||||
ReadOptions roptions;
|
||||
roptions.snapshot = snap;
|
||||
PinnableSlice value;
|
||||
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
|
||||
ASSERT_OK(s);
|
||||
// It should not see the commit of delayed prepared
|
||||
ASSERT_TRUE(value == init_value);
|
||||
TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
|
||||
db->ReleaseSnapshot(snap);
|
||||
});
|
||||
|
||||
read_thread.join();
|
||||
commit_thread.join();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
} // for split_before_mutex
|
||||
} // for split_read
|
||||
}
|
||||
|
||||
// When max evicted seq advances a prepared seq, it involves two updates: i)
|
||||
// adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
|
||||
// ::IsInSnapshot also reads these two values in a non-atomic way. This test
|
||||
// ensures correctness if the update occurs after ::IsInSnapshot reads
|
||||
// delayed_prepared_empty_ and before it reads max_evicted_seq_.
|
||||
// Note: this test focuses on read snapshot larger than max_evicted_seq_.
|
||||
TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
|
||||
const size_t snapshot_cache_bits = 7; // same as default
|
||||
const size_t commit_cache_bits = 3; // 8 entries
|
||||
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
|
||||
ReOpen();
|
||||
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
|
||||
// Fill up the commit cache
|
||||
std::string init_value("value1");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
|
||||
}
|
||||
// Prepare a transaction but do not commit it
|
||||
Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
|
||||
ASSERT_OK(txn->SetName("xid"));
|
||||
ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
|
||||
ASSERT_OK(txn->Prepare());
|
||||
// Create a gap between prepare seq and snapshot seq
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
// The snapshot should not see the delayed prepared entry
|
||||
auto snap = db->GetSnapshot();
|
||||
ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
|
||||
|
||||
// split right after reading delayed_prepared_empty_
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
|
||||
"AtomicUpdateOfDelayedPrepared:before"},
|
||||
{"AtomicUpdateOfDelayedPrepared:after",
|
||||
"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
rocksdb::port::Thread commit_thread([&]() {
|
||||
TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
|
||||
// Commit a bunch of entries to advance max evicted seq and make the
|
||||
// prepared a delayed prepared
|
||||
size_t tries = 0;
|
||||
while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
tries++;
|
||||
};
|
||||
ASSERT_LT(tries, 50);
|
||||
// This is the case on which the test focuses
|
||||
ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
|
||||
TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
|
||||
});
|
||||
|
||||
rocksdb::port::Thread read_thread([&]() {
|
||||
ReadOptions roptions;
|
||||
roptions.snapshot = snap;
|
||||
PinnableSlice value;
|
||||
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
|
||||
ASSERT_OK(s);
|
||||
// It should not see the uncommitted value of delayed prepared
|
||||
ASSERT_TRUE(value == init_value);
|
||||
db->ReleaseSnapshot(snap);
|
||||
});
|
||||
|
||||
read_thread.join();
|
||||
commit_thread.join();
|
||||
ASSERT_OK(txn->Commit());
|
||||
delete txn;
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
// Eviction from commit cache and update of max evicted seq are two non-atomic
|
||||
// steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
|
||||
// from commit cache are two non-atomic steps. This tests if the update occurs
|
||||
// after reading max_evicted_seq_ and before reading the commit cache.
|
||||
// Note: the test focuses on snapshot larger than max_evicted_seq_
|
||||
TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
|
||||
const size_t snapshot_cache_bits = 7; // same as default
|
||||
const size_t commit_cache_bits = 3; // 8 entries
|
||||
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
|
||||
ReOpen();
|
||||
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
|
||||
// Fill up the commit cache
|
||||
std::string init_value("value1");
|
||||
std::string last_value("value_final");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
|
||||
}
|
||||
// Do an uncommitted write to prevent min_uncommitted optimization
|
||||
Transaction* txn1 =
|
||||
db->BeginTransaction(WriteOptions(), TransactionOptions());
|
||||
ASSERT_OK(txn1->SetName("xid1"));
|
||||
ASSERT_OK(txn1->Put(Slice("key0"), last_value));
|
||||
ASSERT_OK(txn1->Prepare());
|
||||
// Do a write with prepare to get the prepare seq
|
||||
Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
|
||||
ASSERT_OK(txn->SetName("xid"));
|
||||
ASSERT_OK(txn->Put(Slice("key1"), last_value));
|
||||
ASSERT_OK(txn->Prepare());
|
||||
ASSERT_OK(txn->Commit());
|
||||
// Create a gap between commit entry and snapshot seq
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
// The snapshot should see the last commit
|
||||
auto snap = db->GetSnapshot();
|
||||
ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
|
||||
|
||||
// split right after reading max_evicted_seq_
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
|
||||
"NonAtomicUpdateOfMaxEvictedSeq:before"},
|
||||
{"NonAtomicUpdateOfMaxEvictedSeq:after",
|
||||
"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
rocksdb::port::Thread commit_thread([&]() {
|
||||
TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
|
||||
// Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
|
||||
size_t tries = 0;
|
||||
while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
|
||||
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
|
||||
tries++;
|
||||
};
|
||||
ASSERT_LT(tries, 50);
|
||||
// This is the case on which the test focuses
|
||||
ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
|
||||
TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
|
||||
});
|
||||
|
||||
rocksdb::port::Thread read_thread([&]() {
|
||||
ReadOptions roptions;
|
||||
roptions.snapshot = snap;
|
||||
PinnableSlice value;
|
||||
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
|
||||
ASSERT_OK(s);
|
||||
// It should see the committed value of the evicted entry
|
||||
ASSERT_TRUE(value == last_value);
|
||||
db->ReleaseSnapshot(snap);
|
||||
});
|
||||
|
||||
read_thread.join();
|
||||
commit_thread.join();
|
||||
delete txn;
|
||||
txn1->Commit();
|
||||
delete txn1;
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
// When an old prepared entry gets committed, there is a gap between the time
|
||||
// that it is published and when it is cleaned up from old_prepared_. This test
|
||||
// stresses such cases.
|
||||
TEST_P(WritePreparedTransactionTest, CommitOfOldPrepared) {
|
||||
TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
|
||||
const size_t snapshot_cache_bits = 7; // same as default
|
||||
for (const size_t commit_cache_bits : {0, 2, 3}) {
|
||||
for (const size_t sub_batch_cnt : {1, 2, 3}) {
|
||||
|
@ -157,80 +157,111 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
|
||||
// cache, ii) if there was, we complete the search steps to be these: i)
|
||||
// commit cache, ii) delayed prepared, commit cache again. In this way if
|
||||
// the first query to commit cache missed the commit, the 2nd will catch it.
|
||||
bool was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
|
||||
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
|
||||
bool was_empty;
|
||||
SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
|
||||
CommitEntry64b dont_care;
|
||||
CommitEntry cached;
|
||||
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
|
||||
TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
|
||||
TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
|
||||
if (exist && prep_seq == cached.prep_seq) {
|
||||
// It is committed and also not evicted from commit cache
|
||||
ROCKS_LOG_DETAILS(
|
||||
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
|
||||
prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
|
||||
return cached.commit_seq <= snapshot_seq;
|
||||
}
|
||||
// else it could be committed but not inserted in the map which could happen
|
||||
// after recovery, or it could be committed and evicted by another commit,
|
||||
// or never committed.
|
||||
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
|
||||
size_t repeats = 0;
|
||||
do {
|
||||
repeats++;
|
||||
assert(repeats < 100);
|
||||
if (UNLIKELY(repeats >= 100)) {
|
||||
throw std::runtime_error(
|
||||
"The read was intrupted 100 times by update to max_evicted_seq_. "
|
||||
"This is unexpected in all setups");
|
||||
}
|
||||
max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
|
||||
TEST_SYNC_POINT(
|
||||
"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
|
||||
TEST_SYNC_POINT(
|
||||
"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
|
||||
was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
|
||||
TEST_SYNC_POINT(
|
||||
"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
|
||||
TEST_SYNC_POINT(
|
||||
"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
|
||||
CommitEntry cached;
|
||||
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
|
||||
TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
|
||||
TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
|
||||
if (exist && prep_seq == cached.prep_seq) {
|
||||
// It is committed and also not evicted from commit cache
|
||||
ROCKS_LOG_DETAILS(
|
||||
info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
|
||||
prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
|
||||
return cached.commit_seq <= snapshot_seq;
|
||||
}
|
||||
// else it could be committed but not inserted in the map which could
|
||||
// happen after recovery, or it could be committed and evicted by another
|
||||
// commit, or never committed.
|
||||
|
||||
// At this point we dont know if it was committed or it is still prepared
|
||||
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
|
||||
// max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now
|
||||
if (max_evicted_seq < prep_seq) {
|
||||
// Not evicted from cache and also not present, so must be still prepared
|
||||
ROCKS_LOG_DETAILS(
|
||||
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
|
||||
prep_seq, snapshot_seq, 0);
|
||||
return false;
|
||||
}
|
||||
if (!was_empty) {
|
||||
// We should not normally reach here
|
||||
WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
|
||||
ReadLock rl(&prepared_mutex_);
|
||||
ROCKS_LOG_WARN(info_log_,
|
||||
"prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
|
||||
static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
|
||||
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
|
||||
// This is the order: 1) delayed_prepared_commits_ update, 2) publish 3)
|
||||
// delayed_prepared_ clean up. So check if it is the case of a late
|
||||
// clenaup.
|
||||
auto it = delayed_prepared_commits_.find(prep_seq);
|
||||
if (it == delayed_prepared_commits_.end()) {
|
||||
// Then it is not committed yet
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
" returns %" PRId32,
|
||||
prep_seq, snapshot_seq, 0);
|
||||
return false;
|
||||
// At this point we dont know if it was committed or it is still prepared
|
||||
max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
|
||||
if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
|
||||
continue;
|
||||
}
|
||||
// Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
|
||||
if (max_evicted_seq_ub < prep_seq) {
|
||||
// Not evicted from cache and also not present, so must be still
|
||||
// prepared
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
" returns %" PRId32,
|
||||
prep_seq, snapshot_seq, 0);
|
||||
return false;
|
||||
}
|
||||
TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
|
||||
TEST_SYNC_POINT(
|
||||
"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
|
||||
if (!was_empty) {
|
||||
// We should not normally reach here
|
||||
WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
|
||||
ReadLock rl(&prepared_mutex_);
|
||||
ROCKS_LOG_WARN(
|
||||
info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
|
||||
static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
|
||||
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
|
||||
// This is the order: 1) delayed_prepared_commits_ update, 2) publish
|
||||
// 3) delayed_prepared_ clean up. So check if it is the case of a late
|
||||
// clenaup.
|
||||
auto it = delayed_prepared_commits_.find(prep_seq);
|
||||
if (it == delayed_prepared_commits_.end()) {
|
||||
// Then it is not committed yet
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
" returns %" PRId32,
|
||||
prep_seq, snapshot_seq, 0);
|
||||
return false;
|
||||
} else {
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
" commit: %" PRIu64 " returns %" PRId32,
|
||||
prep_seq, snapshot_seq, it->second,
|
||||
snapshot_seq <= it->second);
|
||||
return it->second <= snapshot_seq;
|
||||
}
|
||||
} else {
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
" commit: %" PRIu64 " returns %" PRId32,
|
||||
prep_seq, snapshot_seq, it->second,
|
||||
snapshot_seq <= it->second);
|
||||
return it->second <= snapshot_seq;
|
||||
}
|
||||
} else {
|
||||
// 2nd query to commit cache. Refer to was_empty comment above.
|
||||
exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
|
||||
if (exist && prep_seq == cached.prep_seq) {
|
||||
ROCKS_LOG_DETAILS(
|
||||
info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
|
||||
prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
|
||||
return cached.commit_seq <= snapshot_seq;
|
||||
// 2nd query to commit cache. Refer to was_empty comment above.
|
||||
exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
|
||||
if (exist && prep_seq == cached.prep_seq) {
|
||||
ROCKS_LOG_DETAILS(
|
||||
info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
|
||||
prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
|
||||
return cached.commit_seq <= snapshot_seq;
|
||||
}
|
||||
max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
|
||||
// When advancing max_evicted_seq_, we move older entires from prepared to
|
||||
// delayed_prepared_. Also we move evicted entries from commit cache to
|
||||
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
|
||||
// max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
|
||||
// old_commit_map_, iii) committed with no conflict with any snapshot. Case
|
||||
// (i) delayed_prepared_ is checked above
|
||||
if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case
|
||||
if (max_evicted_seq_ub < snapshot_seq) { // then (ii) cannot be the case
|
||||
// only (iii) is the case: committed
|
||||
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
|
||||
// snapshot_seq
|
||||
@ -438,6 +469,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
|
||||
friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
|
||||
friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
|
||||
friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
|
||||
friend class
|
||||
WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
|
||||
friend class
|
||||
WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
|
||||
friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
|
||||
friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
|
||||
friend class WritePreparedTransactionTest_RollbackTest_Test;
|
||||
friend class WriteUnpreparedTxnDB;
|
||||
|
Loading…
Reference in New Issue
Block a user