Resubmit the fix for a race condition in persisting options
Summary: This patch fix a race condition in persisting options which will cause a crash when: * Thread A obtain cf options and start to persist options based on that cf options. * Thread B kicks in and finish DropColumnFamily and delete cf_handle. * Thread A wakes up and tries to finish the persisting options and crashes. Test Plan: Add a test in column_family_test that can reproduce the crash Reviewers: anthony, IslamAbdelRahman, rven, kradhakrishnan, sdong Reviewed By: sdong Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D51717
This commit is contained in:
parent
afc84731f4
commit
774b80e99e
@ -68,7 +68,9 @@ class ColumnFamilyTest : public testing::Test {
|
||||
|
||||
void Close() {
|
||||
for (auto h : handles_) {
|
||||
delete h;
|
||||
if (h) {
|
||||
delete h;
|
||||
}
|
||||
}
|
||||
handles_.clear();
|
||||
names_.clear();
|
||||
@ -1260,6 +1262,84 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
// skipped as persisting options is not supported in ROCKSDB_LITE
|
||||
namespace {
|
||||
std::atomic<int> test_stage(0);
|
||||
const int kMainThreadStartPersistingOptionsFile = 1;
|
||||
const int kChildThreadFinishDroppingColumnFamily = 2;
|
||||
const int kChildThreadWaitingMainThreadPersistOptions = 3;
|
||||
void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
|
||||
std::vector<Comparator*> comparators) {
|
||||
while (test_stage < kMainThreadStartPersistingOptionsFile) {
|
||||
Env::Default()->SleepForMicroseconds(100);
|
||||
}
|
||||
cf_test->DropColumnFamilies({cf_id});
|
||||
delete comparators[cf_id];
|
||||
comparators[cf_id] = nullptr;
|
||||
test_stage = kChildThreadFinishDroppingColumnFamily;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_F(ColumnFamilyTest, CreateAndDropRace) {
|
||||
const int kCfCount = 5;
|
||||
std::vector<ColumnFamilyOptions> cf_opts;
|
||||
std::vector<Comparator*> comparators;
|
||||
for (int i = 0; i < kCfCount; ++i) {
|
||||
cf_opts.emplace_back();
|
||||
comparators.push_back(new test::SimpleSuffixReverseComparator());
|
||||
cf_opts.back().comparator = comparators.back();
|
||||
}
|
||||
db_options_.create_if_missing = true;
|
||||
db_options_.create_missing_column_families = true;
|
||||
|
||||
auto main_thread_id = std::this_thread::get_id();
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack("PersistRocksDBOptions:start",
|
||||
[&](void* arg) {
|
||||
auto current_thread_id = std::this_thread::get_id();
|
||||
// If it's the main thread hitting this sync-point, then it
|
||||
// will be blocked until some other thread update the test_stage.
|
||||
if (main_thread_id == current_thread_id) {
|
||||
test_stage = kMainThreadStartPersistingOptionsFile;
|
||||
while (test_stage < kChildThreadFinishDroppingColumnFamily) {
|
||||
Env::Default()->SleepForMicroseconds(100);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"WriteThread::EnterUnbatched:Wait", [&](void* arg) {
|
||||
// This means a thread doing DropColumnFamily() is waiting for
|
||||
// other thread to finish persisting options.
|
||||
// In such case, we update the test_stage to unblock the main thread.
|
||||
test_stage = kChildThreadWaitingMainThreadPersistOptions;
|
||||
|
||||
// Note that based on the test setting, this must not be the
|
||||
// main thread.
|
||||
ASSERT_NE(main_thread_id, std::this_thread::get_id());
|
||||
});
|
||||
|
||||
// Create a database with four column families
|
||||
Open({"default", "one", "two", "three"},
|
||||
{cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Start a thread that will drop the first column family
|
||||
// and its comparator
|
||||
std::thread drop_cf_thread(DropSingleColumnFamily, this, 1, comparators);
|
||||
|
||||
DropColumnFamilies({2});
|
||||
|
||||
drop_cf_thread.join();
|
||||
|
||||
Close();
|
||||
Destroy();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
118
db/db_impl.cc
118
db/db_impl.cc
@ -1934,24 +1934,21 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
|
||||
MutableCFOptions new_options;
|
||||
Status s;
|
||||
Status persist_options_status;
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
s = cfd->SetOptions(options_map);
|
||||
if (s.ok()) {
|
||||
new_options = *cfd->GetLatestMutableCFOptions();
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
Status persist_options_status = WriteOptionsFile();
|
||||
if (!persist_options_status.ok()) {
|
||||
if (db_options_.fail_if_options_file_error) {
|
||||
s = Status::IOError(
|
||||
"SetOptions succeeded, but unable to persist options",
|
||||
persist_options_status.ToString());
|
||||
}
|
||||
Warn(db_options_.info_log,
|
||||
"Unable to persist options in SetOptions() -- %s",
|
||||
persist_options_status.ToString().c_str());
|
||||
if (s.ok()) {
|
||||
// Persist RocksDB options under the single write thread
|
||||
WriteThread::Writer w;
|
||||
write_thread_.EnterUnbatched(&w, &mutex_);
|
||||
|
||||
persist_options_status = WriteOptionsFile();
|
||||
|
||||
write_thread_.ExitUnbatched(&w);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1967,6 +1964,16 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
db_options_.info_log, "[%s] SetOptions succeeded",
|
||||
cfd->GetName().c_str());
|
||||
new_options.Dump(db_options_.info_log.get());
|
||||
if (!persist_options_status.ok()) {
|
||||
if (db_options_.fail_if_options_file_error) {
|
||||
s = Status::IOError(
|
||||
"SetOptions succeeded, but unable to persist options",
|
||||
persist_options_status.ToString());
|
||||
}
|
||||
Warn(db_options_.info_log,
|
||||
"Unable to persist options in SetOptions() -- %s",
|
||||
persist_options_status.ToString().c_str());
|
||||
}
|
||||
} else {
|
||||
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
|
||||
"[%s] SetOptions failed", cfd->GetName().c_str());
|
||||
@ -3455,6 +3462,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
||||
const std::string& column_family_name,
|
||||
ColumnFamilyHandle** handle) {
|
||||
Status s;
|
||||
Status persist_options_status;
|
||||
*handle = nullptr;
|
||||
|
||||
s = CheckCompressionSupported(cf_options);
|
||||
@ -3487,6 +3495,12 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
||||
s = versions_->LogAndApply(
|
||||
nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit,
|
||||
&mutex_, directories_.GetDbDir(), false, &cf_options);
|
||||
|
||||
if (s.ok()) {
|
||||
// If the column family was created successfully, we then persist
|
||||
// the updated RocksDB options under the same single write thread
|
||||
persist_options_status = WriteOptionsFile();
|
||||
}
|
||||
write_thread_.ExitUnbatched(&w);
|
||||
}
|
||||
if (s.ok()) {
|
||||
@ -3514,7 +3528,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
||||
|
||||
// this is outside the mutex
|
||||
if (s.ok()) {
|
||||
Status persist_options_status = WriteOptionsFile();
|
||||
NewThreadStatusCfInfo(
|
||||
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
|
||||
if (!persist_options_status.ok()) {
|
||||
if (db_options_.fail_if_options_file_error) {
|
||||
s = Status::IOError(
|
||||
@ -3526,8 +3541,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
||||
"Unable to persist options in CreateColumnFamily() -- %s",
|
||||
persist_options_status.ToString().c_str());
|
||||
}
|
||||
NewThreadStatusCfInfo(
|
||||
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -3546,6 +3559,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
||||
edit.SetColumnFamily(cfd->GetID());
|
||||
|
||||
Status s;
|
||||
Status options_persist_status;
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
if (cfd->IsDropped()) {
|
||||
@ -3557,6 +3571,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
||||
write_thread_.EnterUnbatched(&w, &mutex_);
|
||||
s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
|
||||
&edit, &mutex_);
|
||||
if (s.ok()) {
|
||||
// If the column family was dropped successfully, we then persist
|
||||
// the updated RocksDB options under the same single write thread
|
||||
options_persist_status = WriteOptionsFile();
|
||||
}
|
||||
write_thread_.ExitUnbatched(&w);
|
||||
}
|
||||
|
||||
@ -3583,7 +3602,9 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
||||
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
|
||||
max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
|
||||
mutable_cf_options->max_write_buffer_number;
|
||||
auto options_persist_status = WriteOptionsFile();
|
||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||
"Dropped column family with id %u\n", cfd->GetID());
|
||||
|
||||
if (!options_persist_status.ok()) {
|
||||
if (db_options_.fail_if_options_file_error) {
|
||||
s = Status::IOError(
|
||||
@ -3595,9 +3616,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
||||
"Unable to persist options in DropColumnFamily() -- %s",
|
||||
options_persist_status.ToString().c_str());
|
||||
}
|
||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||
"Dropped column family with id %u\n",
|
||||
cfd->GetID());
|
||||
} else {
|
||||
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
|
||||
"Dropping column family with id %u FAILED -- %s\n",
|
||||
@ -5040,7 +5058,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
}
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::Open:Opened");
|
||||
Status persist_options_status;
|
||||
if (s.ok()) {
|
||||
// Persist RocksDB Options before scheduling the compaction.
|
||||
// The WriteOptionsFile() will release and lock the mutex internally.
|
||||
persist_options_status = impl->WriteOptionsFile();
|
||||
|
||||
*dbptr = impl;
|
||||
impl->opened_successfully_ = true;
|
||||
impl->MaybeScheduleFlushOrCompaction();
|
||||
@ -5051,8 +5074,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p",
|
||||
impl);
|
||||
LogFlush(impl->db_options_.info_log);
|
||||
|
||||
auto persist_options_status = impl->WriteOptionsFile();
|
||||
if (!persist_options_status.ok()) {
|
||||
if (db_options.fail_if_options_file_error) {
|
||||
s = Status::IOError(
|
||||
@ -5181,38 +5202,34 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
|
||||
|
||||
Status DBImpl::WriteOptionsFile() {
|
||||
#ifndef ROCKSDB_LITE
|
||||
std::string file_name;
|
||||
Status s = WriteOptionsToTempFile(&file_name);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
s = RenameTempFileToOptionsFile(file_name);
|
||||
return s;
|
||||
#else
|
||||
return Status::OK();
|
||||
#endif // !ROCKSDB_LITE
|
||||
}
|
||||
mutex_.AssertHeld();
|
||||
|
||||
Status DBImpl::WriteOptionsToTempFile(std::string* file_name) {
|
||||
#ifndef ROCKSDB_LITE
|
||||
std::vector<std::string> cf_names;
|
||||
std::vector<ColumnFamilyOptions> cf_opts;
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
// This part requires mutex to protect the column family options
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
cf_names.push_back(cfd->GetName());
|
||||
cf_opts.push_back(BuildColumnFamilyOptions(
|
||||
*cfd->options(), *cfd->GetLatestMutableCFOptions()));
|
||||
}
|
||||
}
|
||||
*file_name = TempOptionsFileName(GetName(), versions_->NewFileNumber());
|
||||
|
||||
Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts,
|
||||
*file_name, GetEnv());
|
||||
// This part requires mutex to protect the column family options
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
cf_names.push_back(cfd->GetName());
|
||||
cf_opts.push_back(BuildColumnFamilyOptions(
|
||||
*cfd->options(), *cfd->GetLatestMutableCFOptions()));
|
||||
}
|
||||
|
||||
// Unlock during expensive operations. New writes cannot get here
|
||||
// because the single write thread ensures all new writes get queued.
|
||||
mutex_.Unlock();
|
||||
|
||||
std::string file_name =
|
||||
TempOptionsFileName(GetName(), versions_->NewFileNumber());
|
||||
Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts, file_name,
|
||||
GetEnv());
|
||||
|
||||
if (s.ok()) {
|
||||
s = RenameTempFileToOptionsFile(file_name);
|
||||
}
|
||||
mutex_.Lock();
|
||||
return s;
|
||||
#else
|
||||
return Status::OK();
|
||||
@ -5240,8 +5257,6 @@ void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
|
||||
|
||||
Status DBImpl::DeleteObsoleteOptionsFiles() {
|
||||
#ifndef ROCKSDB_LITE
|
||||
options_files_mutex_.AssertHeld();
|
||||
|
||||
std::vector<std::string> filenames;
|
||||
// use ordered map to store keep the filenames sorted from the newest
|
||||
// to the oldest.
|
||||
@ -5273,7 +5288,6 @@ Status DBImpl::DeleteObsoleteOptionsFiles() {
|
||||
|
||||
Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
|
||||
#ifndef ROCKSDB_LITE
|
||||
InstrumentedMutexLock l(&options_files_mutex_);
|
||||
Status s;
|
||||
std::string options_file_name =
|
||||
OptionsFileName(GetName(), versions_->NewFileNumber());
|
||||
|
10
db/db_impl.h
10
db/db_impl.h
@ -411,10 +411,14 @@ class DBImpl : public DB {
|
||||
SuperVersion* super_version,
|
||||
Arena* arena);
|
||||
|
||||
// The following options file related functions should not be
|
||||
// called while DB mutex is held.
|
||||
// Except in DB::Open(), WriteOptionsFile can only be called when:
|
||||
// 1. WriteThread::Writer::EnterUnbatched() is used.
|
||||
// 2. db_mutex is held
|
||||
Status WriteOptionsFile();
|
||||
Status WriteOptionsToTempFile(std::string* file_name);
|
||||
|
||||
// The following two functions can only be called when:
|
||||
// 1. WriteThread::Writer::EnterUnbatched() is used.
|
||||
// 2. db_mutex is NOT held
|
||||
Status RenameTempFileToOptionsFile(const std::string& file_name);
|
||||
Status DeleteObsoleteOptionsFiles();
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include "db/write_thread.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -188,6 +189,7 @@ void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
|
||||
LinkOne(w, &wait_needed);
|
||||
if (wait_needed) {
|
||||
mu->Unlock();
|
||||
TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
|
||||
Await(w);
|
||||
mu->Lock();
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "rocksdb/db.h"
|
||||
#include "util/options_helper.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
#include "port/port.h"
|
||||
|
||||
@ -34,6 +35,7 @@ Status PersistRocksDBOptions(const DBOptions& db_opt,
|
||||
const std::vector<std::string>& cf_names,
|
||||
const std::vector<ColumnFamilyOptions>& cf_opts,
|
||||
const std::string& file_name, Env* env) {
|
||||
TEST_SYNC_POINT("PersistRocksDBOptions:start");
|
||||
if (cf_names.size() != cf_opts.size()) {
|
||||
return Status::InvalidArgument(
|
||||
"cf_names.size() and cf_opts.size() must be the same");
|
||||
|
Loading…
Reference in New Issue
Block a user