From 35e6ba734e6d6443e13e24d3e7497b1ce05664e6 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 29 Apr 2019 12:29:57 -0700 Subject: [PATCH] Fix a bug when trigger atomic flush and close db (#5254) Summary: With atomic flush, RocksDB background flush will flush memtables of a column family up to the largest memtable id in the immutable memtable list. This can introduce a bug in the following scenario. A user thread inserts into a column family until the memtable is full and triggers a flush. This will add the column family to flush_scheduler_. Then the user thread writes another record to the column family. In the PreprocessWrite function, the user thread picks the column family from flush_scheduler_ and schedules a flush request. The flush request gaurantees to flush all the memtables up to the current largest memtable ID of the immutable memtable list. Then the user thread writes new data to the newly-created active memtable. After the write returns, the user thread closes the db. This can cause assertion failure when the background flush thread tries to install superversion for the column family. The solution is to not install flush results if the db has already set `shutting_down_` to true. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5254 Differential Revision: D15124149 Pulled By: riversand963 fbshipit-source-id: 0a667a41339dedb5a18bcb01b0bf11c275c04df0 --- db/db_flush_test.cc | 26 ++++++++++++++++++++++++++ db/db_impl_compaction_flush.cc | 15 ++++++++++++--- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 8a4d8fc63..09c461f8d 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -488,6 +488,32 @@ TEST_P(DBAtomicFlushTest, Destroy(options); } +TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) { + bool atomic_flush = GetParam(); + if (!atomic_flush) { + return; + } + const int kNumKeysTriggerFlush = 4; + Options options = CurrentOptions(); + options.create_if_missing = true; + options.atomic_flush = atomic_flush; + options.memtable_factory.reset( + new SpecialSkipListFactory(kNumKeysTriggerFlush)); + CreateAndReopenWithCF({"pikachu"}, options); + + for (int i = 0; i != kNumKeysTriggerFlush; ++i) { + ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i))); + } + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(0, "key", "value")); + Close(); + + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); + ASSERT_EQ("value", Get(0, "key")); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 66ea24716..f16c61117 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -397,12 +397,21 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( s = error_status.ok() ? s : error_status; } + // If db is NOT shutting down, and one or more column families have been + // dropped. + // TODO: use separate status code for db shutdown and column family dropped. + if (s.IsShutdownInProgress() && + !shutting_down_.load(std::memory_order_acquire)) { + s = Status::OK(); + } + if (s.ok() || s.IsShutdownInProgress()) { // Sync on all distinct output directories. for (auto dir : distinct_output_dirs) { if (dir != nullptr) { - s = dir->Fsync(); - if (!s.ok()) { + Status error_status = dir->Fsync(); + if (!error_status.ok()) { + s = error_status; break; } } @@ -469,7 +478,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer); } - if (s.ok() || s.IsShutdownInProgress()) { + if (s.ok()) { assert(num_cfs == static_cast(job_context->superversion_contexts.size())); for (int i = 0; i != num_cfs; ++i) {