diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index 04ed2d334..6a54bfe51 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -97,6 +97,21 @@ class RangeLockManagerHandle : public LockManagerHandle {
 
   using RangeLockStatus =
       std::unordered_multimap<ColumnFamilyId, RangeLockInfo>;
+  // Lock Escalation barrier check function.
+  // It is called for a pair of endpoints A and B, such that A < B.
+  // If escalation_barrier_check_func(A, B)==true, then there is a lock
+  // escalation barrier between A and B, and lock escalation is not allowed
+  // to bridge the gap between A and B.
+  //
+  // The function may be called from any thread that acquires or releases
+  // locks. It should not throw exceptions. There is currently no way to return
+  // an error.
+  using EscalationBarrierFunc =
+      std::function<bool(const Endpoint& a, const Endpoint& b)>;
+
+  // Set the user-provided barrier check function
+  virtual void SetEscalationBarrierFunc(EscalationBarrierFunc func) = 0;
+
   virtual RangeLockStatus GetRangeLockStatusData() = 0;
 
   class Counters {
diff --git a/utilities/transactions/lock/range/range_locking_test.cc b/utilities/transactions/lock/range/range_locking_test.cc
index 2e8170837..dd95dc436 100644
--- a/utilities/transactions/lock/range/range_locking_test.cc
+++ b/utilities/transactions/lock/range/range_locking_test.cc
@@ -276,9 +276,10 @@ TEST_F(RangeLockingTest, BasicLockEscalation) {
 
   // Get the locks until we hit an escalation
   for (int i = 0; i < 2020; i++) {
-    char buf[32];
-    snprintf(buf, sizeof(buf) - 1, "%08d", i);
-    ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf), Endpoint(buf)));
+    std::ostringstream buf;
+    buf << std::setw(8) << std::setfill('0') << i;
+    std::string buf_str = buf.str();
+    ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
   }
   counters = range_lock_mgr->GetStatus();
   ASSERT_GT(counters.escalation_count, 0);
@@ -286,6 +287,60 @@
 
   delete txn;
 }
+
+// An escalation barrier function. Allow escalation iff the first two bytes are
+// identical.
+static bool escalation_barrier(const Endpoint& a, const Endpoint& b) {
+  assert(a.slice.size() > 2);
+  assert(b.slice.size() > 2);
+  if (memcmp(a.slice.data(), b.slice.data(), 2)) {
+    return true;  // This is a barrier
+  } else {
+    return false;  // No barrier
+  }
+}
+
+TEST_F(RangeLockingTest, LockEscalationBarrier) {
+  auto cf = db->DefaultColumnFamily();
+
+  auto counters = range_lock_mgr->GetStatus();
+
+  // Initially not using any lock memory
+  ASSERT_EQ(counters.escalation_count, 0);
+
+  range_lock_mgr->SetMaxLockMemory(8000);
+  range_lock_mgr->SetEscalationBarrierFunc(escalation_barrier);
+
+  // Insert enough locks to cause lock escalations to happen
+  auto txn = NewTxn();
+  const int N = 2000;
+  for (int i = 0; i < N; i++) {
+    std::ostringstream buf;
+    buf << std::setw(4) << std::setfill('0') << i;
+    std::string buf_str = buf.str();
+    ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
+  }
+  counters = range_lock_mgr->GetStatus();
+  ASSERT_GT(counters.escalation_count, 0);
+
+  // Check that lock escalation was not performed across escalation barriers:
+  // Use another txn to acquire locks near the barriers.
+  auto txn2 = NewTxn();
+  range_lock_mgr->SetMaxLockMemory(500000);
+  for (int i = 100; i < N; i += 100) {
+    std::ostringstream buf;
+    buf << std::setw(4) << std::setfill('0') << i - 1 << "-a";
+    std::string buf_str = buf.str();
+    // Check that we CAN get a lock near the escalation barrier
+    ASSERT_OK(txn2->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
+  }
+
+  txn->Rollback();
+  txn2->Rollback();
+  delete txn;
+  delete txn2;
+}
+
 #endif
 
 TEST_F(RangeLockingTest, LockWaitCount) {
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
index c238b0204..0d99130ae 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
@@ -96,9 +96,19 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id,
   m_sto_end_early_count = 0;
   m_sto_end_early_time = 0;
 
+  m_escalation_barrier = [](const DBT *, const DBT *, void *) -> bool {
+    return false;
+  };
+
   m_lock_request_info.init(mutex_factory);
 }
 
+void locktree::set_escalation_barrier_func(
+    lt_escalation_barrier_check_func func, void *extra) {
+  m_escalation_barrier = func;
+  m_escalation_barrier_arg = extra;
+}
+
 void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) {
   pending_lock_requests.create();
   pending_is_empty = true;
@@ -863,14 +873,19 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback,
     // - belongs to a different txnid, or
     // - belongs to several txnids, or
     // - is a shared lock (we could potentially merge those but
-    //   currently we don't)
+    //   currently we don't), or
+    // - is across a lock escalation barrier.
     int next_txnid_index = current_index + 1;
 
     while (next_txnid_index < num_extracted &&
            (extracted_buf[current_index].txnid ==
            extracted_buf[next_txnid_index].txnid) &&
           !extracted_buf[next_txnid_index].is_shared &&
-          !extracted_buf[next_txnid_index].owners) {
+          !extracted_buf[next_txnid_index].owners &&
+          !m_escalation_barrier(
+              extracted_buf[current_index].range.get_right_key(),
+              extracted_buf[next_txnid_index].range.get_left_key(),
+              m_escalation_barrier_arg)) {
       next_txnid_index++;
     }
 
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h
index 3e438f502..f0f4b042d 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h
@@ -85,6 +85,9 @@ typedef void (*lt_destroy_cb)(locktree *lt);
 typedef void (*lt_escalate_cb)(TXNID txnid, const locktree *lt,
                                const range_buffer &buffer, void *extra);
 
+typedef bool (*lt_escalation_barrier_check_func)(const DBT *a, const DBT *b,
+                                                 void *extra);
+
 struct lt_counters {
   uint64_t wait_count, wait_time;
   uint64_t long_wait_count, long_wait_time;
@@ -343,6 +346,20 @@ class locktree {
   void set_comparator(const comparator &cmp);
 
+  // Set the user-provided Lock Escalation Barrier check function and its
+  // argument.
+  //
+  // A Lock Escalation Barrier limits the scope of Lock Escalation.
+  // For two keys A and B (such that A < B),
+  // escalation_barrier_check_func(A, B)==true means that there is a lock
+  // escalation barrier between A and B, and lock escalation is not allowed to
+  // bridge the gap between A and B.
+  //
+  // This method sets the user-provided barrier check function and its
+  // parameter.
+  void set_escalation_barrier_func(lt_escalation_barrier_check_func func,
+                                   void *extra);
+
   int compare(const locktree *lt) const;
 
   DICTIONARY_ID get_dict_id() const;
 
@@ -373,6 +390,9 @@
   // userdata pointer below. see locktree_manager::get_lt w/ on_create_extra
   comparator m_cmp;
 
+  lt_escalation_barrier_check_func m_escalation_barrier;
+  void *m_escalation_barrier_arg;
+
   concurrent_tree *m_rangetree;
 
   void *m_userdata;
diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
index 054e24d3a..ae99be534 100644
--- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
+++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
@@ -48,14 +48,15 @@ void serialize_endpoint(const Endpoint& endp, std::string* buf) {
 }
 
 // Decode the endpoint from the format it is stored in the locktree (DBT) to
-// one used outside (EndpointWithString)
-void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) {
+// the one used outside: either Endpoint or EndpointWithString
+template <typename EndpointStruct>
+void deserialize_endpoint(const DBT* dbt, EndpointStruct* endp) {
   assert(dbt->size >= 1);
   const char* dbt_data = (const char*)dbt->data;
   char suffix = dbt_data[0];
   assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM);
   endp->inf_suffix = (suffix == SUFFIX_SUPREMUM);
-  endp->slice.assign(dbt_data + 1, dbt->size - 1);
+  endp->slice = decltype(EndpointStruct::slice)(dbt_data + 1, dbt->size - 1);
 }
 
 // Get a range lock on [start_key; end_key] range
@@ -263,6 +264,21 @@ RangeTreeLockManager::RangeTreeLockManager(
   ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_);
 }
 
+int RangeTreeLockManager::on_create(toku::locktree* lt, void* arg) {
+  // arg is a pointer to RangeTreeLockManager
+  lt->set_escalation_barrier_func(&OnEscalationBarrierCheck, arg);
+  return 0;
+}
+
+bool RangeTreeLockManager::OnEscalationBarrierCheck(const DBT* a, const DBT* b,
+                                                    void* extra) {
+  Endpoint a_endp, b_endp;
+  deserialize_endpoint(a, &a_endp);
+  deserialize_endpoint(b, &b_endp);
+  auto self = static_cast<RangeTreeLockManager*>(extra);
+  return self->barrier_func_(a_endp, b_endp);
+}
+
 void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize(
     uint32_t target_size) {
   dlock_buffer_.Resize(target_size);
@@ -357,8 +373,9 @@ void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) {
     DICTIONARY_ID dict_id = {.dictid = column_family_id};
     toku::comparator cmp;
     cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator());
-    toku::locktree* ltree = ltm_.get_lt(dict_id, cmp,
-                                        /* on_create_extra*/ nullptr);
+    toku::locktree* ltree =
+        ltm_.get_lt(dict_id, cmp,
+                    /* on_create_extra*/ static_cast<void*>(this));
     // This is ok to because get_lt has copied the comparator:
     cmp.destroy();
 
diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h
index e7c150281..e4236d600 100644
--- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h
+++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h
@@ -91,9 +91,16 @@ class RangeTreeLockManager : public RangeLockManagerBase,
   // Get the locktree which stores locks for the Column Family with given cf_id
   std::shared_ptr<toku::locktree> GetLockTreeForCF(ColumnFamilyId cf_id);
 
+  void SetEscalationBarrierFunc(EscalationBarrierFunc func) override {
+    barrier_func_ = func;
+  }
+
 private:
  toku::locktree_manager ltm_;
 
+  EscalationBarrierFunc barrier_func_ =
+      [](const Endpoint&, const Endpoint&) -> bool { return false; };
+
  std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
 
  // Map from cf_id to locktree*. Can only be accessed while holding the
@@ -114,10 +121,12 @@ class RangeTreeLockManager : public RangeLockManagerBase,
  static int CompareDbtEndpoints(void* arg, const DBT* a_key, const DBT* b_key);
 
  // Callbacks
- static int on_create(toku::locktree*, void*) { return 0; /* no error */ }
+ static int on_create(toku::locktree*, void*);
  static void on_destroy(toku::locktree*) {}
  static void on_escalate(TXNID txnid, const toku::locktree* lt,
                          const toku::range_buffer& buffer, void* extra);
+
+ static bool OnEscalationBarrierCheck(const DBT* a, const DBT* b, void* extra);
 };
 
 void serialize_endpoint(const Endpoint& endp, std::string* buf);
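
Usage sketch (not part of the patch): the snippet below shows how an application could install an escalation barrier through the new RangeLockManagerHandle::SetEscalationBarrierFunc() API added above. It is a minimal sketch, assuming a range-locking-enabled (non-LITE, non-Windows) build; the database path, the 4-byte "group" prefix scheme, and the GroupEscalationBarrier name are illustrative assumptions, while NewRangeLockManager(), TransactionDBOptions::lock_mgr_handle, TransactionDB::Open(), and Transaction::GetRangeLock() are existing public API.

#include <cstring>

#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical barrier: treat the first 4 key bytes as a "group id". Lock
// escalation may merge adjacent locks inside one group, but must not produce
// an escalated range that spans two different groups.
static bool GroupEscalationBarrier(const Endpoint& a, const Endpoint& b) {
  if (a.slice.size() < 4 || b.slice.size() < 4) {
    return false;  // Too short to carry a group id: no barrier
  }
  return memcmp(a.slice.data(), b.slice.data(), 4) != 0;  // true == barrier
}

int main() {
  // Range locking is enabled by plugging a RangeLockManagerHandle into
  // TransactionDBOptions before opening the TransactionDB.
  auto range_lock_mgr = NewRangeLockManager(nullptr /* default mutex factory */);
  range_lock_mgr->SetEscalationBarrierFunc(GroupEscalationBarrier);

  TransactionDBOptions txn_db_options;
  txn_db_options.lock_mgr_handle = range_lock_mgr;

  Options options;
  options.create_if_missing = true;

  TransactionDB* db = nullptr;
  Status s = TransactionDB::Open(options, txn_db_options,
                                 "/tmp/rangelock_barrier_demo", &db);
  if (!s.ok()) {
    return 1;
  }

  // Take range locks in two different "groups". Even under lock-memory
  // pressure, the barrier prevents escalation from merging these into one
  // range that bridges the two groups.
  Transaction* txn = db->BeginTransaction(WriteOptions());
  s = txn->GetRangeLock(db->DefaultColumnFamily(), Endpoint("0000-aaa"),
                        Endpoint("0000-zzz"));
  if (s.ok()) {
    s = txn->GetRangeLock(db->DefaultColumnFamily(), Endpoint("0001-aaa"),
                          Endpoint("0001-zzz"));
  }

  txn->Rollback();
  delete txn;
  delete db;
  return s.ok() ? 0 : 1;
}

Note that the barrier only constrains which adjacent locks escalation may merge; whether escalation happens at all still depends on the SetMaxLockMemory() budget and the lock workload, as exercised by the LockEscalationBarrier test above.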