Fix locktree accesses to PessimisticTransactions (#9898)
Summary: The current locktree implementation stores the address of the PessimisticTransactions object as the TXNID. However, when a transaction is blocked on a lock, it records the list of waitees with conflicting locks using the rocksdb assigned TransactionID. This is performed by calling GetID() on PessimisticTransactions objects of the waitees, and then recorded in the waiter's list. However, there is no guarantee the objects are valid when recording the waitee list during the conflict callbacks because the waitee could have released the lock and freed the PessimisticTransactions object. The waitee/txnid values are only valid PessimisticTransaction objects while the mutex for the root of the locktree is held. The simplest fix for this problem is to use the address of the PessimisticTransaction as the TransactionID so that it is consistent with its usage in the locktree. The TXNID is only converted back to a PessimisticTransaction for the report_wait callbacks. Since these callbacks are now all made within the critical section where the lock_request queue mutx is held, these conversions will be safe. Otherwise, only the uint64_t TXNID of the waitee is registerd with the waiter transaction. The PessimisitcTransaction object of the waitee is never referenced. The main downside of this approach is the TransactionID will not change if the PessimisticTransaction object is reused for new transactions. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9898 Test Plan: Add a new test case and run unit tests. Also verified with MyRocks workloads using range locks that the crash no longer happens. Reviewed By: riversand963 Differential Revision: D35950376 Pulled By: hermanlee fbshipit-source-id: 8c9cae272e23e487fc139b6a8ed5b8f8f24b1570
This commit is contained in:
parent
68ee228dec
commit
d9d456de49
@ -369,6 +369,46 @@ TEST_F(RangeLockingTest, LockWaitCount) {
|
||||
delete txn1;
|
||||
}
|
||||
|
||||
TEST_F(RangeLockingTest, LockWaiteeAccess) {
|
||||
TransactionOptions txn_options;
|
||||
auto cf = db->DefaultColumnFamily();
|
||||
txn_options.lock_timeout = 60;
|
||||
Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
|
||||
Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
|
||||
|
||||
// Get a range lock
|
||||
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
|
||||
|
||||
std::atomic<bool> reached(false);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"RangeTreeLockManager::TryRangeLock:EnterWaitingTxn", [&](void* /*arg*/) {
|
||||
reached.store(true);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
|
||||
});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
port::Thread t([&]() {
|
||||
// Attempt to get a conflicting lock
|
||||
auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
|
||||
ASSERT_TRUE(s.ok());
|
||||
txn1->Rollback();
|
||||
});
|
||||
|
||||
while (!reached.load()) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
|
||||
// Release locks and free the transaction
|
||||
txn0->Rollback();
|
||||
delete txn0;
|
||||
|
||||
t.join();
|
||||
|
||||
delete txn1;
|
||||
}
|
||||
|
||||
void PointLockManagerTestExternalSetup(PointLockManagerTest* self) {
|
||||
self->env_ = Env::Default();
|
||||
self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
|
||||
|
@ -368,8 +368,6 @@ void lock_request::retry_all_lock_requests(
|
||||
|
||||
toku_mutex_lock(&info->retry_mutex);
|
||||
|
||||
lock_wait_infos conflicts_collector;
|
||||
|
||||
// here is the group retry algorithm.
|
||||
// get the latest retry_want count and use it as the generation number of
|
||||
// this retry operation. if this retry generation is > the last retry
|
||||
@ -381,7 +379,7 @@ void lock_request::retry_all_lock_requests(
|
||||
info->running_retry = true;
|
||||
info->retry_done = info->retry_want;
|
||||
toku_mutex_unlock(&info->retry_mutex);
|
||||
retry_all_lock_requests_info(info, &conflicts_collector);
|
||||
retry_all_lock_requests_info(info, lock_wait_callback, callback_arg);
|
||||
if (after_retry_all_test_callback) after_retry_all_test_callback();
|
||||
toku_mutex_lock(&info->retry_mutex);
|
||||
info->running_retry = false;
|
||||
@ -393,14 +391,14 @@ void lock_request::retry_all_lock_requests(
|
||||
}
|
||||
}
|
||||
toku_mutex_unlock(&info->retry_mutex);
|
||||
|
||||
report_waits(&conflicts_collector, lock_wait_callback, callback_arg);
|
||||
}
|
||||
|
||||
void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info,
|
||||
lock_wait_infos *collector) {
|
||||
void lock_request::retry_all_lock_requests_info(
|
||||
lt_lock_request_info *info,
|
||||
void (*lock_wait_callback)(void *, lock_wait_infos *), void *callback_arg) {
|
||||
toku_external_mutex_lock(&info->mutex);
|
||||
// retry all of the pending lock requests.
|
||||
lock_wait_infos conflicts_collector;
|
||||
for (uint32_t i = 0; i < info->pending_lock_requests.size();) {
|
||||
lock_request *request;
|
||||
int r = info->pending_lock_requests.fetch(i, &request);
|
||||
@ -410,12 +408,16 @@ void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info,
|
||||
// move on to the next lock request. otherwise
|
||||
// the request is gone from the list so we may
|
||||
// read the i'th entry for the next one.
|
||||
r = request->retry(collector);
|
||||
r = request->retry(&conflicts_collector);
|
||||
if (r != 0) {
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
// call report_waits while holding the pending queue lock since
|
||||
// the waiter object is still valid while it's in the queue
|
||||
report_waits(&conflicts_collector, lock_wait_callback, callback_arg);
|
||||
|
||||
// future threads should only retry lock requests if some still exist
|
||||
info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
|
||||
toku_external_mutex_unlock(&info->mutex);
|
||||
|
@ -140,8 +140,10 @@ class lock_request {
|
||||
void (*lock_wait_callback)(void *, lock_wait_infos *) = nullptr,
|
||||
void *callback_arg = nullptr,
|
||||
void (*after_retry_test_callback)(void) = nullptr);
|
||||
static void retry_all_lock_requests_info(lt_lock_request_info *info,
|
||||
lock_wait_infos *collector);
|
||||
static void retry_all_lock_requests_info(
|
||||
lt_lock_request_info *info,
|
||||
void (*lock_wait_callback)(void *, lock_wait_infos *),
|
||||
void *callback_arg);
|
||||
|
||||
void set_start_test_callback(void (*f)(void));
|
||||
void set_start_before_pending_test_callback(void (*f)(void));
|
||||
|
@ -111,8 +111,7 @@ Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn,
|
||||
deserialize_endpoint(start_dbt, &start);
|
||||
deserialize_endpoint(end_dbt, &end);
|
||||
|
||||
di_path.push_back({((PessimisticTransaction*)txnid)->GetID(),
|
||||
column_family_id, is_exclusive, std::move(start),
|
||||
di_path.push_back({txnid, column_family_id, is_exclusive, std::move(start),
|
||||
std::move(end)});
|
||||
};
|
||||
|
||||
@ -150,13 +149,16 @@ Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn,
|
||||
// Wait callback that locktree library will call to inform us about
|
||||
// the lock waits that are in progress.
|
||||
void wait_callback_for_locktree(void*, toku::lock_wait_infos* infos) {
|
||||
TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:EnterWaitingTxn");
|
||||
for (auto wait_info : *infos) {
|
||||
// As long as we hold the lock on the locktree's pending request queue
|
||||
// this should be safe.
|
||||
auto txn = (PessimisticTransaction*)wait_info.waiter;
|
||||
auto cf_id = (ColumnFamilyId)wait_info.ltree->get_dict_id().dictid;
|
||||
|
||||
autovector<TransactionID> waitee_ids;
|
||||
for (auto waitee : wait_info.waitees) {
|
||||
waitee_ids.push_back(((PessimisticTransaction*)waitee)->GetID());
|
||||
waitee_ids.push_back(waitee);
|
||||
}
|
||||
txn->SetWaitingTxn(waitee_ids, cf_id, (std::string*)wait_info.m_extra);
|
||||
}
|
||||
@ -475,12 +477,10 @@ static void push_into_lock_status_data(void* param, const DBT* left,
|
||||
deserialize_endpoint(right, &info.end);
|
||||
|
||||
if (txnid_arg != TXNID_SHARED) {
|
||||
TXNID txnid = ((PessimisticTransaction*)txnid_arg)->GetID();
|
||||
info.ids.push_back(txnid);
|
||||
info.ids.push_back(txnid_arg);
|
||||
} else {
|
||||
for (auto it : *owners) {
|
||||
TXNID real_id = ((PessimisticTransaction*)it)->GetID();
|
||||
info.ids.push_back(real_id);
|
||||
info.ids.push_back(it);
|
||||
}
|
||||
}
|
||||
ctx->data->insert({ctx->cfh_id, info});
|
||||
|
@ -61,7 +61,14 @@ PessimisticTransaction::PessimisticTransaction(
|
||||
}
|
||||
|
||||
void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
|
||||
txn_id_ = GenTxnID();
|
||||
// Range lock manager uses address of transaction object as TXNID
|
||||
const TransactionDBOptions& db_options = txn_db_impl_->GetTxnDBOptions();
|
||||
if (db_options.lock_mgr_handle &&
|
||||
db_options.lock_mgr_handle->getLockManager()->IsRangeLockSupported()) {
|
||||
txn_id_ = reinterpret_cast<TransactionID>(this);
|
||||
} else {
|
||||
txn_id_ = GenTxnID();
|
||||
}
|
||||
|
||||
txn_state_ = STARTED;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user