Revert "forbid merge during recovery"

This reverts commit 715256338ae21bc7cdfa21b720ed73143c61e5aa.
This commit is contained in:
Islam AbdelRahman 2016-11-09 16:07:20 -08:00
parent f201a44b41
commit 7faa39b288
3 changed files with 29 additions and 30 deletions

View File

@ -1610,10 +1610,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// we just ignore the update.
// That's why we set ignore missing column families to true
bool has_valid_writes = false;
// If we pass DB through and options.max_successive_merges is hit
// during recovery, Get() will be issued which will try to acquire
// DB mutex and cause deadlock, as DB mutex is already held.
// The DB pointer is not needed unless 2PC is used.
// TODO(sdong) fix the allow_2pc case too.
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes);
log_number, db_options_.allow_2pc ? this : nullptr,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes);
// If it is the first log file and there is no column family updated
// after replaying the file, this file may be a stale file. We ignore
// sequence IDs from the file. Otherwise, if a newer stale log file that

View File

@ -145,22 +145,6 @@ TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
value = Get(1, "a");
}
TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
options.max_successive_merges = 3;
options.merge_operator = MergeOperators::CreatePutOperator();
options.disable_auto_compactions = true;
DestroyAndReopen(options);
Put("poi", "Finch");
db_->Merge(WriteOptions(), "poi", "Reese");
db_->Merge(WriteOptions(), "poi", "Shaw");
db_->Merge(WriteOptions(), "poi", "Root");
options.max_successive_merges = 2;
Reopen(options);
}
#ifndef ROCKSDB_LITE
class DBTestSharedWriteBufferAcrossCFs
: public DBTestBase,
@ -1905,6 +1889,23 @@ TEST_P(MergeOperatorPinningTest, TailingIterator) {
}
#endif // ROCKSDB_LITE
TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) {
Options options;
options = CurrentOptions(options);
options.merge_operator = MergeOperators::CreatePutOperator();
DestroyAndReopen(options);
db_->Put(WriteOptions(), "foo", "bar");
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
options.max_successive_merges = 3;
Reopen(options);
}
size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
std::string buffer;

View File

@ -885,13 +885,10 @@ class MemTableInserter : public WriteBatch::Handler {
std::string merged_value;
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
Status s = Status::NotSupported();
if (db_ != nullptr && recovering_log_number_ != 0) {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
s = db_->Get(ropts, cf_handle, key, &prev_value);
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
Status s = db_->Get(ropts, cf_handle, key, &prev_value);
char* prev_buffer = const_cast<char*>(prev_value.c_str());
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
@ -995,12 +992,7 @@ class MemTableInserter : public WriteBatch::Handler {
auto* moptions = mem->GetMemTableOptions();
bool perform_merge = false;
// If we pass DB through and options.max_successive_merges is hit
// during recovery, Get() will be issued which will try to acquire
// DB mutex and cause deadlock, as DB mutex is already held.
// So we disable merge in recovery
if (moptions->max_successive_merges > 0 && db_ != nullptr &&
recovering_log_number_ == 0) {
if (moptions->max_successive_merges > 0 && db_ != nullptr) {
LookupKey lkey(key, sequence_);
// Count the number of successive merges at the head