diff --git a/HISTORY.md b/HISTORY.md
index d92d40e73..2dc3c015d 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -2,6 +2,8 @@
 ### New Features
 * Upon snapshot release, recompact bottommost files containing deleted/overwritten keys that previously could not be dropped due to the snapshot. This alleviates space-amp caused by long-held snapshots.
 * Support lower bound on iterators specified via `ReadOptions::iterate_lower_bound`.
+### Bug Fixes
+* Fix IOError on WAL write doesn't propagate to write group follower
 
 ## 5.7.4 (08/31/2017)
 No significant changes.
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index b93dd6f8f..20e9b0973 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -313,7 +313,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       versions_->SetLastSequence(last_sequence);
     }
     MemTableInsertStatusCheck(w.status);
-    write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
+    write_thread_.ExitAsBatchGroupLeader(write_group, status);
   }
 
   if (status.ok()) {
@@ -523,7 +523,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
   if (!w.CallbackFailed()) {
     WriteCallbackStatusCheck(status);
   }
-  nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
+  nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status);
   if (status.ok()) {
     status = w.FinalStatus();
   }
diff --git a/db/db_write_test.cc b/db/db_write_test.cc
index 726f444fa..e3e8ad829 100644
--- a/db/db_write_test.cc
+++ b/db/db_write_test.cc
@@ -3,12 +3,17 @@
 // COPYING file in the root directory) and Apache 2.0 License
 // (found in the LICENSE.Apache file in the root directory).
 
+#include <atomic>
 #include <memory>
 #include <thread>
 #include <vector>
 #include "db/db_test_util.h"
 #include "db/write_batch_internal.h"
+#include "db/write_thread.h"
+#include "port/port.h"
 #include "port/stack_trace.h"
+#include "util/fault_injection_test_env.h"
+#include "util/string_util.h"
 #include "util/sync_point.h"
 
 namespace rocksdb {
@@ -18,7 +23,9 @@ class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
  public:
   DBWriteTest() : DBTestBase("/db_write_test") {}
 
-  void Open() { DBTestBase::Reopen(GetOptions(GetParam())); }
+  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
+
+  void Open() { DBTestBase::Reopen(GetOptions()); }
 };
 
 // Sequence number should be return through input write batch.
@@ -67,6 +74,47 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
   }
 }
 
+TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
+  constexpr int kNumThreads = 5;
+  std::unique_ptr<FaultInjectionTestEnv> mock_env(
+      new FaultInjectionTestEnv(Env::Default()));
+  Options options = GetOptions();
+  options.env = mock_env.get();
+  Reopen(options);
+  std::atomic<int> ready_count{0};
+  std::atomic<int> leader_count{0};
+  std::vector<port::Thread> threads;
+  mock_env->SetFilesystemActive(false);
+  // Wait until all threads linked to write threads, to make sure
+  // all threads join the same batch group.
+  SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+        ready_count++;
+        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
+        if (w->state == WriteThread::STATE_GROUP_LEADER) {
+          leader_count++;
+          while (ready_count < kNumThreads) {
+            // busy waiting
+          }
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.push_back(port::Thread(
+        [&](int index) {
+          // All threads should fail.
+          ASSERT_FALSE(Put("key" + ToString(index), "value").ok());
+        },
+        i));
+  }
+  for (int i = 0; i < kNumThreads; i++) {
+    threads[i].join();
+  }
+  ASSERT_EQ(1, leader_count);
+  // Close before mock_env destruct.
+  Close();
+}
+
 INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                         testing::Values(DBTestBase::kDefault,
                                         DBTestBase::kConcurrentWALWrites,
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 022f4e646..8b15e61c4 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -511,6 +511,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
   Writer* last_writer = write_group.last_writer;
   assert(leader->link_older == nullptr);
 
+  // Propagate memtable write error to the whole group.
+  if (status.ok() && !write_group.status.ok()) {
+    status = write_group.status;
+  }
+
   if (enable_pipelined_write_) {
     // Notify writers don't write to memtable to exit.
     for (Writer* w = last_writer; w != leader;) {