diff --git a/test_util/sync_point_impl.cc b/test_util/sync_point_impl.cc index 28405a0ef..1d87a05fe 100644 --- a/test_util/sync_point_impl.cc +++ b/test_util/sync_point_impl.cc @@ -45,6 +45,8 @@ void SyncPoint::Data::LoadDependency(const std::vector& dependenc for (const auto& dependency : dependencies) { successors_[dependency.predecessor].push_back(dependency.successor); predecessors_[dependency.successor].push_back(dependency.predecessor); + point_filter_.Add(dependency.successor); + point_filter_.Add(dependency.predecessor); } cv_.notify_all(); } @@ -61,11 +63,15 @@ void SyncPoint::Data::LoadDependencyAndMarkers( for (const auto& dependency : dependencies) { successors_[dependency.predecessor].push_back(dependency.successor); predecessors_[dependency.successor].push_back(dependency.predecessor); + point_filter_.Add(dependency.successor); + point_filter_.Add(dependency.predecessor); } for (const auto& marker : markers) { successors_[marker.predecessor].push_back(marker.successor); predecessors_[marker.successor].push_back(marker.predecessor); markers_[marker.predecessor].push_back(marker.successor); + point_filter_.Add(marker.predecessor); + point_filter_.Add(marker.successor); } cv_.notify_all(); } @@ -99,6 +105,10 @@ void SyncPoint::Data::Process(const std::string& point, void* cb_arg) { if (!enabled_) { return; } + // Use a filter to prevent mutex lock if possible. + if (!point_filter_.MayContain(point)) { + return; + } std::unique_lock lock(mutex_); auto thread_id = std::this_thread::get_id(); @@ -107,6 +117,7 @@ void SyncPoint::Data::Process(const std::string& point, void* cb_arg) { if (marker_iter != markers_.end()) { for (auto& marked_point : marker_iter->second) { marked_thread_id_.emplace(marked_point, thread_id); + point_filter_.Add(marked_point); } } diff --git a/test_util/sync_point_impl.h b/test_util/sync_point_impl.h index b246c0198..ba818e381 100644 --- a/test_util/sync_point_impl.h +++ b/test_util/sync_point_impl.h @@ -3,9 +3,8 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -#include "test_util/sync_point.h" - #include + #include #include #include @@ -15,15 +14,39 @@ #include #include +#include "memory/concurrent_arena.h" #include "port/port.h" +#include "test_util/sync_point.h" +#include "util/dynamic_bloom.h" #include "util/random.h" #pragma once #ifndef NDEBUG namespace ROCKSDB_NAMESPACE { +// A hacky allocator for single use. +// Arena depends on SyncPoint and create circular dependency. +class SingleAllocator : public Allocator { + public: + char* Allocate(size_t) override { + assert(false); + return nullptr; + } + char* AllocateAligned(size_t bytes, size_t, Logger*) override { + buf_.resize(bytes); + return const_cast(buf_.data()); + } + size_t BlockSize() const override { + assert(false); + return 0; + } + + private: + std::string buf_; +}; + struct SyncPoint::Data { - Data() : enabled_(false) {} + Data() : point_filter_(&alloc_, /*total_bits=*/8192), enabled_(false) {} // Enable proper deletion by subclasses virtual ~Data() {} // successor/predecessor map loaded from LoadDependency @@ -37,6 +60,9 @@ struct SyncPoint::Data { std::condition_variable cv_; // sync points that have been passed through std::unordered_set cleared_points_; + SingleAllocator alloc_; + // A filter before holding mutex to speed up process. + DynamicBloom point_filter_; std::atomic enabled_; int num_callbacks_running_ = 0; @@ -48,6 +74,7 @@ struct SyncPoint::Data { const std::function& callback) { std::lock_guard lock(mutex_); callbacks_[point] = callback; + point_filter_.Add(point); } void ClearCallBack(const std::string& point);