Fix a bug when trigger atomic flush and close db (#5254)
Summary: With atomic flush, RocksDB background flush will flush memtables of a column family up to the largest memtable id in the immutable memtable list. This can introduce a bug in the following scenario. A user thread inserts into a column family until the memtable is full and triggers a flush. This will add the column family to flush_scheduler_. Then the user thread writes another record to the column family. In the PreprocessWrite function, the user thread picks the column family from flush_scheduler_ and schedules a flush request. The flush request gaurantees to flush all the memtables up to the current largest memtable ID of the immutable memtable list. Then the user thread writes new data to the newly-created active memtable. After the write returns, the user thread closes the db. This can cause assertion failure when the background flush thread tries to install superversion for the column family. The solution is to not install flush results if the db has already set `shutting_down_` to true. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5254 Differential Revision: D15124149 Pulled By: riversand963 fbshipit-source-id: 0a667a41339dedb5a18bcb01b0bf11c275c04df0
This commit is contained in:
parent
3548e4220d
commit
35e6ba734e
@ -488,6 +488,32 @@ TEST_P(DBAtomicFlushTest,
|
|||||||
Destroy(options);
|
Destroy(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
|
||||||
|
bool atomic_flush = GetParam();
|
||||||
|
if (!atomic_flush) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const int kNumKeysTriggerFlush = 4;
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.atomic_flush = atomic_flush;
|
||||||
|
options.memtable_factory.reset(
|
||||||
|
new SpecialSkipListFactory(kNumKeysTriggerFlush));
|
||||||
|
CreateAndReopenWithCF({"pikachu"}, options);
|
||||||
|
|
||||||
|
for (int i = 0; i != kNumKeysTriggerFlush; ++i) {
|
||||||
|
ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
|
||||||
|
}
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
ASSERT_OK(Put(0, "key", "value"));
|
||||||
|
Close();
|
||||||
|
|
||||||
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
||||||
|
ASSERT_EQ("value", Get(0, "key"));
|
||||||
|
}
|
||||||
|
|
||||||
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
|
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
|
||||||
testing::Bool());
|
testing::Bool());
|
||||||
|
|
||||||
|
@ -397,12 +397,21 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
s = error_status.ok() ? s : error_status;
|
s = error_status.ok() ? s : error_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If db is NOT shutting down, and one or more column families have been
|
||||||
|
// dropped.
|
||||||
|
// TODO: use separate status code for db shutdown and column family dropped.
|
||||||
|
if (s.IsShutdownInProgress() &&
|
||||||
|
!shutting_down_.load(std::memory_order_acquire)) {
|
||||||
|
s = Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
if (s.ok() || s.IsShutdownInProgress()) {
|
if (s.ok() || s.IsShutdownInProgress()) {
|
||||||
// Sync on all distinct output directories.
|
// Sync on all distinct output directories.
|
||||||
for (auto dir : distinct_output_dirs) {
|
for (auto dir : distinct_output_dirs) {
|
||||||
if (dir != nullptr) {
|
if (dir != nullptr) {
|
||||||
s = dir->Fsync();
|
Status error_status = dir->Fsync();
|
||||||
if (!s.ok()) {
|
if (!error_status.ok()) {
|
||||||
|
s = error_status;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -469,7 +478,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
|
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok() || s.IsShutdownInProgress()) {
|
if (s.ok()) {
|
||||||
assert(num_cfs ==
|
assert(num_cfs ==
|
||||||
static_cast<int>(job_context->superversion_contexts.size()));
|
static_cast<int>(job_context->superversion_contexts.size()));
|
||||||
for (int i = 0; i != num_cfs; ++i) {
|
for (int i = 0; i != num_cfs; ++i) {
|
||||||
|
Loading…
Reference in New Issue
Block a user