WritePrepared: Fix bug in searching in non-cached snapshots (#4639)
Summary: When evicting an entry from the commit_cache, it is verified against the list of old snapshots to see if it overlaps with any. The list of old snapshots is split into two lists: an efficient concurrent cache and a slow vector protected by a lock. The patch fixes a bug where the search would stop in the cache as soon as any overlapping snapshot was found, and would therefore never go on to check the larger snapshots in the slower list. An extra info log entry is also removed: the condition that triggers it, although very rare, is still feasible and should not spam the LOG when it happens. Fixes #4621 Pull Request resolved: https://github.com/facebook/rocksdb/pull/4639 Differential Revision: D12934989 Pulled By: maysamyabandeh fbshipit-source-id: 4e0fe8147ba292b554ae78e94c21c2ef31e03e2d
This commit is contained in:
parent
c190eb57ba
commit
8756a118be
@ -3,6 +3,9 @@
|
||||
### Public API Change
|
||||
* Application using PessimisticTransactionDB is expected to rollback/commit recovered transactions before starting new ones. This assumption is used to skip concurrency control during recovery.
|
||||
|
||||
### Bug Fixes
|
||||
* Fix a bug in WritePrepared txns where if the number of old snapshots goes beyond the snapshot cache size (128 default) the rest will not be checked when evicting a commit entry from the commit cache.
|
||||
|
||||
### New Features
|
||||
* TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
|
||||
|
||||
|
@ -810,6 +810,7 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
|
||||
std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
|
||||
600l, 700l, 800l, 900l};
|
||||
const size_t snapshot_cache_bits = 2;
|
||||
const uint64_t cache_size = 1ul << snapshot_cache_bits;
|
||||
// Safety check to express the intended size in the test. Can be adjusted if
|
||||
// the snapshots lists changed.
|
||||
assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
|
||||
@ -837,6 +838,57 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
|
||||
commit_entry.prep_seq <= snapshots.back();
|
||||
ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
|
||||
}
|
||||
|
||||
// Test that search will include multiple snapshot from snapshot cache
|
||||
{
|
||||
// exclude first and last item in the cache
|
||||
CommitEntry commit_entry = {snapshots.front() + 1,
|
||||
snapshots[cache_size - 1] - 1};
|
||||
wp_db->old_commit_map_empty_ = true; // reset
|
||||
wp_db->old_commit_map_.clear();
|
||||
wp_db->CheckAgainstSnapshots(commit_entry);
|
||||
ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
|
||||
}
|
||||
|
||||
// Test that search will include multiple snapshot from old snapshots
|
||||
{
|
||||
// include two in the middle
|
||||
CommitEntry commit_entry = {snapshots[cache_size] + 1,
|
||||
snapshots[cache_size + 2] + 1};
|
||||
wp_db->old_commit_map_empty_ = true; // reset
|
||||
wp_db->old_commit_map_.clear();
|
||||
wp_db->CheckAgainstSnapshots(commit_entry);
|
||||
ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
|
||||
}
|
||||
|
||||
// Test that search will include both snapshot cache and old snapshots
|
||||
// Case 1: includes all in snapshot cache
|
||||
{
|
||||
CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
|
||||
wp_db->old_commit_map_empty_ = true; // reset
|
||||
wp_db->old_commit_map_.clear();
|
||||
wp_db->CheckAgainstSnapshots(commit_entry);
|
||||
ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
|
||||
}
|
||||
|
||||
// Case 2: includes all snapshot caches except the smallest
|
||||
{
|
||||
CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
|
||||
wp_db->old_commit_map_empty_ = true; // reset
|
||||
wp_db->old_commit_map_.clear();
|
||||
wp_db->CheckAgainstSnapshots(commit_entry);
|
||||
ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
|
||||
}
|
||||
|
||||
// Case 3: includes only the largest of snapshot cache
|
||||
{
|
||||
CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
|
||||
snapshots.back() + 1};
|
||||
wp_db->old_commit_map_empty_ = true; // reset
|
||||
wp_db->old_commit_map_.clear();
|
||||
wp_db->CheckAgainstSnapshots(commit_entry);
|
||||
ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
|
||||
}
|
||||
}
|
||||
|
||||
// This test is too slow for travis
|
||||
|
@ -567,14 +567,16 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal(
|
||||
bool need_gc = false;
|
||||
{
|
||||
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
|
||||
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
|
||||
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
|
||||
snap_seq);
|
||||
ReadLock rl(&old_commit_map_mutex_);
|
||||
auto prep_set_entry = old_commit_map_.find(snap_seq);
|
||||
need_gc = prep_set_entry != old_commit_map_.end();
|
||||
}
|
||||
if (need_gc) {
|
||||
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
|
||||
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
|
||||
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
|
||||
snap_seq);
|
||||
WriteLock wl(&old_commit_map_mutex_);
|
||||
old_commit_map_.erase(snap_seq);
|
||||
old_commit_map_empty_.store(old_commit_map_.empty(),
|
||||
@ -649,13 +651,20 @@ void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
|
||||
// place before gets overwritten the reader that reads bottom-up will
|
||||
// eventually see it.
|
||||
const bool next_is_larger = true;
|
||||
SequenceNumber snapshot_seq = kMaxSequenceNumber;
|
||||
// We will set to true if the border line snapshot suggests that.
|
||||
bool search_larger_list = false;
|
||||
size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
|
||||
for (; 0 < ip1; ip1--) {
|
||||
snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
|
||||
SequenceNumber snapshot_seq =
|
||||
snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
|
||||
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
|
||||
++sync_i);
|
||||
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
|
||||
if (ip1 == SNAPSHOT_CACHE_SIZE) { // border line snapshot
|
||||
// snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
|
||||
// then later also continue the search to larger snapshots
|
||||
search_larger_list = snapshot_seq < evicted.commit_seq;
|
||||
}
|
||||
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
|
||||
snapshot_seq, !next_is_larger)) {
|
||||
break;
|
||||
@ -670,17 +679,20 @@ void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
|
||||
#endif
|
||||
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
|
||||
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
|
||||
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
|
||||
snapshot_seq < evicted.prep_seq)) {
|
||||
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
|
||||
// Then access the less efficient list of snapshots_
|
||||
WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
|
||||
ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
|
||||
ROCKS_LOG_WARN(info_log_,
|
||||
"snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
|
||||
"> with %" ROCKSDB_PRIszt " snapshots",
|
||||
evicted.prep_seq, evicted.commit_seq, cnt);
|
||||
ReadLock rl(&snapshots_mutex_);
|
||||
// Items could have moved from the snapshots_ to snapshot_cache_ before
|
||||
// acquiring the lock. To make sure that we do not miss a valid snapshot,
|
||||
// read snapshot_cache_ again while holding the lock.
|
||||
for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
|
||||
snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
|
||||
SequenceNumber snapshot_seq =
|
||||
snapshot_cache_[i].load(std::memory_order_acquire);
|
||||
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
|
||||
snapshot_seq, next_is_larger)) {
|
||||
break;
|
||||
@ -708,7 +720,10 @@ bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
|
||||
// then snapshot_seq < commit_seq
|
||||
if (prep_seq <= snapshot_seq) { // overlapping range
|
||||
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
|
||||
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
|
||||
ROCKS_LOG_WARN(info_log_,
|
||||
"old_commit_map_mutex_ overhead for %" PRIu64
|
||||
" commit entry: <%" PRIu64 ",%" PRIu64 ">",
|
||||
snapshot_seq, prep_seq, commit_seq);
|
||||
WriteLock wl(&old_commit_map_mutex_);
|
||||
old_commit_map_empty_.store(false, std::memory_order_release);
|
||||
auto& vec = old_commit_map_[snapshot_seq];
|
||||
|
@ -222,7 +222,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
|
||||
// rare case and it is ok to pay the cost of mutex ReadLock for such old,
|
||||
// reading transactions.
|
||||
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
|
||||
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
|
||||
ReadLock rl(&old_commit_map_mutex_);
|
||||
auto prep_set_entry = old_commit_map_.find(snapshot_seq);
|
||||
bool found = prep_set_entry != old_commit_map_.end();
|
||||
|
Loading…
Reference in New Issue
Block a user