Introduce bottom-pri thread pool for large universal compactions

Summary:
When we had a single thread pool for compactions, a thread could be busy for a long time (minutes) executing a compaction involving the bottom level. In multi-instance setups, the entire thread pool could be consumed by such bottom-level compactions. Then, top-level compactions (e.g., a few L0 files) would be blocked for a long time ("head-of-line blocking"). Such top-level compactions are critical to prevent compaction stalls as they can quickly reduce number of L0 files / sorted runs.

This diff introduces a bottom-priority queue for universal compactions including the bottom level. This alleviates the head-of-line blocking situation for fast, top-level compactions.

- Added `Env::Priority::BOTTOM` thread pool. This feature is only enabled if user explicitly configures it to have a positive number of threads.
- Changed `ThreadPoolImpl`'s default thread limit from one to zero. This change is invisible to users as we call `IncBackgroundThreadsIfNeeded` on the low-pri/high-pri pools during `DB::Open` with values of at least one. It is necessary, though, for bottom-pri to start with zero threads so the feature is disabled by default.
- Separated `ManualCompaction` into two parts in `PrepickedCompaction`. `PrepickedCompaction` is used for any compaction that's picked outside of its execution thread, either manual or automatic.
- Forward universal compactions involving last level to the bottom pool (worker thread's entry point is `BGWorkBottomCompaction`).
- Track `bg_bottom_compaction_scheduled_` so we can wait for bottom-level compactions to finish. We don't count them against the background jobs limits. So users of this feature will get an extra compaction for free.
Closes https://github.com/facebook/rocksdb/pull/2580

Differential Revision: D5422916

Pulled By: ajkr

fbshipit-source-id: a74bd11f1ea4933df3739b16808bb21fcd512333
This commit is contained in:
Andrew Kryczka 2017-08-03 15:36:28 -07:00 committed by Facebook Github Bot
parent 0b814ba92d
commit cc01985db0
15 changed files with 274 additions and 71 deletions

View File

@ -3,6 +3,7 @@
### New Features
* Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
* Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.
* Universal compactions including the bottom level can be executed in a dedicated thread pool. This alleviates head-of-line blocking in the compaction queue, which cause write stalling, particularly in multi-instance use cases. Users can enable this feature via `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)`, where `N > 0`.
### Bug Fixes
* Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`.

View File

@ -168,6 +168,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
last_batch_group_size_(0),
unscheduled_flushes_(0),
unscheduled_compactions_(0),
bg_bottom_compaction_scheduled_(0),
bg_compaction_scheduled_(0),
num_running_compactions_(0),
bg_flush_scheduled_(0),
@ -242,7 +243,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
return;
}
// Wait for background work to finish
while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) {
bg_cv_.Wait();
}
}
@ -252,15 +254,18 @@ DBImpl::~DBImpl() {
// marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
CancelAllBackgroundWork(false);
int bottom_compactions_unscheduled =
env_->UnSchedule(this, Env::Priority::BOTTOM);
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
mutex_.Lock();
bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
bg_compaction_scheduled_ -= compactions_unscheduled;
bg_flush_scheduled_ -= flushes_unscheduled;
// Wait for background work to finish
while (bg_compaction_scheduled_ || bg_flush_scheduled_ ||
bg_purge_scheduled_) {
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_ || bg_purge_scheduled_) {
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.Wait();
}

View File

@ -658,6 +658,7 @@ class DBImpl : public DB {
}
};
struct PrepickedCompaction;
struct PurgeFileInfo;
// Recover the descriptor from persistent storage. May do a significant
@ -799,14 +800,19 @@ class DBImpl : public DB {
void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
uint32_t path_id, int job_id);
static void BGWorkCompaction(void* arg);
// Runs a pre-chosen universal compaction involving bottom level in a
// separate, bottom-pri thread pool.
static void BGWorkBottomCompaction(void* arg);
static void BGWorkFlush(void* db);
static void BGWorkPurge(void* arg);
static void UnscheduleCallback(void* arg);
void BackgroundCallCompaction(void* arg);
void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
Env::Priority bg_thread_pri);
void BackgroundCallFlush();
void BackgroundCallPurge();
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer, void* m = 0);
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer);
@ -1059,6 +1065,10 @@ class DBImpl : public DB {
int unscheduled_flushes_;
int unscheduled_compactions_;
// count how many background compactions are running or have been scheduled in
// the BOTTOM pool
int bg_bottom_compaction_scheduled_;
// count how many background compactions are running or have been scheduled
int bg_compaction_scheduled_;
@ -1075,7 +1085,7 @@ class DBImpl : public DB {
int bg_purge_scheduled_;
// Information for a manual compaction
struct ManualCompaction {
struct ManualCompactionState {
ColumnFamilyData* cfd;
int input_level;
int output_level;
@ -1091,13 +1101,21 @@ class DBImpl : public DB {
InternalKey* manual_end; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress
InternalKey tmp_storage1; // Used to keep track of compaction progress
Compaction* compaction;
};
std::deque<ManualCompaction*> manual_compaction_dequeue_;
struct PrepickedCompaction {
// background compaction takes ownership of `compaction`.
Compaction* compaction;
// caller retains ownership of `manual_compaction_state` as it is reused
// across background compactions.
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
};
std::deque<ManualCompactionState*> manual_compaction_dequeue_;
struct CompactionArg {
// caller retains ownership of `db`.
DBImpl* db;
ManualCompaction* m;
// background compaction takes ownership of `prepicked_compaction`.
PrepickedCompaction* prepicked_compaction;
};
// Have we encountered a background error in paranoid mode?
@ -1231,11 +1249,11 @@ class DBImpl : public DB {
bool HasPendingManualCompaction();
bool HasExclusiveManualCompaction();
void AddManualCompaction(ManualCompaction* m);
void RemoveManualCompaction(ManualCompaction* m);
bool ShouldntRunManualCompaction(ManualCompaction* m);
void AddManualCompaction(ManualCompactionState* m);
void RemoveManualCompaction(ManualCompactionState* m);
bool ShouldntRunManualCompaction(ManualCompactionState* m);
bool HaveManualCompaction(ColumnFamilyData* cfd);
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;

View File

@ -612,7 +612,8 @@ Status DBImpl::CompactFilesImpl(
Status DBImpl::PauseBackgroundWork() {
InstrumentedMutexLock guard_lock(&mutex_);
bg_compaction_paused_++;
while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) {
while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
bg_flush_scheduled_ > 0) {
bg_cv_.Wait();
}
bg_work_paused_++;
@ -808,7 +809,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
bool scheduled = false;
bool manual_conflict = false;
ManualCompaction manual;
ManualCompactionState manual;
manual.cfd = cfd;
manual.input_level = input_level;
manual.output_level = output_level;
@ -858,7 +859,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
AddManualCompaction(&manual);
TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
if (exclusive) {
while (bg_compaction_scheduled_ > 0) {
while (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0) {
TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
@ -878,14 +880,14 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
while (!manual.done) {
assert(HasPendingManualCompaction());
manual_conflict = false;
Compaction* compaction;
if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
scheduled ||
((manual.manual_end = &manual.tmp_storage1)&&(
(manual.compaction = manual.cfd->CompactRange(
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
manual.output_level, manual.output_path_id, manual.begin,
manual.end, &manual.manual_end, &manual_conflict)) ==
nullptr) &&
((manual.manual_end = &manual.tmp_storage1) &&
((compaction = manual.cfd->CompactRange(
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
manual.output_level, manual.output_path_id, manual.begin,
manual.end, &manual.manual_end, &manual_conflict)) == nullptr) &&
manual_conflict)) {
// exclusive manual compactions should not see a conflict during
// CompactRange
@ -898,14 +900,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual.incomplete = false;
}
} else if (!scheduled) {
if (manual.compaction == nullptr) {
if (compaction == nullptr) {
manual.done = true;
bg_cv_.SignalAll();
continue;
}
ca = new CompactionArg;
ca->db = this;
ca->m = &manual;
ca->prepicked_compaction = new PrepickedCompaction;
ca->prepicked_compaction->manual_compaction_state = &manual;
ca->prepicked_compaction->compaction = compaction;
manual.incomplete = false;
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
@ -1047,7 +1051,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg;
ca->db = this;
ca->m = nullptr;
ca->prepicked_compaction = nullptr;
bg_compaction_scheduled_++;
unscheduled_compactions_--;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
@ -1152,7 +1156,23 @@ void DBImpl::BGWorkCompaction(void* arg) {
delete reinterpret_cast<CompactionArg*>(arg);
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m);
auto prepicked_compaction =
static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
prepicked_compaction, Env::Priority::LOW);
delete prepicked_compaction;
}
void DBImpl::BGWorkBottomCompaction(void* arg) {
CompactionArg ca = *(static_cast<CompactionArg*>(arg));
delete static_cast<CompactionArg*>(arg);
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
auto* prepicked_compaction = ca.prepicked_compaction;
assert(prepicked_compaction && prepicked_compaction->compaction &&
!prepicked_compaction->manual_compaction_state);
ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
delete prepicked_compaction;
}
void DBImpl::BGWorkPurge(void* db) {
@ -1165,8 +1185,11 @@ void DBImpl::BGWorkPurge(void* db) {
void DBImpl::UnscheduleCallback(void* arg) {
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
delete reinterpret_cast<CompactionArg*>(arg);
if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) {
delete ca.m->compaction;
if (ca.prepicked_compaction != nullptr) {
if (ca.prepicked_compaction->compaction != nullptr) {
delete ca.prepicked_compaction->compaction;
}
delete ca.prepicked_compaction;
}
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
}
@ -1293,9 +1316,9 @@ void DBImpl::BackgroundCallFlush() {
}
}
void DBImpl::BackgroundCallCompaction(void* arg) {
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
Env::Priority bg_thread_pri) {
bool made_progress = false;
ManualCompaction* m = reinterpret_cast<ManualCompaction*>(arg);
JobContext job_context(next_job_id_.fetch_add(1), true);
TEST_SYNC_POINT("BackgroundCallCompaction:0");
MaybeDumpStats();
@ -1313,9 +1336,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
assert(bg_compaction_scheduled_);
Status s =
BackgroundCompaction(&made_progress, &job_context, &log_buffer, m);
assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
prepicked_compaction);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background compaction in
@ -1361,17 +1386,24 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
assert(num_running_compactions_ > 0);
num_running_compactions_--;
bg_compaction_scheduled_--;
if (bg_thread_pri == Env::Priority::LOW) {
bg_compaction_scheduled_--;
} else {
assert(bg_thread_pri == Env::Priority::BOTTOM);
bg_bottom_compaction_scheduled_--;
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
if (made_progress || bg_compaction_scheduled_ == 0 ||
if (made_progress ||
(bg_compaction_scheduled_ == 0 &&
bg_bottom_compaction_scheduled_ == 0) ||
HasPendingManualCompaction()) {
// signal if
// * made_progress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is
// waiting for it
@ -1386,14 +1418,23 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context,
LogBuffer* log_buffer, void* arg) {
ManualCompaction* manual_compaction =
reinterpret_cast<ManualCompaction*>(arg);
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction) {
ManualCompactionState* manual_compaction =
prepicked_compaction == nullptr
? nullptr
: prepicked_compaction->manual_compaction_state;
*made_progress = false;
mutex_.AssertHeld();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
bool is_manual = (manual_compaction != nullptr);
unique_ptr<Compaction> c;
if (prepicked_compaction != nullptr &&
prepicked_compaction->compaction != nullptr) {
c.reset(prepicked_compaction->compaction);
}
bool is_prepicked = is_manual || c;
// (manual_compaction->in_progress == false);
bool trivial_move_disallowed =
@ -1410,7 +1451,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
manual_compaction->status = status;
manual_compaction->done = true;
manual_compaction->in_progress = false;
delete manual_compaction->compaction;
manual_compaction = nullptr;
}
return status;
@ -1421,13 +1461,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
manual_compaction->in_progress = true;
}
unique_ptr<Compaction> c;
// InternalKey manual_end_storage;
// InternalKey* manual_end = &manual_end_storage;
if (is_manual) {
ManualCompaction* m = manual_compaction;
ManualCompactionState* m = manual_compaction;
assert(m->in_progress);
c.reset(std::move(m->compaction));
if (!c) {
m->done = true;
m->manual_end = nullptr;
@ -1449,7 +1487,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
? "(end)"
: m->manual_end->DebugString().c_str()));
}
} else if (!compaction_queue_.empty()) {
} else if (!is_prepicked && !compaction_queue_.empty()) {
if (HaveManualCompaction(compaction_queue_.front())) {
// Can't compact right now, but try again later
TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
@ -1601,6 +1639,28 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Clear Instrument
ThreadStatusUtil::ResetThreadStatus();
} else if (c->column_family_data()->ioptions()->compaction_style ==
kCompactionStyleUniversal &&
!is_prepicked && c->output_level() > 0 &&
c->output_level() ==
c->column_family_data()
->current()
->storage_info()
->MaxOutputLevel(
immutable_db_options_.allow_ingest_behind) &&
env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
// Forward universal compactions involving last level to the bottom pool
// if it exists, such that long-running compactions can't block short-
// lived ones, like L0->L0s.
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
CompactionArg* ca = new CompactionArg;
ca->db = this;
ca->prepicked_compaction = new PrepickedCompaction;
ca->prepicked_compaction->compaction = c.release();
ca->prepicked_compaction->manual_compaction_state = nullptr;
++bg_bottom_compaction_scheduled_;
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
this, &DBImpl::UnscheduleCallback);
} else {
int output_level __attribute__((unused)) = c->output_level();
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
@ -1664,7 +1724,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
if (is_manual) {
ManualCompaction* m = manual_compaction;
ManualCompactionState* m = manual_compaction;
if (!status.ok()) {
m->status = status;
m->done = true;
@ -1707,13 +1767,13 @@ bool DBImpl::HasPendingManualCompaction() {
return (!manual_compaction_dequeue_.empty());
}
void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) {
void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
manual_compaction_dequeue_.push_back(m);
}
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
// Remove from queue
std::deque<ManualCompaction*>::iterator it =
std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if (m == (*it)) {
@ -1726,16 +1786,17 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
return;
}
bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
if (num_running_ingest_file_ > 0) {
// We need to wait for other IngestExternalFile() calls to finish
// before running a manual compaction.
return true;
}
if (m->exclusive) {
return (bg_compaction_scheduled_ > 0);
return (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0);
}
std::deque<ManualCompaction*>::iterator it =
std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
bool seen = false;
while (it != manual_compaction_dequeue_.end()) {
@ -1756,7 +1817,7 @@ bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
// Remove from priority queue
std::deque<ManualCompaction*>::iterator it =
std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) {
@ -1774,7 +1835,7 @@ bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
bool DBImpl::HasExclusiveManualCompaction() {
// Remove from priority queue
std::deque<ManualCompaction*>::iterator it =
std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) {
@ -1785,7 +1846,7 @@ bool DBImpl::HasExclusiveManualCompaction() {
return false;
}
bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
if ((m->exclusive) || (m1->exclusive)) {
return true;
}

View File

@ -112,7 +112,9 @@ Status DBImpl::TEST_WaitForCompact() {
// OR flush to finish.
InstrumentedMutexLock l(&mutex_);
while ((bg_compaction_scheduled_ || bg_flush_scheduled_) && bg_error_.ok()) {
while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) &&
bg_error_.ok()) {
bg_cv_.Wait();
}
return bg_error_;

View File

@ -1370,6 +1370,103 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
Destroy(options);
}
TEST_P(DBTestUniversalCompaction, FullCompactionInBottomPriThreadPool) {
const int kNumFilesTrigger = 3;
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
for (bool allow_ingest_behind : {false, true}) {
Options options = CurrentOptions();
options.allow_ingest_behind = allow_ingest_behind;
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
// Trigger compaction if size amplification exceeds 110%
options.compaction_options_universal.max_size_amplification_percent = 110;
DestroyAndReopen(options);
int num_bottom_pri_compactions = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BGWorkBottomCompaction",
[&](void* arg) { ++num_bottom_pri_compactions; });
SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
for (int num = 0; num < kNumFilesTrigger; num++) {
ASSERT_EQ(NumSortedRuns(), num);
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
}
dbfull()->TEST_WaitForCompact();
if (allow_ingest_behind || num_levels_ > 1) {
// allow_ingest_behind increases number of levels while sanitizing.
ASSERT_EQ(1, num_bottom_pri_compactions);
} else {
// for single-level universal, everything's bottom level so nothing should
// be executed in bottom-pri thread pool.
ASSERT_EQ(0, num_bottom_pri_compactions);
}
// Verify that size amplification did occur
ASSERT_EQ(NumSortedRuns(), 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
}
TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
if (num_levels_ == 1) {
// for single-level universal, everything's bottom level so nothing should
// be executed in bottom-pri thread pool.
return;
}
const int kNumFilesTrigger = 3;
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
// Trigger compaction if size amplification exceeds 110%
options.compaction_options_universal.max_size_amplification_percent = 110;
DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{// wait for the full compaction to be picked before adding files intended
// for the second one.
{"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
// the full (bottom-pri) compaction waits until a partial (low-pri)
// compaction has started to verify they can run in parallel.
{"DBImpl::BackgroundCompaction:NonTrivial",
"DBImpl::BGWorkBottomCompaction"}});
SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
for (int i = 0; i < 2; ++i) {
for (int num = 0; num < kNumFilesTrigger; num++) {
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
// use no_wait above because that one waits for flush and compaction. We
// don't want to wait for compaction because the full compaction is
// intentionally blocked while more files are flushed.
dbfull()->TEST_WaitForFlushMemTable();
}
if (i == 0) {
TEST_SYNC_POINT(
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0");
}
}
dbfull()->TEST_WaitForCompact();
// First compaction should output to bottom level. Second should output to L0
// since older L0 files pending compaction prevent it from being placed lower.
ASSERT_EQ(NumSortedRuns(), 2);
ASSERT_GT(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
::testing::Combine(::testing::Values(1, 3, 5),
::testing::Bool()));

View File

@ -1227,6 +1227,14 @@ int VersionStorageInfo::MaxInputLevel() const {
return 0;
}
int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
if (allow_ingest_behind) {
assert(num_levels() > 1);
return num_levels() - 2;
}
return num_levels() - 1;
}
void VersionStorageInfo::EstimateCompactionBytesNeeded(
const MutableCFOptions& mutable_cf_options) {
// Only implemented for level-based compaction

View File

@ -147,6 +147,7 @@ class VersionStorageInfo {
}
int MaxInputLevel() const;
int MaxOutputLevel(bool allow_ingest_behind) const;
// Return level number that has idx'th highest score
int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; }

12
env/env_posix.cc vendored
View File

@ -761,23 +761,23 @@ class PosixEnv : public Env {
// Allow increasing the number of worker threads.
virtual void SetBackgroundThreads(int num, Priority pri) override {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].SetBackgroundThreads(num);
}
virtual int GetBackgroundThreads(Priority pri) override {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].GetBackgroundThreads();
}
// Allow increasing the number of worker threads.
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
}
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override {
assert(pool >= Priority::LOW && pool <= Priority::HIGH);
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerIOPriority();
#endif
@ -883,7 +883,7 @@ PosixEnv::PosixEnv()
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
void* tag, void (*unschedFunction)(void* arg)) {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
}
@ -892,7 +892,7 @@ int PosixEnv::UnSchedule(void* arg, Priority pri) {
}
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].GetQueueLen();
}

14
env/env_test.cc vendored
View File

@ -125,12 +125,14 @@ static void SetBool(void* ptr) {
reinterpret_cast<std::atomic<bool>*>(ptr)->store(true);
}
TEST_P(EnvPosixTestWithParam, RunImmediately) {
std::atomic<bool> called(false);
env_->Schedule(&SetBool, &called);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(called.load());
WaitThreadPoolsEmpty();
TEST_F(EnvPosixTest, RunImmediately) {
for (int pri = Env::BOTTOM; pri < Env::TOTAL; ++pri) {
std::atomic<bool> called(false);
env_->SetBackgroundThreads(1, static_cast<Env::Priority>(pri));
env_->Schedule(&SetBool, &called, static_cast<Env::Priority>(pri));
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(called.load());
}
}
TEST_P(EnvPosixTestWithParam, UnSchedule) {

View File

@ -283,7 +283,7 @@ class Env {
virtual Status UnlockFile(FileLock* lock) = 0;
// Priority for scheduling job in thread pool
enum Priority { LOW, HIGH, TOTAL };
enum Priority { BOTTOM, LOW, HIGH, TOTAL };
// Priority for requesting bytes in rate limiter scheduler
enum IOPriority {

View File

@ -571,6 +571,7 @@ static void RunConcurrentRead(int run) {
fprintf(stderr, "Run %d of %d\n", i, N);
}
TestState state(seed + 1);
Env::Default()->SetBackgroundThreads(1);
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; ++k) {

View File

@ -363,6 +363,7 @@ static void RunConcurrent(int run) {
fprintf(stderr, "Run %d of %d\n", i, N);
}
TestState state(seed + 1);
Env::Default()->SetBackgroundThreads(1);
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; k++) {

View File

@ -318,6 +318,10 @@ DEFINE_int32(max_background_jobs,
"The maximum number of concurrent background jobs that can occur "
"in parallel.");
DEFINE_int32(num_bottom_pri_threads, 0,
"The number of threads in the bottom-priority thread pool (used "
"by universal compaction only).");
DEFINE_int32(max_background_compactions,
rocksdb::Options().max_background_compactions,
"The maximum number of concurrent background compactions"
@ -5242,6 +5246,8 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes,
rocksdb::Env::Priority::HIGH);
FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
rocksdb::Env::Priority::BOTTOM);
// Choose a location for the test database if none given with --db=<path>
if (FLAGS_db.empty()) {

View File

@ -123,11 +123,11 @@ private:
inline
ThreadPoolImpl::Impl::Impl()
:
:
low_io_priority_(false),
priority_(Env::LOW),
env_(nullptr),
total_threads_limit_(1),
total_threads_limit_(0),
queue_len_(),
exit_all_threads_(false),
wait_for_jobs_to_complete_(false),
@ -372,7 +372,7 @@ int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
return count;
}
ThreadPoolImpl::ThreadPoolImpl() :
ThreadPoolImpl::ThreadPoolImpl() :
impl_(new Impl()) {
}