WritePrepared Txn: enable TryAgain for duplicates at the end of the batch

Summary:
The WriteBatch::Iterate will try with a larger sequence number if the memtable reports a duplicate. This status is specified with TryAgain status. So far the assumption was that the last entry in the batch will never return TryAgain, which is correct when WAL is created via WritePrepared since it always appends a batch separator if a natural one does not exist. However when reading a WAL generated by WriteCommitted this batch separator might  not exist. Although WritePrepared is not supposed to be able to read the WAL generated by WriteCommitted we should avoid confusing scenarios in which the behavior becomes unpredictable. The path fixes that by allowing TryAgain even for the last entry of the write batch.
Closes https://github.com/facebook/rocksdb/pull/3747

Differential Revision: D7708391

Pulled By: maysamyabandeh

fbshipit-source-id: bfaddaa9b14a4cdaff6977f6f63c789a6ab1ee0d
This commit is contained in:
Maysam Yabandeh 2018-04-20 15:13:47 -07:00
parent 91c087997e
commit 52aab66424
2 changed files with 14 additions and 13 deletions

View File

@ -405,9 +405,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
Status s;
char tag = 0;
uint32_t column_family = 0; // default
while ((s.ok() || UNLIKELY(s.IsTryAgain())) && !input.empty() &&
bool last_was_try_again = false;
while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain())) &&
handler->Continue()) {
if (LIKELY(!s.IsTryAgain())) {
last_was_try_again = false;
tag = 0;
column_family = 0; // default
@ -418,6 +420,13 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
} else {
assert(s.IsTryAgain());
assert(!last_was_try_again); // to detect infinite loop bugs
if (UNLIKELY(last_was_try_again)) {
return Status::Corruption(
"two consecutive TryAgain in WriteBatch handler; this is either a "
"software bug or data corruption.");
}
last_was_try_again = true;
s = Status::OK();
}

View File

@ -359,12 +359,8 @@ class TransactionTestBase : public ::testing::Test {
WriteBatch wb;
committed_kvs[k] = v;
wb.Put(k, v);
// TODO(myabandeh): remove this when we supprot duplicate keys in
// db->Write method
if (false) {
committed_kvs[k] = v2;
wb.Put(k, v2);
}
committed_kvs[k] = v2;
wb.Put(k, v2);
s = db->Write(write_options, &wb);
ASSERT_OK(s);
} break;
@ -376,12 +372,8 @@ class TransactionTestBase : public ::testing::Test {
committed_kvs[k] = v;
s = txn->Put(k, v);
ASSERT_OK(s);
// TODO(myabandeh): remove this when we supprot duplicate keys in
// db->Write method
if (false) {
committed_kvs[k] = v2;
s = txn->Put(k, v2);
}
committed_kvs[k] = v2;
s = txn->Put(k, v2);
ASSERT_OK(s);
if (type == 3) {
s = txn->Prepare();