Fix IOError on WAL write doesn't propagate to write group follower
Summary: This is a simpler version of #3097 by removing all unrelated changes. Fixing the bug where concurrent writes may get Status::OK while it actually gets IOError on WAL write. This happens when multiple writes form a write batch group, and the leader get an IOError while writing to WAL. The leader failed to pass the error to followers in the group, and the followers end up returning Status::OK() while actually writing nothing. The bug only affect writes in a batch group. Future writes after the batch group will correctly return immediately with the IOError. Closes https://github.com/facebook/rocksdb/pull/3201 Differential Revision: D6421644 Pulled By: yiwu-arbug fbshipit-source-id: 1c2a455c5b73f6842423785eb8a9dbfbb191dc0e
This commit is contained in:
parent
a4d02dc905
commit
f759122ba0
@ -2,6 +2,8 @@
|
|||||||
### New Features
|
### New Features
|
||||||
* Upon snapshot release, recompact bottommost files containing deleted/overwritten keys that previously could not be dropped due to the snapshot. This alleviates space-amp caused by long-held snapshots.
|
* Upon snapshot release, recompact bottommost files containing deleted/overwritten keys that previously could not be dropped due to the snapshot. This alleviates space-amp caused by long-held snapshots.
|
||||||
* Support lower bound on iterators specified via `ReadOptions::iterate_lower_bound`.
|
* Support lower bound on iterators specified via `ReadOptions::iterate_lower_bound`.
|
||||||
|
### Bug Fixes
|
||||||
|
* Fix IOError on WAL write doesn't propagate to write group follower
|
||||||
|
|
||||||
## 5.7.4 (08/31/2017)
|
## 5.7.4 (08/31/2017)
|
||||||
No significant changes.
|
No significant changes.
|
||||||
|
@ -313,7 +313,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
versions_->SetLastSequence(last_sequence);
|
versions_->SetLastSequence(last_sequence);
|
||||||
}
|
}
|
||||||
MemTableInsertStatusCheck(w.status);
|
MemTableInsertStatusCheck(w.status);
|
||||||
write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
|
write_thread_.ExitAsBatchGroupLeader(write_group, status);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
@ -523,7 +523,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
|
|||||||
if (!w.CallbackFailed()) {
|
if (!w.CallbackFailed()) {
|
||||||
WriteCallbackStatusCheck(status);
|
WriteCallbackStatusCheck(status);
|
||||||
}
|
}
|
||||||
nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
|
nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status);
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
status = w.FinalStatus();
|
status = w.FinalStatus();
|
||||||
}
|
}
|
||||||
|
@ -3,12 +3,17 @@
|
|||||||
// COPYING file in the root directory) and Apache 2.0 License
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
// (found in the LICENSE.Apache file in the root directory).
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include "db/db_test_util.h"
|
#include "db/db_test_util.h"
|
||||||
#include "db/write_batch_internal.h"
|
#include "db/write_batch_internal.h"
|
||||||
|
#include "db/write_thread.h"
|
||||||
|
#include "port/port.h"
|
||||||
#include "port/stack_trace.h"
|
#include "port/stack_trace.h"
|
||||||
|
#include "util/fault_injection_test_env.h"
|
||||||
|
#include "util/string_util.h"
|
||||||
#include "util/sync_point.h"
|
#include "util/sync_point.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
@ -18,7 +23,9 @@ class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
|
|||||||
public:
|
public:
|
||||||
DBWriteTest() : DBTestBase("/db_write_test") {}
|
DBWriteTest() : DBTestBase("/db_write_test") {}
|
||||||
|
|
||||||
void Open() { DBTestBase::Reopen(GetOptions(GetParam())); }
|
Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
|
||||||
|
|
||||||
|
void Open() { DBTestBase::Reopen(GetOptions()); }
|
||||||
};
|
};
|
||||||
|
|
||||||
// Sequence number should be return through input write batch.
|
// Sequence number should be return through input write batch.
|
||||||
@ -67,6 +74,47 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
|
||||||
|
constexpr int kNumThreads = 5;
|
||||||
|
std::unique_ptr<FaultInjectionTestEnv> mock_env(
|
||||||
|
new FaultInjectionTestEnv(Env::Default()));
|
||||||
|
Options options = GetOptions();
|
||||||
|
options.env = mock_env.get();
|
||||||
|
Reopen(options);
|
||||||
|
std::atomic<int> ready_count{0};
|
||||||
|
std::atomic<int> leader_count{0};
|
||||||
|
std::vector<port::Thread> threads;
|
||||||
|
mock_env->SetFilesystemActive(false);
|
||||||
|
// Wait until all threads linked to write threads, to make sure
|
||||||
|
// all threads join the same batch group.
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
|
||||||
|
ready_count++;
|
||||||
|
auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
|
||||||
|
if (w->state == WriteThread::STATE_GROUP_LEADER) {
|
||||||
|
leader_count++;
|
||||||
|
while (ready_count < kNumThreads) {
|
||||||
|
// busy waiting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
for (int i = 0; i < kNumThreads; i++) {
|
||||||
|
threads.push_back(port::Thread(
|
||||||
|
[&](int index) {
|
||||||
|
// All threads should fail.
|
||||||
|
ASSERT_FALSE(Put("key" + ToString(index), "value").ok());
|
||||||
|
},
|
||||||
|
i));
|
||||||
|
}
|
||||||
|
for (int i = 0; i < kNumThreads; i++) {
|
||||||
|
threads[i].join();
|
||||||
|
}
|
||||||
|
ASSERT_EQ(1, leader_count);
|
||||||
|
// Close before mock_env destruct.
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
|
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
|
||||||
testing::Values(DBTestBase::kDefault,
|
testing::Values(DBTestBase::kDefault,
|
||||||
DBTestBase::kConcurrentWALWrites,
|
DBTestBase::kConcurrentWALWrites,
|
||||||
|
@ -511,6 +511,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
|
|||||||
Writer* last_writer = write_group.last_writer;
|
Writer* last_writer = write_group.last_writer;
|
||||||
assert(leader->link_older == nullptr);
|
assert(leader->link_older == nullptr);
|
||||||
|
|
||||||
|
// Propagate memtable write error to the whole group.
|
||||||
|
if (status.ok() && !write_group.status.ok()) {
|
||||||
|
status = write_group.status;
|
||||||
|
}
|
||||||
|
|
||||||
if (enable_pipelined_write_) {
|
if (enable_pipelined_write_) {
|
||||||
// Notify writers don't write to memtable to exit.
|
// Notify writers don't write to memtable to exit.
|
||||||
for (Writer* w = last_writer; w != leader;) {
|
for (Writer* w = last_writer; w != leader;) {
|
||||||
|
Loading…
Reference in New Issue
Block a user