From 32604e6601be84b771ad1ab03ee02284c982d945 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 21 Jul 2016 10:10:41 -0700 Subject: [PATCH] Fix flush not being commit while writing manifest Summary: Fix flush not being commit while writing manifest, which is a recent bug introduced by D60075. The issue: # Options.max_background_flushes > 1 # Background thread A pick up a flush job, flush, then commit to manifest. (Note that mutex is released before writing manifest.) # Background thread B pick up another flush job, flush. When it gets to `MemTableList::InstallMemtableFlushResults`, it notices another thread is commiting, so it quit. # After the first commit, thread A doesn't double check if there are more flush result need to commit, leaving the second flush uncommitted. Test Plan: run the test. Also verify the new test hit deadlock without the fix. Reviewers: sdong, igor, lightmark Reviewed By: lightmark Subscribers: andrewkr, omegaga, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D60969 --- CMakeLists.txt | 1 + Makefile | 5 ++ db/db_flush_test.cc | 55 ++++++++++++++++++++ db/memtable_list.cc | 121 ++++++++++++++++++++++++-------------------- src.mk | 1 + 5 files changed, 128 insertions(+), 55 deletions(-) create mode 100644 db/db_flush_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index c0c1bf447..fbdffb97a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -356,6 +356,7 @@ set(TESTS db/db_test2.cc db/db_block_cache_test.cc db/db_bloom_filter_test.cc + db/db_flush_test.cc db/db_iterator_test.cc db/db_sst_test.cc db/db_universal_compaction_test.cc diff --git a/Makefile b/Makefile index 94358b57c..63b467b39 100644 --- a/Makefile +++ b/Makefile @@ -274,8 +274,10 @@ TESTS = \ db_compaction_filter_test \ db_compaction_test \ db_dynamic_level_test \ + db_flush_test \ db_inplace_update_test \ db_iterator_test \ + db_options_test \ db_sst_test \ db_tailing_iter_test \ db_universal_compaction_test \ @@ -920,6 +922,9 @@ db_compaction_test: db/db_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TE db_dynamic_level_test: db/db_dynamic_level_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_flush_test: db/db_flush_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc new file mode 100644 index 000000000..71062143b --- /dev/null +++ b/db/db_flush_test.cc @@ -0,0 +1,55 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "util/sync_point.h" + +namespace rocksdb { + +class DBFlushTest : public DBTestBase { + public: + DBFlushTest() : DBTestBase("/db_flush_test") {} +}; + +// We had issue when two background threads trying to flush at the same time, +// only one of them get committed. The test verifies the issue is fixed. +TEST_F(DBFlushTest, FlushWhileWritingManifest) { + Options options; + options.disable_auto_compactions = true; + options.max_background_flushes = 2; + Reopen(options); + FlushOptions no_wait; + no_wait.wait = false; + + SyncPoint::GetInstance()->LoadDependency( + {{"VersionSet::LogAndApply:WriteManifest", + "DBFlushTest::FlushWhileWritingManifest:1"}, + {"MemTableList::InstallMemtableFlushResults:InProgress", + "VersionSet::LogAndApply:WriteManifestDone"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("foo", "v")); + ASSERT_OK(dbfull()->Flush(no_wait)); + TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1"); + ASSERT_OK(Put("bar", "v")); + ASSERT_OK(dbfull()->Flush(no_wait)); + // If the issue is hit we will wait here forever. + dbfull()->TEST_WaitForFlushMemTable(); + + ASSERT_EQ(2, TotalTableFiles()); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 03b3f1a9a..799887ed1 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -11,14 +11,15 @@ #include #include -#include "rocksdb/db.h" #include "db/memtable.h" #include "db/version_set.h" +#include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "table/merger.h" #include "util/coding.h" #include "util/log_buffer.h" +#include "util/sync_point.h" #include "util/thread_status_util.h" namespace rocksdb { @@ -297,69 +298,79 @@ Status MemTableList::InstallMemtableFlushResults( // if some other thread is already committing, then return Status s; if (commit_in_progress_) { + TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress"); return s; } // Only a single thread can be executing this piece of code commit_in_progress_ = true; - // scan all memtables from the earliest, and commit those - // (in that order) that have finished flushing. Memetables - // are always committed in the order that they were created. - uint64_t batch_file_number = 0; - size_t batch_count = 0; - autovector edit_list; - auto& memlist = current_->memlist_; - // enumerate from the last (earliest) element to see how many batch finished - for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { - MemTable* m = *it; - if (!m->flush_completed_) { + // Retry until all completed flushes are committed. New flushes can finish + // while the current thread is writing manifest where mutex is released. + while (s.ok()) { + auto& memlist = current_->memlist_; + if (memlist.empty() || !memlist.back()->flush_completed_) { break; } - if (it == memlist.rbegin() || batch_file_number != m->file_number_) { - batch_file_number = m->file_number_; - LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started", - cfd->GetName().c_str(), m->file_number_); - edit_list.push_back(&m->edit_); - } - batch_count++; - } - - if (batch_count > 0) { - // this can release and reacquire the mutex. - s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory); - - // we will be changing the version in the next code path, - // so we better create a new one, since versions are immutable - InstallNewVersion(); - - // All the later memtables that have the same filenum - // are part of the same batch. They can be committed now. - uint64_t mem_id = 1; // how many memtables have been flushed. - if (s.ok()) { // commit new state - while (batch_count-- > 0) { - MemTable* m = current_->memlist_.back(); - LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " done", - cfd->GetName().c_str(), m->file_number_, mem_id); - assert(m->file_number_ > 0); - current_->Remove(m, to_delete); - ++mem_id; + // scan all memtables from the earliest, and commit those + // (in that order) that have finished flushing. Memetables + // are always committed in the order that they were created. + uint64_t batch_file_number = 0; + size_t batch_count = 0; + autovector edit_list; + // enumerate from the last (earliest) element to see how many batch finished + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; + if (!m->flush_completed_) { + break; } - } else { - for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) { - MemTable* m = *it; - // commit failed. setup state so that we can flush again. - LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " failed", - m->file_number_, mem_id); - m->flush_completed_ = false; - m->flush_in_progress_ = false; - m->edit_.Clear(); - num_flush_not_started_++; - m->file_number_ = 0; - imm_flush_needed.store(true, std::memory_order_release); - ++mem_id; + if (it == memlist.rbegin() || batch_file_number != m->file_number_) { + batch_file_number = m->file_number_; + LogToBuffer(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 " started", + cfd->GetName().c_str(), m->file_number_); + edit_list.push_back(&m->edit_); + } + batch_count++; + } + + if (batch_count > 0) { + // this can release and reacquire the mutex. + s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, + db_directory); + + // we will be changing the version in the next code path, + // so we better create a new one, since versions are immutable + InstallNewVersion(); + + // All the later memtables that have the same filenum + // are part of the same batch. They can be committed now. + uint64_t mem_id = 1; // how many memtables have been flushed. + if (s.ok()) { // commit new state + while (batch_count-- > 0) { + MemTable* m = current_->memlist_.back(); + LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfd->GetName().c_str(), m->file_number_, mem_id); + assert(m->file_number_ > 0); + current_->Remove(m, to_delete); + ++mem_id; + } + } else { + for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) { + MemTable* m = *it; + // commit failed. setup state so that we can flush again. + LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + m->file_number_, mem_id); + m->flush_completed_ = false; + m->flush_in_progress_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + m->file_number_ = 0; + imm_flush_needed.store(true, std::memory_order_release); + ++mem_id; + } } } } diff --git a/src.mk b/src.mk index c98afdcc6..d5761c41f 100644 --- a/src.mk +++ b/src.mk @@ -217,6 +217,7 @@ MAIN_SOURCES = \ db/db_compaction_filter_test.cc \ db/db_compaction_test.cc \ db/db_dynamic_level_test.cc \ + db/db_flush_test.cc \ db/db_inplace_update_test.cc \ db/db_iterator_test.cc \ db/db_log_iter_test.cc \