Use bloom filter to speed up sync point (#8337)

Summary:
Now SyncPoint is used in crash test but can signiciantly slow down the run. Add a bloom filter before each process to speed itup

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8337

Test Plan: Run all existing tests

Reviewed By: ajkr

Differential Revision: D28730282

fbshipit-source-id: a187377a9d47877a36c5649e4b1f67d5e3033238
This commit is contained in:
sdong 2021-05-27 13:13:18 -07:00 committed by Facebook GitHub Bot
parent b53e3d2adb
commit cda7923169
2 changed files with 41 additions and 3 deletions

View File

@ -45,6 +45,8 @@ void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependenc
for (const auto& dependency : dependencies) { for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor); successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor); predecessors_[dependency.successor].push_back(dependency.predecessor);
point_filter_.Add(dependency.successor);
point_filter_.Add(dependency.predecessor);
} }
cv_.notify_all(); cv_.notify_all();
} }
@ -61,11 +63,15 @@ void SyncPoint::Data::LoadDependencyAndMarkers(
for (const auto& dependency : dependencies) { for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor); successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor); predecessors_[dependency.successor].push_back(dependency.predecessor);
point_filter_.Add(dependency.successor);
point_filter_.Add(dependency.predecessor);
} }
for (const auto& marker : markers) { for (const auto& marker : markers) {
successors_[marker.predecessor].push_back(marker.successor); successors_[marker.predecessor].push_back(marker.successor);
predecessors_[marker.successor].push_back(marker.predecessor); predecessors_[marker.successor].push_back(marker.predecessor);
markers_[marker.predecessor].push_back(marker.successor); markers_[marker.predecessor].push_back(marker.successor);
point_filter_.Add(marker.predecessor);
point_filter_.Add(marker.successor);
} }
cv_.notify_all(); cv_.notify_all();
} }
@ -99,6 +105,10 @@ void SyncPoint::Data::Process(const std::string& point, void* cb_arg) {
if (!enabled_) { if (!enabled_) {
return; return;
} }
// Use a filter to prevent mutex lock if possible.
if (!point_filter_.MayContain(point)) {
return;
}
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
auto thread_id = std::this_thread::get_id(); auto thread_id = std::this_thread::get_id();
@ -107,6 +117,7 @@ void SyncPoint::Data::Process(const std::string& point, void* cb_arg) {
if (marker_iter != markers_.end()) { if (marker_iter != markers_.end()) {
for (auto& marked_point : marker_iter->second) { for (auto& marked_point : marker_iter->second) {
marked_thread_id_.emplace(marked_point, thread_id); marked_thread_id_.emplace(marked_point, thread_id);
point_filter_.Add(marked_point);
} }
} }

View File

@ -3,9 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "test_util/sync_point.h"
#include <assert.h> #include <assert.h>
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <functional> #include <functional>
@ -15,15 +14,39 @@
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include "memory/concurrent_arena.h"
#include "port/port.h" #include "port/port.h"
#include "test_util/sync_point.h"
#include "util/dynamic_bloom.h"
#include "util/random.h" #include "util/random.h"
#pragma once #pragma once
#ifndef NDEBUG #ifndef NDEBUG
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
// A hacky allocator for single use.
// Arena depends on SyncPoint and create circular dependency.
class SingleAllocator : public Allocator {
public:
char* Allocate(size_t) override {
assert(false);
return nullptr;
}
char* AllocateAligned(size_t bytes, size_t, Logger*) override {
buf_.resize(bytes);
return const_cast<char*>(buf_.data());
}
size_t BlockSize() const override {
assert(false);
return 0;
}
private:
std::string buf_;
};
struct SyncPoint::Data { struct SyncPoint::Data {
Data() : enabled_(false) {} Data() : point_filter_(&alloc_, /*total_bits=*/8192), enabled_(false) {}
// Enable proper deletion by subclasses // Enable proper deletion by subclasses
virtual ~Data() {} virtual ~Data() {}
// successor/predecessor map loaded from LoadDependency // successor/predecessor map loaded from LoadDependency
@ -37,6 +60,9 @@ struct SyncPoint::Data {
std::condition_variable cv_; std::condition_variable cv_;
// sync points that have been passed through // sync points that have been passed through
std::unordered_set<std::string> cleared_points_; std::unordered_set<std::string> cleared_points_;
SingleAllocator alloc_;
// A filter before holding mutex to speed up process.
DynamicBloom point_filter_;
std::atomic<bool> enabled_; std::atomic<bool> enabled_;
int num_callbacks_running_ = 0; int num_callbacks_running_ = 0;
@ -48,6 +74,7 @@ struct SyncPoint::Data {
const std::function<void(void*)>& callback) { const std::function<void(void*)>& callback) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
callbacks_[point] = callback; callbacks_[point] = callback;
point_filter_.Add(point);
} }
void ClearCallBack(const std::string& point); void ClearCallBack(const std::string& point);