diff --git a/utilities/transactions/lock/range/range_locking_test.cc b/utilities/transactions/lock/range/range_locking_test.cc index dd95dc436..5ab6c60fc 100644 --- a/utilities/transactions/lock/range/range_locking_test.cc +++ b/utilities/transactions/lock/range/range_locking_test.cc @@ -369,6 +369,46 @@ TEST_F(RangeLockingTest, LockWaitCount) { delete txn1; } +TEST_F(RangeLockingTest, LockWaiteeAccess) { + TransactionOptions txn_options; + auto cf = db->DefaultColumnFamily(); + txn_options.lock_timeout = 60; + Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options); + Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options); + + // Get a range lock + ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c"))); + + std::atomic reached(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "RangeTreeLockManager::TryRangeLock:EnterWaitingTxn", [&](void* /*arg*/) { + reached.store(true); + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread t([&]() { + // Attempt to get a conflicting lock + auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z")); + ASSERT_TRUE(s.ok()); + txn1->Rollback(); + }); + + while (!reached.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Release locks and free the transaction + txn0->Rollback(); + delete txn0; + + t.join(); + + delete txn1; +} + void PointLockManagerTestExternalSetup(PointLockManagerTest* self) { self->env_ = Env::Default(); self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc index ec7bd04dc..3d217be70 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc @@ -368,8 +368,6 @@ void lock_request::retry_all_lock_requests( toku_mutex_lock(&info->retry_mutex); - lock_wait_infos conflicts_collector; - // here is the group retry algorithm. // get the latest retry_want count and use it as the generation number of // this retry operation. if this retry generation is > the last retry @@ -381,7 +379,7 @@ void lock_request::retry_all_lock_requests( info->running_retry = true; info->retry_done = info->retry_want; toku_mutex_unlock(&info->retry_mutex); - retry_all_lock_requests_info(info, &conflicts_collector); + retry_all_lock_requests_info(info, lock_wait_callback, callback_arg); if (after_retry_all_test_callback) after_retry_all_test_callback(); toku_mutex_lock(&info->retry_mutex); info->running_retry = false; @@ -393,14 +391,14 @@ void lock_request::retry_all_lock_requests( } } toku_mutex_unlock(&info->retry_mutex); - - report_waits(&conflicts_collector, lock_wait_callback, callback_arg); } -void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, - lock_wait_infos *collector) { +void lock_request::retry_all_lock_requests_info( + lt_lock_request_info *info, + void (*lock_wait_callback)(void *, lock_wait_infos *), void *callback_arg) { toku_external_mutex_lock(&info->mutex); // retry all of the pending lock requests. + lock_wait_infos conflicts_collector; for (uint32_t i = 0; i < info->pending_lock_requests.size();) { lock_request *request; int r = info->pending_lock_requests.fetch(i, &request); @@ -410,12 +408,16 @@ void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, // move on to the next lock request. otherwise // the request is gone from the list so we may // read the i'th entry for the next one. - r = request->retry(collector); + r = request->retry(&conflicts_collector); if (r != 0) { i++; } } + // call report_waits while holding the pending queue lock since + // the waiter object is still valid while it's in the queue + report_waits(&conflicts_collector, lock_wait_callback, callback_arg); + // future threads should only retry lock requests if some still exist info->should_retry_lock_requests = info->pending_lock_requests.size() > 0; toku_external_mutex_unlock(&info->mutex); diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h index 3544f102f..d30e1e2ca 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h @@ -140,8 +140,10 @@ class lock_request { void (*lock_wait_callback)(void *, lock_wait_infos *) = nullptr, void *callback_arg = nullptr, void (*after_retry_test_callback)(void) = nullptr); - static void retry_all_lock_requests_info(lt_lock_request_info *info, - lock_wait_infos *collector); + static void retry_all_lock_requests_info( + lt_lock_request_info *info, + void (*lock_wait_callback)(void *, lock_wait_infos *), + void *callback_arg); void set_start_test_callback(void (*f)(void)); void set_start_before_pending_test_callback(void (*f)(void)); diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc index ae99be534..531165dea 100644 --- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc @@ -111,8 +111,7 @@ Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn, deserialize_endpoint(start_dbt, &start); deserialize_endpoint(end_dbt, &end); - di_path.push_back({((PessimisticTransaction*)txnid)->GetID(), - column_family_id, is_exclusive, std::move(start), + di_path.push_back({txnid, column_family_id, is_exclusive, std::move(start), std::move(end)}); }; @@ -150,13 +149,16 @@ Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn, // Wait callback that locktree library will call to inform us about // the lock waits that are in progress. void wait_callback_for_locktree(void*, toku::lock_wait_infos* infos) { + TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:EnterWaitingTxn"); for (auto wait_info : *infos) { + // As long as we hold the lock on the locktree's pending request queue + // this should be safe. auto txn = (PessimisticTransaction*)wait_info.waiter; auto cf_id = (ColumnFamilyId)wait_info.ltree->get_dict_id().dictid; autovector waitee_ids; for (auto waitee : wait_info.waitees) { - waitee_ids.push_back(((PessimisticTransaction*)waitee)->GetID()); + waitee_ids.push_back(waitee); } txn->SetWaitingTxn(waitee_ids, cf_id, (std::string*)wait_info.m_extra); } @@ -475,12 +477,10 @@ static void push_into_lock_status_data(void* param, const DBT* left, deserialize_endpoint(right, &info.end); if (txnid_arg != TXNID_SHARED) { - TXNID txnid = ((PessimisticTransaction*)txnid_arg)->GetID(); - info.ids.push_back(txnid); + info.ids.push_back(txnid_arg); } else { for (auto it : *owners) { - TXNID real_id = ((PessimisticTransaction*)it)->GetID(); - info.ids.push_back(real_id); + info.ids.push_back(it); } } ctx->data->insert({ctx->cfh_id, info}); diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 9e45b71e9..c5099abad 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -61,7 +61,14 @@ PessimisticTransaction::PessimisticTransaction( } void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { - txn_id_ = GenTxnID(); + // Range lock manager uses address of transaction object as TXNID + const TransactionDBOptions& db_options = txn_db_impl_->GetTxnDBOptions(); + if (db_options.lock_mgr_handle && + db_options.lock_mgr_handle->getLockManager()->IsRangeLockSupported()) { + txn_id_ = reinterpret_cast(this); + } else { + txn_id_ = GenTxnID(); + } txn_state_ = STARTED;