forbid merge during recovery
Summary: Mitigate regression bug of options.max_successive_merges hit during DB Recovery For https://reviews.facebook.net/D62625 Test Plan: make all check Reviewers: horuff, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D62655
This commit is contained in:
parent
5735b3dc2a
commit
715256338a
@ -1597,16 +1597,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
// we just ignore the update.
|
// we just ignore the update.
|
||||||
// That's why we set ignore missing column families to true
|
// That's why we set ignore missing column families to true
|
||||||
bool has_valid_writes = false;
|
bool has_valid_writes = false;
|
||||||
// If we pass DB through and options.max_successive_merges is hit
|
|
||||||
// during recovery, Get() will be issued which will try to acquire
|
|
||||||
// DB mutex and cause deadlock, as DB mutex is already held.
|
|
||||||
// The DB pointer is not needed unless 2PC is used.
|
|
||||||
// TODO(sdong) fix the allow_2pc case too.
|
|
||||||
status = WriteBatchInternal::InsertInto(
|
status = WriteBatchInternal::InsertInto(
|
||||||
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
|
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
|
||||||
log_number, db_options_.allow_2pc ? this : nullptr,
|
log_number, this, false /* concurrent_memtable_writes */,
|
||||||
false /* concurrent_memtable_writes */, next_sequence,
|
next_sequence, &has_valid_writes);
|
||||||
&has_valid_writes);
|
|
||||||
// If it is the first log file and there is no column family updated
|
// If it is the first log file and there is no column family updated
|
||||||
// after replaying the file, this file may be a stale file. We ignore
|
// after replaying the file, this file may be a stale file. We ignore
|
||||||
// sequence IDs from the file. Otherwise, if a newer stale log file that
|
// sequence IDs from the file. Otherwise, if a newer stale log file that
|
||||||
|
@ -145,6 +145,22 @@ TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
|
|||||||
value = Get(1, "a");
|
value = Get(1, "a");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.statistics = rocksdb::CreateDBStatistics();
|
||||||
|
options.max_successive_merges = 3;
|
||||||
|
options.merge_operator = MergeOperators::CreatePutOperator();
|
||||||
|
options.disable_auto_compactions = true;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
Put("poi", "Finch");
|
||||||
|
db_->Merge(WriteOptions(), "poi", "Reese");
|
||||||
|
db_->Merge(WriteOptions(), "poi", "Shaw");
|
||||||
|
db_->Merge(WriteOptions(), "poi", "Root");
|
||||||
|
options.max_successive_merges = 2;
|
||||||
|
Reopen(options);
|
||||||
|
}
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
class DBTestSharedWriteBufferAcrossCFs
|
class DBTestSharedWriteBufferAcrossCFs
|
||||||
: public DBTestBase,
|
: public DBTestBase,
|
||||||
@ -1889,23 +1905,6 @@ TEST_P(MergeOperatorPinningTest, TailingIterator) {
|
|||||||
}
|
}
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) {
|
|
||||||
Options options;
|
|
||||||
options = CurrentOptions(options);
|
|
||||||
options.merge_operator = MergeOperators::CreatePutOperator();
|
|
||||||
DestroyAndReopen(options);
|
|
||||||
|
|
||||||
db_->Put(WriteOptions(), "foo", "bar");
|
|
||||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
|
||||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
|
||||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
|
||||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
|
||||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
|
||||||
|
|
||||||
options.max_successive_merges = 3;
|
|
||||||
Reopen(options);
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
|
size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
|
||||||
std::string buffer;
|
std::string buffer;
|
||||||
|
|
||||||
|
@ -882,10 +882,13 @@ class MemTableInserter : public WriteBatch::Handler {
|
|||||||
std::string merged_value;
|
std::string merged_value;
|
||||||
|
|
||||||
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
|
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
|
||||||
if (cf_handle == nullptr) {
|
Status s = Status::NotSupported();
|
||||||
cf_handle = db_->DefaultColumnFamily();
|
if (db_ != nullptr && recovering_log_number_ != 0) {
|
||||||
|
if (cf_handle == nullptr) {
|
||||||
|
cf_handle = db_->DefaultColumnFamily();
|
||||||
|
}
|
||||||
|
s = db_->Get(ropts, cf_handle, key, &prev_value);
|
||||||
}
|
}
|
||||||
Status s = db_->Get(ropts, cf_handle, key, &prev_value);
|
|
||||||
|
|
||||||
char* prev_buffer = const_cast<char*>(prev_value.c_str());
|
char* prev_buffer = const_cast<char*>(prev_value.c_str());
|
||||||
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
|
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
|
||||||
@ -989,7 +992,12 @@ class MemTableInserter : public WriteBatch::Handler {
|
|||||||
auto* moptions = mem->GetMemTableOptions();
|
auto* moptions = mem->GetMemTableOptions();
|
||||||
bool perform_merge = false;
|
bool perform_merge = false;
|
||||||
|
|
||||||
if (moptions->max_successive_merges > 0 && db_ != nullptr) {
|
// If we pass DB through and options.max_successive_merges is hit
|
||||||
|
// during recovery, Get() will be issued which will try to acquire
|
||||||
|
// DB mutex and cause deadlock, as DB mutex is already held.
|
||||||
|
// So we disable merge in recovery
|
||||||
|
if (moptions->max_successive_merges > 0 && db_ != nullptr &&
|
||||||
|
recovering_log_number_ == 0) {
|
||||||
LookupKey lkey(key, sequence_);
|
LookupKey lkey(key, sequence_);
|
||||||
|
|
||||||
// Count the number of successive merges at the head
|
// Count the number of successive merges at the head
|
||||||
|
Loading…
Reference in New Issue
Block a user