Fix flush not being committed while writing manifest
Summary: Fix flush not being committed while writing manifest, which is a recent bug introduced by D60075. The issue: # Options.max_background_flushes > 1 # Background thread A picks up a flush job, flushes, then commits to the manifest. (Note that the mutex is released before writing the manifest.) # Background thread B picks up another flush job and flushes. When it gets to `MemTableList::InstallMemtableFlushResults`, it notices another thread is committing, so it quits. # After the first commit, thread A doesn't double-check whether there are more flush results that need to be committed, leaving the second flush uncommitted. Test Plan: run the test. Also verify that the new test hits a deadlock without the fix. Reviewers: sdong, igor, lightmark Reviewed By: lightmark Subscribers: andrewkr, omegaga, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D60969
This commit is contained in:
parent
becc230aa9
commit
2462cebfe8
@ -356,6 +356,7 @@ set(TESTS
|
||||
db/db_test2.cc
|
||||
db/db_block_cache_test.cc
|
||||
db/db_bloom_filter_test.cc
|
||||
db/db_flush_test.cc
|
||||
db/db_iterator_test.cc
|
||||
db/db_sst_test.cc
|
||||
db/db_universal_compaction_test.cc
|
||||
|
5
Makefile
5
Makefile
@ -274,8 +274,10 @@ TESTS = \
|
||||
db_compaction_filter_test \
|
||||
db_compaction_test \
|
||||
db_dynamic_level_test \
|
||||
db_flush_test \
|
||||
db_inplace_update_test \
|
||||
db_iterator_test \
|
||||
db_options_test \
|
||||
db_sst_test \
|
||||
db_tailing_iter_test \
|
||||
db_universal_compaction_test \
|
||||
@ -920,6 +922,9 @@ db_compaction_test: db/db_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TE
|
||||
db_dynamic_level_test: db/db_dynamic_level_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
db_flush_test: db/db_flush_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
|
55
db/db_flush_test.cc
Normal file
55
db/db_flush_test.cc
Normal file
@ -0,0 +1,55 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class DBFlushTest : public DBTestBase {
|
||||
public:
|
||||
DBFlushTest() : DBTestBase("/db_flush_test") {}
|
||||
};
|
||||
|
||||
// We had an issue where, when two background threads tried to flush at the
|
||||
// same time, only one of them got committed. The test verifies the issue is fixed.
|
||||
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
|
||||
Options options;
|
||||
options.disable_auto_compactions = true;
|
||||
options.max_background_flushes = 2;
|
||||
Reopen(options);
|
||||
FlushOptions no_wait;
|
||||
no_wait.wait = false;
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"VersionSet::LogAndApply:WriteManifest",
|
||||
"DBFlushTest::FlushWhileWritingManifest:1"},
|
||||
{"MemTableList::InstallMemtableFlushResults:InProgress",
|
||||
"VersionSet::LogAndApply:WriteManifestDone"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(Put("foo", "v"));
|
||||
ASSERT_OK(dbfull()->Flush(no_wait));
|
||||
TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
|
||||
ASSERT_OK(Put("bar", "v"));
|
||||
ASSERT_OK(dbfull()->Flush(no_wait));
|
||||
// If the issue is hit we will wait here forever.
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
|
||||
ASSERT_EQ(2, TotalTableFiles());
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
rocksdb::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -11,14 +11,15 @@
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <string>
|
||||
#include "rocksdb/db.h"
|
||||
#include "db/memtable.h"
|
||||
#include "db/version_set.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "table/merger.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/log_buffer.h"
|
||||
#include "util/sync_point.h"
|
||||
#include "util/thread_status_util.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -297,69 +298,79 @@ Status MemTableList::InstallMemtableFlushResults(
|
||||
// if some other thread is already committing, then return
|
||||
Status s;
|
||||
if (commit_in_progress_) {
|
||||
TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
|
||||
return s;
|
||||
}
|
||||
|
||||
// Only a single thread can be executing this piece of code
|
||||
commit_in_progress_ = true;
|
||||
|
||||
// scan all memtables from the earliest, and commit those
|
||||
// (in that order) that have finished flushing. Memtables
|
||||
// are always committed in the order that they were created.
|
||||
uint64_t batch_file_number = 0;
|
||||
size_t batch_count = 0;
|
||||
autovector<VersionEdit*> edit_list;
|
||||
auto& memlist = current_->memlist_;
|
||||
// enumerate from the last (earliest) element to see how many batch finished
|
||||
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
|
||||
MemTable* m = *it;
|
||||
if (!m->flush_completed_) {
|
||||
// Retry until all completed flushes are committed. New flushes can finish
|
||||
// while the current thread is writing the manifest, during which the mutex is released.
|
||||
while (s.ok()) {
|
||||
auto& memlist = current_->memlist_;
|
||||
if (memlist.empty() || !memlist.back()->flush_completed_) {
|
||||
break;
|
||||
}
|
||||
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
|
||||
batch_file_number = m->file_number_;
|
||||
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started",
|
||||
cfd->GetName().c_str(), m->file_number_);
|
||||
edit_list.push_back(&m->edit_);
|
||||
}
|
||||
batch_count++;
|
||||
}
|
||||
|
||||
if (batch_count > 0) {
|
||||
// this can release and reacquire the mutex.
|
||||
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory);
|
||||
|
||||
// we will be changing the version in the next code path,
|
||||
// so we better create a new one, since versions are immutable
|
||||
InstallNewVersion();
|
||||
|
||||
// All the later memtables that have the same filenum
|
||||
// are part of the same batch. They can be committed now.
|
||||
uint64_t mem_id = 1; // how many memtables have been flushed.
|
||||
if (s.ok()) { // commit new state
|
||||
while (batch_count-- > 0) {
|
||||
MemTable* m = current_->memlist_.back();
|
||||
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
|
||||
": memtable #%" PRIu64 " done",
|
||||
cfd->GetName().c_str(), m->file_number_, mem_id);
|
||||
assert(m->file_number_ > 0);
|
||||
current_->Remove(m, to_delete);
|
||||
++mem_id;
|
||||
// scan all memtables from the earliest, and commit those
|
||||
// (in that order) that have finished flushing. Memtables
|
||||
// are always committed in the order that they were created.
|
||||
uint64_t batch_file_number = 0;
|
||||
size_t batch_count = 0;
|
||||
autovector<VersionEdit*> edit_list;
|
||||
// enumerate from the last (earliest) element to see how many batch finished
|
||||
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
|
||||
MemTable* m = *it;
|
||||
if (!m->flush_completed_) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
|
||||
MemTable* m = *it;
|
||||
// commit failed. setup state so that we can flush again.
|
||||
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
|
||||
": memtable #%" PRIu64 " failed",
|
||||
m->file_number_, mem_id);
|
||||
m->flush_completed_ = false;
|
||||
m->flush_in_progress_ = false;
|
||||
m->edit_.Clear();
|
||||
num_flush_not_started_++;
|
||||
m->file_number_ = 0;
|
||||
imm_flush_needed.store(true, std::memory_order_release);
|
||||
++mem_id;
|
||||
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
|
||||
batch_file_number = m->file_number_;
|
||||
LogToBuffer(log_buffer,
|
||||
"[%s] Level-0 commit table #%" PRIu64 " started",
|
||||
cfd->GetName().c_str(), m->file_number_);
|
||||
edit_list.push_back(&m->edit_);
|
||||
}
|
||||
batch_count++;
|
||||
}
|
||||
|
||||
if (batch_count > 0) {
|
||||
// this can release and reacquire the mutex.
|
||||
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
|
||||
db_directory);
|
||||
|
||||
// we will be changing the version in the next code path,
|
||||
// so we better create a new one, since versions are immutable
|
||||
InstallNewVersion();
|
||||
|
||||
// All the later memtables that have the same filenum
|
||||
// are part of the same batch. They can be committed now.
|
||||
uint64_t mem_id = 1; // how many memtables have been flushed.
|
||||
if (s.ok()) { // commit new state
|
||||
while (batch_count-- > 0) {
|
||||
MemTable* m = current_->memlist_.back();
|
||||
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
|
||||
": memtable #%" PRIu64 " done",
|
||||
cfd->GetName().c_str(), m->file_number_, mem_id);
|
||||
assert(m->file_number_ > 0);
|
||||
current_->Remove(m, to_delete);
|
||||
++mem_id;
|
||||
}
|
||||
} else {
|
||||
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
|
||||
MemTable* m = *it;
|
||||
// commit failed. setup state so that we can flush again.
|
||||
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
|
||||
": memtable #%" PRIu64 " failed",
|
||||
m->file_number_, mem_id);
|
||||
m->flush_completed_ = false;
|
||||
m->flush_in_progress_ = false;
|
||||
m->edit_.Clear();
|
||||
num_flush_not_started_++;
|
||||
m->file_number_ = 0;
|
||||
imm_flush_needed.store(true, std::memory_order_release);
|
||||
++mem_id;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user