Ignore write stall triggers when auto-compaction is disabled
Summary: My understanding is that the purpose of write stall triggers are to wait for auto-compaction to catch up. Without auto-compaction, we don't need to stall writes. Also with this diff, flush/compaction conditions are recalculated on dynamic option change. Previously the conditions are recalculate only when write stall options are changed. Test Plan: See the new test. Removed two tests that are no longer valid. Reviewers: IslamAbdelRahman, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D61437
This commit is contained in:
parent
9ae92f50b2
commit
4001e799ab
@ -558,8 +558,9 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
"(waiting for flush), max_write_buffer_number is set to %d",
|
||||
name_.c_str(), imm()->NumNotFlushed(),
|
||||
mutable_cf_options.max_write_buffer_number);
|
||||
} else if (vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_stop_writes_trigger) {
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_stop_writes_trigger) {
|
||||
write_controller_token_ = write_controller->GetStopToken();
|
||||
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
|
||||
if (compaction_picker_->IsLevel0CompactionInProgress()) {
|
||||
@ -569,7 +570,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
|
||||
"[%s] Stopping writes because we have %d level-0 files",
|
||||
name_.c_str(), vstorage->l0_delay_trigger_count());
|
||||
} else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
|
||||
compaction_needed_bytes >=
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit) {
|
||||
write_controller_token_ = write_controller->GetStopToken();
|
||||
@ -594,7 +596,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
name_.c_str(), imm()->NumNotFlushed(),
|
||||
mutable_cf_options.max_write_buffer_number,
|
||||
write_controller->delayed_write_rate());
|
||||
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
|
||||
vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_slowdown_writes_trigger) {
|
||||
write_controller_token_ =
|
||||
@ -611,7 +614,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
"rate %" PRIu64,
|
||||
name_.c_str(), vstorage->l0_delay_trigger_count(),
|
||||
write_controller->delayed_write_rate());
|
||||
} else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
|
||||
vstorage->estimated_compaction_needed_bytes() >=
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit) {
|
||||
write_controller_token_ =
|
||||
|
@ -2413,6 +2413,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
|
||||
mutable_cf_options.level0_stop_writes_trigger = 10000;
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
|
||||
mutable_cf_options.disable_auto_compactions = false;
|
||||
|
||||
vstorage->TEST_set_estimated_compaction_needed_bytes(50);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
@ -2559,16 +2560,17 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
|
||||
vstorage->set_l0_delay_trigger_count(50);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
|
||||
|
||||
vstorage->set_l0_delay_trigger_count(60);
|
||||
vstorage->TEST_set_estimated_compaction_needed_bytes(300);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
|
||||
|
||||
mutable_cf_options.disable_auto_compactions = false;
|
||||
vstorage->set_l0_delay_trigger_count(70);
|
||||
vstorage->TEST_set_estimated_compaction_needed_bytes(500);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
@ -2576,7 +2578,6 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
|
||||
|
||||
mutable_cf_options.disable_auto_compactions = false;
|
||||
vstorage->set_l0_delay_trigger_count(71);
|
||||
vstorage->TEST_set_estimated_compaction_needed_bytes(501);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
|
@ -2358,20 +2358,6 @@ void DBImpl::NotifyOnCompactionCompleted(
|
||||
#endif // ROCKSDB_LITE
|
||||
}
|
||||
|
||||
bool DBImpl::NeedFlushOrCompaction(const MutableCFOptions& base_options,
|
||||
const MutableCFOptions& new_options) {
|
||||
return (base_options.disable_auto_compactions &&
|
||||
!new_options.disable_auto_compactions) ||
|
||||
base_options.level0_slowdown_writes_trigger <
|
||||
new_options.level0_slowdown_writes_trigger ||
|
||||
base_options.level0_stop_writes_trigger <
|
||||
new_options.level0_stop_writes_trigger ||
|
||||
base_options.soft_pending_compaction_bytes_limit <
|
||||
new_options.soft_pending_compaction_bytes_limit ||
|
||||
base_options.hard_pending_compaction_bytes_limit <
|
||||
new_options.hard_pending_compaction_bytes_limit;
|
||||
}
|
||||
|
||||
Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
const std::unordered_map<std::string, std::string>& options_map) {
|
||||
#ifdef ROCKSDB_LITE
|
||||
@ -2385,7 +2371,6 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
return Status::InvalidArgument("empty input");
|
||||
}
|
||||
|
||||
MutableCFOptions prev_options = *cfd->GetLatestMutableCFOptions();
|
||||
MutableCFOptions new_options;
|
||||
Status s;
|
||||
Status persist_options_status;
|
||||
@ -2394,14 +2379,12 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
s = cfd->SetOptions(options_map);
|
||||
if (s.ok()) {
|
||||
new_options = *cfd->GetLatestMutableCFOptions();
|
||||
if (NeedFlushOrCompaction(prev_options, new_options)) {
|
||||
// Trigger possible flush/compactions. This has to be before we persist
|
||||
// options to file, otherwise there will be a deadlock with writer
|
||||
// thread.
|
||||
auto* old_sv =
|
||||
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
|
||||
delete old_sv;
|
||||
}
|
||||
// Trigger possible flush/compactions. This has to be before we persist
|
||||
// options to file, otherwise there will be a deadlock with writer
|
||||
// thread.
|
||||
auto* old_sv =
|
||||
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
|
||||
delete old_sv;
|
||||
|
||||
// Persist RocksDB options under the single write thread
|
||||
WriteThread::Writer w;
|
||||
@ -3343,7 +3326,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
|
||||
// compaction is not necessary. Need to make sure mutex is held
|
||||
// until we make a copy in the following code
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
|
||||
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
|
||||
if (c != nullptr) {
|
||||
// update statistics
|
||||
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
|
||||
|
@ -672,11 +672,6 @@ class DBImpl : public DB {
|
||||
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
|
||||
LogBuffer* log_buffer);
|
||||
|
||||
// Compare options before and after to see whether flush or compaction is
|
||||
// needed immediately after dynamic option change.
|
||||
bool NeedFlushOrCompaction(const MutableCFOptions& base_options,
|
||||
const MutableCFOptions& new_options);
|
||||
|
||||
void PrintStatistics();
|
||||
|
||||
// dump rocksdb.stats to LOG
|
||||
|
@ -6,6 +6,9 @@
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#include <limits>
|
||||
#include <string>
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "util/sync_point.h"
|
||||
@ -20,80 +23,101 @@ class DBOptionsTest : public DBTestBase {
|
||||
// RocksDB lite don't support dynamic options.
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
// When write stalls, user can enable auto compaction to unblock writes.
|
||||
// However, we had an issue where the stalled write thread blocks the attempt
|
||||
// to persist auto compaction option, thus creating a deadlock. The test
|
||||
// verifies the issue is fixed.
|
||||
TEST_F(DBOptionsTest, EnableAutoCompactionToUnblockWrites) {
|
||||
Options options;
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 1000 * 1000; // 1M
|
||||
options.level0_file_num_compaction_trigger = 1;
|
||||
options.level0_slowdown_writes_trigger = 1;
|
||||
options.level0_stop_writes_trigger = 1;
|
||||
options.compression = kNoCompression;
|
||||
TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) {
|
||||
const std::string kValue(1024, 'v');
|
||||
for (int method_type = 0; method_type < 2; method_type++) {
|
||||
for (int option_type = 0; option_type < 4; option_type++) {
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 1024 * 1024;
|
||||
options.compression = CompressionType::kNoCompression;
|
||||
options.level0_file_num_compaction_trigger = 1;
|
||||
options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
|
||||
options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
|
||||
options.hard_pending_compaction_bytes_limit =
|
||||
std::numeric_limits<uint64_t>::max();
|
||||
options.soft_pending_compaction_bytes_limit =
|
||||
std::numeric_limits<uint64_t>::max();
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::DelayWrite:Wait",
|
||||
"DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"},
|
||||
{"DBImpl::BackgroundCompaction:Finish",
|
||||
"DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
DestroyAndReopen(options);
|
||||
for (int i = 0; i < 1024 * 2; i++) {
|
||||
Put(Key(i), kValue);
|
||||
}
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
ASSERT_EQ(2, NumTableFilesAtLevel(0));
|
||||
uint64_t l0_size = SizeAtLevel(0);
|
||||
|
||||
// Stall writes.
|
||||
Reopen(options);
|
||||
env_->StartThread(
|
||||
[](void* arg) {
|
||||
std::string value(1000, 'v');
|
||||
auto* t = static_cast<DBOptionsTest*>(arg);
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
ASSERT_OK(t->Put(t->Key(i), value));
|
||||
}
|
||||
},
|
||||
this);
|
||||
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionToUnblockWrites:1");
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ColumnFamilyHandle* handle = dbfull()->DefaultColumnFamily();
|
||||
// We will get a deadlock here if we hit the issue.
|
||||
ASSERT_OK(dbfull()->EnableAutoCompaction({handle}));
|
||||
env_->WaitForJoin();
|
||||
}
|
||||
switch (option_type) {
|
||||
case 0:
|
||||
// test with level0_stop_writes_trigger
|
||||
options.level0_stop_writes_trigger = 2;
|
||||
options.level0_slowdown_writes_trigger = 2;
|
||||
break;
|
||||
case 1:
|
||||
options.level0_slowdown_writes_trigger = 2;
|
||||
break;
|
||||
case 2:
|
||||
options.hard_pending_compaction_bytes_limit = l0_size;
|
||||
options.soft_pending_compaction_bytes_limit = l0_size;
|
||||
break;
|
||||
case 3:
|
||||
options.soft_pending_compaction_bytes_limit = l0_size;
|
||||
break;
|
||||
}
|
||||
Reopen(options);
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
|
||||
// Similar to EnableAutoCompactionAfterStallDeadlock. See comments there.
|
||||
TEST_F(DBOptionsTest, ToggleStopTriggerToUnblockWrites) {
|
||||
Options options;
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 1000 * 1000; // 1M
|
||||
options.level0_file_num_compaction_trigger = 1;
|
||||
options.level0_slowdown_writes_trigger = 1;
|
||||
options.level0_stop_writes_trigger = 1;
|
||||
options.compression = kNoCompression;
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBOptionsTest::EnableAutoCompactionAndTriggerStall:1",
|
||||
"BackgroundCallCompaction:0"},
|
||||
{"DBImpl::BackgroundCompaction():BeforePickCompaction",
|
||||
"DBOptionsTest::EnableAutoCompactionAndTriggerStall:2"},
|
||||
{"DBOptionsTest::EnableAutoCompactionAndTriggerStall:3",
|
||||
"DBImpl::BackgroundCompaction():AfterPickCompaction"}});
|
||||
// Block background compaction.
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::DelayWrite:Wait",
|
||||
"DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"},
|
||||
{"DBImpl::BackgroundCompaction:Finish",
|
||||
"DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
switch (method_type) {
|
||||
case 0:
|
||||
ASSERT_OK(
|
||||
dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
|
||||
break;
|
||||
case 1:
|
||||
ASSERT_OK(dbfull()->EnableAutoCompaction(
|
||||
{dbfull()->DefaultColumnFamily()}));
|
||||
break;
|
||||
}
|
||||
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:1");
|
||||
// Wait for stall condition recalculate.
|
||||
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:2");
|
||||
|
||||
// Stall writes.
|
||||
Reopen(options);
|
||||
env_->StartThread(
|
||||
[](void* arg) {
|
||||
std::string value(1000, 'v');
|
||||
auto* t = static_cast<DBOptionsTest*>(arg);
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
ASSERT_OK(t->Put(t->Key(i), value));
|
||||
}
|
||||
},
|
||||
this);
|
||||
TEST_SYNC_POINT("DBOptionsTest::ToggleStopTriggerToUnblockWrites:1");
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
|
||||
// We will get a deadlock here if we hit the issue.
|
||||
ASSERT_OK(
|
||||
dbfull()->SetOptions({{"level0_stop_writes_trigger", "1000000"},
|
||||
{"level0_slowdown_writes_trigger", "1000000"}}));
|
||||
env_->WaitForJoin();
|
||||
switch (option_type) {
|
||||
case 0:
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
|
||||
break;
|
||||
case 1:
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
break;
|
||||
case 2:
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
|
||||
break;
|
||||
case 3:
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
break;
|
||||
}
|
||||
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:3");
|
||||
|
||||
// Background compaction executed.
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
Loading…
Reference in New Issue
Block a user