WritePrepared Txn: make recoverable state visible after flush

Summary:
Currently if the CommitTimeWriteBatch is set to be used only as a state that is required only for recovery , the user cannot see that in DB until it is restarted. This while the state is already inserted into the DB after the memtable flush. It would be useful for debugging if make this state visible to the user after the flush by committing it. The patch does it by a invoking a callback that does the commit on the recoverable state.
Closes https://github.com/facebook/rocksdb/pull/3661

Differential Revision: D7424577

Pulled By: maysamyabandeh

fbshipit-source-id: 137f9408662f0853938b33fa440f27f04c1bbf5c
This commit is contained in:
Maysam Yabandeh 2018-03-28 12:01:09 -07:00 committed by Facebook Github Bot
parent 1f5def1653
commit 0377ff9dea
4 changed files with 52 additions and 0 deletions

View File

@ -621,6 +621,9 @@ class DBImpl : public DB {
void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
// Not thread-safe.
void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback);
InstrumentedMutex* mutex() { return &mutex_; }
Status NewDB();
@ -1354,6 +1357,10 @@ class DBImpl : public DB {
// REQUIRES: mutex held
std::unique_ptr<SnapshotChecker> snapshot_checker_;
// Callback for when the cached_recoverable_state_ is written to memtable
// Only to be set during initialization
std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);

View File

@ -45,6 +45,11 @@ Status DBImpl::SingleDelete(const WriteOptions& write_options,
return DB::SingleDelete(write_options, column_family, key);
}
void DBImpl::SetRecoverableStatePreReleaseCallback(
PreReleaseCallback* callback) {
recoverable_state_pre_release_callback_.reset(callback);
}
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
return WriteImpl(write_options, my_batch, nullptr, nullptr);
}
@ -976,6 +981,14 @@ Status DBImpl::WriteRecoverableState() {
if (two_write_queues_) {
log_write_mutex_.Unlock();
}
if (status.ok() && recoverable_state_pre_release_callback_) {
const bool DISABLE_MEMTABLE = true;
for (uint64_t sub_batch_seq = seq + 1;
sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
status = recoverable_state_pre_release_callback_->Callback(
sub_batch_seq, !DISABLE_MEMTABLE);
}
}
if (status.ok()) {
cached_recoverable_state_.Clear();
cached_recoverable_state_empty_ = true;

View File

@ -913,6 +913,16 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
}
db_impl->TEST_FlushMemTable(true);
// After flush the recoverable state must be visible
if (cwb4recovery) {
s = db->Get(read_options, "gtid", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "dogs");
s = db->Get(read_options, "gtid2", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "cats");
}
// after memtable flush we can now relese the log
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
@ -1044,6 +1054,10 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
if (test_with_empty_wal) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->TEST_FlushMemTable(true);
// After flush the state must be visible
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
}
db->FlushWAL(true);
// kill and reopen to trigger recovery

View File

@ -46,6 +46,24 @@ Status WritePreparedTxnDB::Initialize(
AdvanceMaxEvictedSeq(prev_max, last_seq);
db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
// A callback to commit a single sub-batch
class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
public:
explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
: db_(db) {}
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled) override {
assert(!is_mem_disabled);
const bool PREPARE_SKIPPED = true;
db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED);
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
};
db_impl_->SetRecoverableStatePreReleaseCallback(
new CommitSubBatchPreReleaseCallback(this));
auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
handles);