diff --git a/db/db_impl.cc b/db/db_impl.cc index c8b1d60fa..e3d1a4491 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1597,16 +1597,10 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // we just ignore the update. // That's why we set ignore missing column families to true bool has_valid_writes = false; - // If we pass DB through and options.max_successive_merges is hit - // during recovery, Get() will be issued which will try to acquire - // DB mutex and cause deadlock, as DB mutex is already held. - // The DB pointer is not needed unless 2PC is used. - // TODO(sdong) fix the allow_2pc case too. status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), &flush_scheduler_, true, - log_number, db_options_.allow_2pc ? this : nullptr, - false /* concurrent_memtable_writes */, next_sequence, - &has_valid_writes); + log_number, this, false /* concurrent_memtable_writes */, + next_sequence, &has_valid_writes); // If it is the first log file and there is no column family updated // after replaying the file, this file may be a stale file. We ignore // sequence IDs from the file. Otherwise, if a newer stale log file that diff --git a/db/db_test2.cc b/db/db_test2.cc index 670c045e4..20ccf0a77 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -145,6 +145,22 @@ TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) { value = Get(1, "a"); } +TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + options.max_successive_merges = 3; + options.merge_operator = MergeOperators::CreatePutOperator(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + Put("poi", "Finch"); + db_->Merge(WriteOptions(), "poi", "Reese"); + db_->Merge(WriteOptions(), "poi", "Shaw"); + db_->Merge(WriteOptions(), "poi", "Root"); + options.max_successive_merges = 2; + Reopen(options); +} + #ifndef ROCKSDB_LITE class DBTestSharedWriteBufferAcrossCFs : public DBTestBase, @@ -1889,23 +1905,6 @@ TEST_P(MergeOperatorPinningTest, TailingIterator) { } #endif // ROCKSDB_LITE -TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) { - Options options; - options = CurrentOptions(options); - options.merge_operator = MergeOperators::CreatePutOperator(); - DestroyAndReopen(options); - - db_->Put(WriteOptions(), "foo", "bar"); - ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar")); - ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar")); - ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar")); - ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar")); - ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar")); - - options.max_successive_merges = 3; - Reopen(options); -} - size_t GetEncodedEntrySize(size_t key_size, size_t value_size) { std::string buffer; diff --git a/db/write_batch.cc b/db/write_batch.cc index ea909e244..2ed3cd85b 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -882,10 +882,13 @@ class MemTableInserter : public WriteBatch::Handler { std::string merged_value; auto cf_handle = cf_mems_->GetColumnFamilyHandle(); - if (cf_handle == nullptr) { - cf_handle = db_->DefaultColumnFamily(); + Status s = Status::NotSupported(); + if (db_ != nullptr && recovering_log_number_ != 0) { + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + s = db_->Get(ropts, cf_handle, key, &prev_value); } - Status s = db_->Get(ropts, cf_handle, key, &prev_value); char* prev_buffer = const_cast(prev_value.c_str()); uint32_t prev_size = static_cast(prev_value.size()); @@ -989,7 +992,12 @@ class MemTableInserter : public WriteBatch::Handler { auto* moptions = mem->GetMemTableOptions(); bool perform_merge = false; - if (moptions->max_successive_merges > 0 && db_ != nullptr) { + // If we pass DB through and options.max_successive_merges is hit + // during recovery, Get() will be issued which will try to acquire + // DB mutex and cause deadlock, as DB mutex is already held. + // So we disable merge in recovery + if (moptions->max_successive_merges > 0 && db_ != nullptr && + recovering_log_number_ == 0) { LookupKey lkey(key, sequence_); // Count the number of successive merges at the head