a720401877
Summary: In `db_stress` profile the vast majority of CPU time is spent acquiring the `SyncPoint` mutex. I mistakenly assumed #3939 had fixed this mutex contention problem by disabling `SyncPoint` processing. But actually the lock was still being acquired just to check whether processing is enabled. We can avoid that overhead by using an atomic to track whether it's enabled. Closes https://github.com/facebook/rocksdb/pull/3991 Differential Revision: D8393825 Pulled By: ajkr fbshipit-source-id: 5bc4e3c722ee7304e7a9c2439998c456b05a6897
130 lines
3.5 KiB
C++
130 lines
3.5 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "util/sync_point_impl.h"
|
|
|
|
#ifndef NDEBUG
|
|
namespace rocksdb {
|
|
|
|
void TestKillRandom(std::string kill_point, int odds,
|
|
const std::string& srcfile, int srcline) {
|
|
for (auto& p : rocksdb_kill_prefix_blacklist) {
|
|
if (kill_point.substr(0, p.length()) == p) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
assert(odds > 0);
|
|
if (odds % 7 == 0) {
|
|
// class Random uses multiplier 16807, which is 7^5. If odds are
|
|
// multiplier of 7, there might be limited values generated.
|
|
odds++;
|
|
}
|
|
auto* r = Random::GetTLSInstance();
|
|
bool crash = r->OneIn(odds);
|
|
if (crash) {
|
|
port::Crash(srcfile, srcline);
|
|
}
|
|
}
|
|
|
|
|
|
void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
successors_.clear();
|
|
predecessors_.clear();
|
|
cleared_points_.clear();
|
|
for (const auto& dependency : dependencies) {
|
|
successors_[dependency.predecessor].push_back(dependency.successor);
|
|
predecessors_[dependency.successor].push_back(dependency.predecessor);
|
|
}
|
|
cv_.notify_all();
|
|
}
|
|
|
|
void SyncPoint::Data::LoadDependencyAndMarkers(
|
|
const std::vector<SyncPointPair>& dependencies,
|
|
const std::vector<SyncPointPair>& markers) {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
successors_.clear();
|
|
predecessors_.clear();
|
|
cleared_points_.clear();
|
|
markers_.clear();
|
|
marked_thread_id_.clear();
|
|
for (const auto& dependency : dependencies) {
|
|
successors_[dependency.predecessor].push_back(dependency.successor);
|
|
predecessors_[dependency.successor].push_back(dependency.predecessor);
|
|
}
|
|
for (const auto& marker : markers) {
|
|
successors_[marker.predecessor].push_back(marker.successor);
|
|
predecessors_[marker.successor].push_back(marker.predecessor);
|
|
markers_[marker.predecessor].push_back(marker.successor);
|
|
}
|
|
cv_.notify_all();
|
|
}
|
|
|
|
bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
|
|
for (const auto& pred : predecessors_[point]) {
|
|
if (cleared_points_.count(pred) == 0) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void SyncPoint::Data::ClearCallBack(const std::string& point) {
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
|
while (num_callbacks_running_ > 0) {
|
|
cv_.wait(lock);
|
|
}
|
|
callbacks_.erase(point);
|
|
}
|
|
|
|
void SyncPoint::Data::ClearAllCallBacks() {
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
|
while (num_callbacks_running_ > 0) {
|
|
cv_.wait(lock);
|
|
}
|
|
callbacks_.clear();
|
|
}
|
|
|
|
void SyncPoint::Data::Process(const std::string& point, void* cb_arg) {
|
|
if (!enabled_) {
|
|
return;
|
|
}
|
|
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
|
auto thread_id = std::this_thread::get_id();
|
|
|
|
auto marker_iter = markers_.find(point);
|
|
if (marker_iter != markers_.end()) {
|
|
for (auto& marked_point : marker_iter->second) {
|
|
marked_thread_id_.emplace(marked_point, thread_id);
|
|
}
|
|
}
|
|
|
|
if (DisabledByMarker(point, thread_id)) {
|
|
return;
|
|
}
|
|
|
|
while (!PredecessorsAllCleared(point)) {
|
|
cv_.wait(lock);
|
|
if (DisabledByMarker(point, thread_id)) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
auto callback_pair = callbacks_.find(point);
|
|
if (callback_pair != callbacks_.end()) {
|
|
num_callbacks_running_++;
|
|
mutex_.unlock();
|
|
callback_pair->second(cb_arg);
|
|
mutex_.lock();
|
|
num_callbacks_running_--;
|
|
}
|
|
cleared_points_.insert(point);
|
|
cv_.notify_all();
|
|
}
|
|
} // rocksdb
|
|
#endif
|