WritePrepared: Fix deadlock in WriteRecoverableState (#5306)
Summary: The recent improvement in https://github.com/facebook/rocksdb/pull/3661 could cause a deadlock: When writing recoverable state, we also commit its sequence number to commit table, which could result into evicting existing commit entry, which could result into advancing max_evicted_seq_, which would need to get snapshots from database, which requires obtaining db mutex. The patch releases db_mutex before calling the callback in WriteRecoverableState to avoid the potential deadlock. It also improves the stress tests to let the issue be manifested in the tests. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5306 Differential Revision: D15341458 Pulled By: maysamyabandeh fbshipit-source-id: 05dcbed7e21b789fd1e5fd5ee8eea08077162323 (cherry picked from commit f0e821619742a8e97521d035c7e527c21743530a)
This commit is contained in:
parent
990b2f4cb3
commit
2c3eaeb4db
@ -477,20 +477,6 @@ void CompactionIterator::NextFromInput() {
|
||||
// in this snapshot.
|
||||
assert(last_sequence >= current_user_key_sequence_);
|
||||
|
||||
// Note2: if last_snapshot < current_user_key_snapshot, it can only
|
||||
// mean last_snapshot is released between we process last value and
|
||||
// this value, and findEarliestVisibleSnapshot returns the next snapshot
|
||||
// as current_user_key_snapshot. In this case last value and current
|
||||
// value are both in current_user_key_snapshot currently.
|
||||
// Although last_snapshot is released we might still get a definitive
|
||||
// response when key sequence number changes, e.g., when seq is determined
|
||||
// too old and visible in all snapshots.
|
||||
assert(last_snapshot == current_user_key_snapshot_ ||
|
||||
(snapshot_checker_ != nullptr &&
|
||||
snapshot_checker_->CheckInSnapshot(current_user_key_sequence_,
|
||||
last_snapshot) !=
|
||||
SnapshotCheckerResult::kNotInSnapshot));
|
||||
|
||||
++iter_stats_.num_record_drop_hidden; // (A)
|
||||
input_->Next();
|
||||
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
|
||||
|
@ -178,9 +178,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
||||
WriteThread::WriteGroup write_group;
|
||||
bool in_parallel_group = false;
|
||||
uint64_t last_sequence = kMaxSequenceNumber;
|
||||
if (!two_write_queues_) {
|
||||
last_sequence = versions_->LastSequence();
|
||||
}
|
||||
|
||||
mutex_.Lock();
|
||||
|
||||
@ -195,6 +192,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
||||
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
||||
|
||||
status = PreprocessWrite(write_options, &need_log_sync, &write_context);
|
||||
if (!two_write_queues_) {
|
||||
// Assign it after ::PreprocessWrite since the sequence might advance
|
||||
// inside it by WriteRecoverableState
|
||||
last_sequence = versions_->LastSequence();
|
||||
}
|
||||
|
||||
PERF_TIMER_START(write_pre_and_post_process_time);
|
||||
}
|
||||
@ -994,8 +996,12 @@ Status DBImpl::WriteRecoverableState() {
|
||||
for (uint64_t sub_batch_seq = seq + 1;
|
||||
sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
|
||||
uint64_t const no_log_num = 0;
|
||||
// Unlock it since the callback might end up locking mutex. e.g.,
|
||||
// AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
|
||||
mutex_.Unlock();
|
||||
status = recoverable_state_pre_release_callback_->Callback(
|
||||
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
|
||||
mutex_.Lock();
|
||||
}
|
||||
}
|
||||
if (status.ok()) {
|
||||
|
@ -205,6 +205,12 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
|
||||
ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
|
||||
"Prepare of %" PRIu64 " %s (%s)", txn->GetId(),
|
||||
s.ToString().c_str(), txn->GetName().c_str());
|
||||
if (rand_->OneIn(20)) {
|
||||
// This currently only tests the mechanics of writing commit time
|
||||
// write batch so the exact values would not matter.
|
||||
s = txn_->GetCommitTimeWriteBatch()->Put("cat", "dog");
|
||||
assert(s.ok());
|
||||
}
|
||||
db->GetDBOptions().env->SleepForMicroseconds(
|
||||
static_cast<int>(cmt_delay_ms_ * 1000));
|
||||
}
|
||||
|
@ -5003,6 +5003,9 @@ Status TransactionStressTestInserter(
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
TransactionOptions txn_options;
|
||||
if (rand->OneIn(2)) {
|
||||
txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
|
||||
}
|
||||
// Inside the inserter we might also retake the snapshot. We do both since two
|
||||
// separte functions are engaged for each.
|
||||
txn_options.set_snapshot = rand->OneIn(2);
|
||||
|
@ -466,6 +466,7 @@ class MySQLStyleTransactionTest
|
||||
// structures.
|
||||
txn_db_options.wp_snapshot_cache_bits = 1;
|
||||
txn_db_options.wp_commit_cache_bits = 10;
|
||||
options.write_buffer_size = 1024;
|
||||
EXPECT_OK(ReOpen());
|
||||
}
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user