rocksdb/db/flush_scheduler.cc
Jermy Li c2029f9716 Support concurrent CF iteration and drop (#6147)
Summary:
It's easy to cause coredump when closing ColumnFamilyHandle with unreleased iterators, especially iterators release is controlled by java GC when using JNI.

This patch fixed concurrent CF iteration and drop, we let iterators(actually SuperVersion) hold a ColumnFamilyData reference to prevent the CF from being released too early.

fixed https://github.com/facebook/rocksdb/issues/5982
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6147

Differential Revision: D18926378

fbshipit-source-id: 1dff6d068c603d012b81446812368bfee95a5e15
2019-12-12 19:04:48 -08:00

87 lines
2.4 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/flush_scheduler.h"
#include <cassert>
#include "db/column_family.h"
namespace rocksdb {
void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) {
#ifndef NDEBUG
{
std::lock_guard<std::mutex> lock(checking_mutex_);
assert(checking_set_.count(cfd) == 0);
checking_set_.insert(cfd);
}
#endif // NDEBUG
cfd->Ref();
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
while (!head_.compare_exchange_strong(
node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
// failing CAS updates the first param, so we are already set for
// retry. TakeNextColumnFamily won't happen until after another
// inter-thread synchronization, so we don't even need release
// semantics for this CAS
}
#endif // __clang_analyzer__
}
ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
while (true) {
if (head_.load(std::memory_order_relaxed) == nullptr) {
return nullptr;
}
// dequeue the head
Node* node = head_.load(std::memory_order_relaxed);
head_.store(node->next, std::memory_order_relaxed);
ColumnFamilyData* cfd = node->column_family;
delete node;
#ifndef NDEBUG
{
std::lock_guard<std::mutex> lock(checking_mutex_);
auto iter = checking_set_.find(cfd);
assert(iter != checking_set_.end());
checking_set_.erase(iter);
}
#endif // NDEBUG
if (!cfd->IsDropped()) {
// success
return cfd;
}
// no longer relevant, retry
cfd->UnrefAndTryDelete();
}
}
bool FlushScheduler::Empty() {
auto rv = head_.load(std::memory_order_relaxed) == nullptr;
#ifndef NDEBUG
std::lock_guard<std::mutex> lock(checking_mutex_);
// Empty is allowed to be called concurrnetly with ScheduleFlush. It would
// only miss the recent schedules.
assert((rv == checking_set_.empty()) || rv);
#endif // NDEBUG
return rv;
}
void FlushScheduler::Clear() {
ColumnFamilyData* cfd;
while ((cfd = TakeNextColumnFamily()) != nullptr) {
cfd->UnrefAndTryDelete();
}
assert(head_.load(std::memory_order_relaxed) == nullptr);
}
} // namespace rocksdb