Introduce bottom-pri thread pool for large universal compactions
Summary: When we had a single thread pool for compactions, a thread could be busy for a long time (minutes) executing a compaction involving the bottom level. In multi-instance setups, the entire thread pool could be consumed by such bottom-level compactions. Then, top-level compactions (e.g., a few L0 files) would be blocked for a long time ("head-of-line blocking"). Such top-level compactions are critical to prevent compaction stalls as they can quickly reduce number of L0 files / sorted runs. This diff introduces a bottom-priority queue for universal compactions including the bottom level. This alleviates the head-of-line blocking situation for fast, top-level compactions. - Added `Env::Priority::BOTTOM` thread pool. This feature is only enabled if user explicitly configures it to have a positive number of threads. - Changed `ThreadPoolImpl`'s default thread limit from one to zero. This change is invisible to users as we call `IncBackgroundThreadsIfNeeded` on the low-pri/high-pri pools during `DB::Open` with values of at least one. It is necessary, though, for bottom-pri to start with zero threads so the feature is disabled by default. - Separated `ManualCompaction` into two parts in `PrepickedCompaction`. `PrepickedCompaction` is used for any compaction that's picked outside of its execution thread, either manual or automatic. - Forward universal compactions involving last level to the bottom pool (worker thread's entry point is `BGWorkBottomCompaction`). - Track `bg_bottom_compaction_scheduled_` so we can wait for bottom-level compactions to finish. We don't count them against the background jobs limits. So users of this feature will get an extra compaction for free. Closes https://github.com/facebook/rocksdb/pull/2580 Differential Revision: D5422916 Pulled By: ajkr fbshipit-source-id: a74bd11f1ea4933df3739b16808bb21fcd512333
This commit is contained in:
parent
0b814ba92d
commit
cc01985db0
@ -3,6 +3,7 @@
|
||||
### New Features
|
||||
* Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
|
||||
* Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.
|
||||
* Universal compactions including the bottom level can be executed in a dedicated thread pool. This alleviates head-of-line blocking in the compaction queue, which cause write stalling, particularly in multi-instance use cases. Users can enable this feature via `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)`, where `N > 0`.
|
||||
|
||||
### Bug Fixes
|
||||
* Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`.
|
||||
|
@ -168,6 +168,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
||||
last_batch_group_size_(0),
|
||||
unscheduled_flushes_(0),
|
||||
unscheduled_compactions_(0),
|
||||
bg_bottom_compaction_scheduled_(0),
|
||||
bg_compaction_scheduled_(0),
|
||||
num_running_compactions_(0),
|
||||
bg_flush_scheduled_(0),
|
||||
@ -242,7 +243,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
|
||||
return;
|
||||
}
|
||||
// Wait for background work to finish
|
||||
while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
|
||||
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
|
||||
bg_flush_scheduled_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
}
|
||||
@ -252,15 +254,18 @@ DBImpl::~DBImpl() {
|
||||
// marker. After this we do a variant of the waiting and unschedule work
|
||||
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
|
||||
CancelAllBackgroundWork(false);
|
||||
int bottom_compactions_unscheduled =
|
||||
env_->UnSchedule(this, Env::Priority::BOTTOM);
|
||||
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
|
||||
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
|
||||
mutex_.Lock();
|
||||
bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
|
||||
bg_compaction_scheduled_ -= compactions_unscheduled;
|
||||
bg_flush_scheduled_ -= flushes_unscheduled;
|
||||
|
||||
// Wait for background work to finish
|
||||
while (bg_compaction_scheduled_ || bg_flush_scheduled_ ||
|
||||
bg_purge_scheduled_) {
|
||||
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
|
||||
bg_flush_scheduled_ || bg_purge_scheduled_) {
|
||||
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
|
38
db/db_impl.h
38
db/db_impl.h
@ -658,6 +658,7 @@ class DBImpl : public DB {
|
||||
}
|
||||
};
|
||||
|
||||
struct PrepickedCompaction;
|
||||
struct PurgeFileInfo;
|
||||
|
||||
// Recover the descriptor from persistent storage. May do a significant
|
||||
@ -799,14 +800,19 @@ class DBImpl : public DB {
|
||||
void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
|
||||
uint32_t path_id, int job_id);
|
||||
static void BGWorkCompaction(void* arg);
|
||||
// Runs a pre-chosen universal compaction involving bottom level in a
|
||||
// separate, bottom-pri thread pool.
|
||||
static void BGWorkBottomCompaction(void* arg);
|
||||
static void BGWorkFlush(void* db);
|
||||
static void BGWorkPurge(void* arg);
|
||||
static void UnscheduleCallback(void* arg);
|
||||
void BackgroundCallCompaction(void* arg);
|
||||
void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
||||
Env::Priority bg_thread_pri);
|
||||
void BackgroundCallFlush();
|
||||
void BackgroundCallPurge();
|
||||
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
|
||||
LogBuffer* log_buffer, void* m = 0);
|
||||
LogBuffer* log_buffer,
|
||||
PrepickedCompaction* prepicked_compaction);
|
||||
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
|
||||
LogBuffer* log_buffer);
|
||||
|
||||
@ -1059,6 +1065,10 @@ class DBImpl : public DB {
|
||||
int unscheduled_flushes_;
|
||||
int unscheduled_compactions_;
|
||||
|
||||
// count how many background compactions are running or have been scheduled in
|
||||
// the BOTTOM pool
|
||||
int bg_bottom_compaction_scheduled_;
|
||||
|
||||
// count how many background compactions are running or have been scheduled
|
||||
int bg_compaction_scheduled_;
|
||||
|
||||
@ -1075,7 +1085,7 @@ class DBImpl : public DB {
|
||||
int bg_purge_scheduled_;
|
||||
|
||||
// Information for a manual compaction
|
||||
struct ManualCompaction {
|
||||
struct ManualCompactionState {
|
||||
ColumnFamilyData* cfd;
|
||||
int input_level;
|
||||
int output_level;
|
||||
@ -1091,13 +1101,21 @@ class DBImpl : public DB {
|
||||
InternalKey* manual_end; // how far we are compacting
|
||||
InternalKey tmp_storage; // Used to keep track of compaction progress
|
||||
InternalKey tmp_storage1; // Used to keep track of compaction progress
|
||||
Compaction* compaction;
|
||||
};
|
||||
std::deque<ManualCompaction*> manual_compaction_dequeue_;
|
||||
struct PrepickedCompaction {
|
||||
// background compaction takes ownership of `compaction`.
|
||||
Compaction* compaction;
|
||||
// caller retains ownership of `manual_compaction_state` as it is reused
|
||||
// across background compactions.
|
||||
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
|
||||
};
|
||||
std::deque<ManualCompactionState*> manual_compaction_dequeue_;
|
||||
|
||||
struct CompactionArg {
|
||||
// caller retains ownership of `db`.
|
||||
DBImpl* db;
|
||||
ManualCompaction* m;
|
||||
// background compaction takes ownership of `prepicked_compaction`.
|
||||
PrepickedCompaction* prepicked_compaction;
|
||||
};
|
||||
|
||||
// Have we encountered a background error in paranoid mode?
|
||||
@ -1231,11 +1249,11 @@ class DBImpl : public DB {
|
||||
|
||||
bool HasPendingManualCompaction();
|
||||
bool HasExclusiveManualCompaction();
|
||||
void AddManualCompaction(ManualCompaction* m);
|
||||
void RemoveManualCompaction(ManualCompaction* m);
|
||||
bool ShouldntRunManualCompaction(ManualCompaction* m);
|
||||
void AddManualCompaction(ManualCompactionState* m);
|
||||
void RemoveManualCompaction(ManualCompactionState* m);
|
||||
bool ShouldntRunManualCompaction(ManualCompactionState* m);
|
||||
bool HaveManualCompaction(ColumnFamilyData* cfd);
|
||||
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
|
||||
bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
|
||||
|
||||
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
|
||||
|
||||
|
@ -612,7 +612,8 @@ Status DBImpl::CompactFilesImpl(
|
||||
Status DBImpl::PauseBackgroundWork() {
|
||||
InstrumentedMutexLock guard_lock(&mutex_);
|
||||
bg_compaction_paused_++;
|
||||
while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) {
|
||||
while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
|
||||
bg_flush_scheduled_ > 0) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
bg_work_paused_++;
|
||||
@ -808,7 +809,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
|
||||
bool scheduled = false;
|
||||
bool manual_conflict = false;
|
||||
ManualCompaction manual;
|
||||
ManualCompactionState manual;
|
||||
manual.cfd = cfd;
|
||||
manual.input_level = input_level;
|
||||
manual.output_level = output_level;
|
||||
@ -858,7 +859,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
AddManualCompaction(&manual);
|
||||
TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
|
||||
if (exclusive) {
|
||||
while (bg_compaction_scheduled_ > 0) {
|
||||
while (bg_bottom_compaction_scheduled_ > 0 ||
|
||||
bg_compaction_scheduled_ > 0) {
|
||||
TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
|
||||
ROCKS_LOG_INFO(
|
||||
immutable_db_options_.info_log,
|
||||
@ -878,14 +880,14 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
while (!manual.done) {
|
||||
assert(HasPendingManualCompaction());
|
||||
manual_conflict = false;
|
||||
Compaction* compaction;
|
||||
if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
|
||||
scheduled ||
|
||||
((manual.manual_end = &manual.tmp_storage1)&&(
|
||||
(manual.compaction = manual.cfd->CompactRange(
|
||||
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
|
||||
manual.output_level, manual.output_path_id, manual.begin,
|
||||
manual.end, &manual.manual_end, &manual_conflict)) ==
|
||||
nullptr) &&
|
||||
((manual.manual_end = &manual.tmp_storage1) &&
|
||||
((compaction = manual.cfd->CompactRange(
|
||||
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
|
||||
manual.output_level, manual.output_path_id, manual.begin,
|
||||
manual.end, &manual.manual_end, &manual_conflict)) == nullptr) &&
|
||||
manual_conflict)) {
|
||||
// exclusive manual compactions should not see a conflict during
|
||||
// CompactRange
|
||||
@ -898,14 +900,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
manual.incomplete = false;
|
||||
}
|
||||
} else if (!scheduled) {
|
||||
if (manual.compaction == nullptr) {
|
||||
if (compaction == nullptr) {
|
||||
manual.done = true;
|
||||
bg_cv_.SignalAll();
|
||||
continue;
|
||||
}
|
||||
ca = new CompactionArg;
|
||||
ca->db = this;
|
||||
ca->m = &manual;
|
||||
ca->prepicked_compaction = new PrepickedCompaction;
|
||||
ca->prepicked_compaction->manual_compaction_state = &manual;
|
||||
ca->prepicked_compaction->compaction = compaction;
|
||||
manual.incomplete = false;
|
||||
bg_compaction_scheduled_++;
|
||||
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
||||
@ -1047,7 +1051,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
|
||||
unscheduled_compactions_ > 0) {
|
||||
CompactionArg* ca = new CompactionArg;
|
||||
ca->db = this;
|
||||
ca->m = nullptr;
|
||||
ca->prepicked_compaction = nullptr;
|
||||
bg_compaction_scheduled_++;
|
||||
unscheduled_compactions_--;
|
||||
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
||||
@ -1152,7 +1156,23 @@ void DBImpl::BGWorkCompaction(void* arg) {
|
||||
delete reinterpret_cast<CompactionArg*>(arg);
|
||||
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
|
||||
TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
|
||||
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m);
|
||||
auto prepicked_compaction =
|
||||
static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
|
||||
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
|
||||
prepicked_compaction, Env::Priority::LOW);
|
||||
delete prepicked_compaction;
|
||||
}
|
||||
|
||||
void DBImpl::BGWorkBottomCompaction(void* arg) {
|
||||
CompactionArg ca = *(static_cast<CompactionArg*>(arg));
|
||||
delete static_cast<CompactionArg*>(arg);
|
||||
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
|
||||
TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
|
||||
auto* prepicked_compaction = ca.prepicked_compaction;
|
||||
assert(prepicked_compaction && prepicked_compaction->compaction &&
|
||||
!prepicked_compaction->manual_compaction_state);
|
||||
ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
|
||||
delete prepicked_compaction;
|
||||
}
|
||||
|
||||
void DBImpl::BGWorkPurge(void* db) {
|
||||
@ -1165,8 +1185,11 @@ void DBImpl::BGWorkPurge(void* db) {
|
||||
void DBImpl::UnscheduleCallback(void* arg) {
|
||||
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
|
||||
delete reinterpret_cast<CompactionArg*>(arg);
|
||||
if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) {
|
||||
delete ca.m->compaction;
|
||||
if (ca.prepicked_compaction != nullptr) {
|
||||
if (ca.prepicked_compaction->compaction != nullptr) {
|
||||
delete ca.prepicked_compaction->compaction;
|
||||
}
|
||||
delete ca.prepicked_compaction;
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
|
||||
}
|
||||
@ -1293,9 +1316,9 @@ void DBImpl::BackgroundCallFlush() {
|
||||
}
|
||||
}
|
||||
|
||||
void DBImpl::BackgroundCallCompaction(void* arg) {
|
||||
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
||||
Env::Priority bg_thread_pri) {
|
||||
bool made_progress = false;
|
||||
ManualCompaction* m = reinterpret_cast<ManualCompaction*>(arg);
|
||||
JobContext job_context(next_job_id_.fetch_add(1), true);
|
||||
TEST_SYNC_POINT("BackgroundCallCompaction:0");
|
||||
MaybeDumpStats();
|
||||
@ -1313,9 +1336,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
|
||||
auto pending_outputs_inserted_elem =
|
||||
CaptureCurrentFileNumberInPendingOutputs();
|
||||
|
||||
assert(bg_compaction_scheduled_);
|
||||
Status s =
|
||||
BackgroundCompaction(&made_progress, &job_context, &log_buffer, m);
|
||||
assert((bg_thread_pri == Env::Priority::BOTTOM &&
|
||||
bg_bottom_compaction_scheduled_) ||
|
||||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
|
||||
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
|
||||
prepicked_compaction);
|
||||
TEST_SYNC_POINT("BackgroundCallCompaction:1");
|
||||
if (!s.ok() && !s.IsShutdownInProgress()) {
|
||||
// Wait a little bit before retrying background compaction in
|
||||
@ -1361,17 +1386,24 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
|
||||
|
||||
assert(num_running_compactions_ > 0);
|
||||
num_running_compactions_--;
|
||||
bg_compaction_scheduled_--;
|
||||
if (bg_thread_pri == Env::Priority::LOW) {
|
||||
bg_compaction_scheduled_--;
|
||||
} else {
|
||||
assert(bg_thread_pri == Env::Priority::BOTTOM);
|
||||
bg_bottom_compaction_scheduled_--;
|
||||
}
|
||||
|
||||
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
|
||||
|
||||
// See if there's more work to be done
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
if (made_progress || bg_compaction_scheduled_ == 0 ||
|
||||
if (made_progress ||
|
||||
(bg_compaction_scheduled_ == 0 &&
|
||||
bg_bottom_compaction_scheduled_ == 0) ||
|
||||
HasPendingManualCompaction()) {
|
||||
// signal if
|
||||
// * made_progress -- need to wakeup DelayWrite
|
||||
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
|
||||
// * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
|
||||
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction
|
||||
// If none of this is true, there is no need to signal since nobody is
|
||||
// waiting for it
|
||||
@ -1386,14 +1418,23 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
|
||||
|
||||
Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
JobContext* job_context,
|
||||
LogBuffer* log_buffer, void* arg) {
|
||||
ManualCompaction* manual_compaction =
|
||||
reinterpret_cast<ManualCompaction*>(arg);
|
||||
LogBuffer* log_buffer,
|
||||
PrepickedCompaction* prepicked_compaction) {
|
||||
ManualCompactionState* manual_compaction =
|
||||
prepicked_compaction == nullptr
|
||||
? nullptr
|
||||
: prepicked_compaction->manual_compaction_state;
|
||||
*made_progress = false;
|
||||
mutex_.AssertHeld();
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
|
||||
|
||||
bool is_manual = (manual_compaction != nullptr);
|
||||
unique_ptr<Compaction> c;
|
||||
if (prepicked_compaction != nullptr &&
|
||||
prepicked_compaction->compaction != nullptr) {
|
||||
c.reset(prepicked_compaction->compaction);
|
||||
}
|
||||
bool is_prepicked = is_manual || c;
|
||||
|
||||
// (manual_compaction->in_progress == false);
|
||||
bool trivial_move_disallowed =
|
||||
@ -1410,7 +1451,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
manual_compaction->status = status;
|
||||
manual_compaction->done = true;
|
||||
manual_compaction->in_progress = false;
|
||||
delete manual_compaction->compaction;
|
||||
manual_compaction = nullptr;
|
||||
}
|
||||
return status;
|
||||
@ -1421,13 +1461,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
manual_compaction->in_progress = true;
|
||||
}
|
||||
|
||||
unique_ptr<Compaction> c;
|
||||
// InternalKey manual_end_storage;
|
||||
// InternalKey* manual_end = &manual_end_storage;
|
||||
if (is_manual) {
|
||||
ManualCompaction* m = manual_compaction;
|
||||
ManualCompactionState* m = manual_compaction;
|
||||
assert(m->in_progress);
|
||||
c.reset(std::move(m->compaction));
|
||||
if (!c) {
|
||||
m->done = true;
|
||||
m->manual_end = nullptr;
|
||||
@ -1449,7 +1487,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
? "(end)"
|
||||
: m->manual_end->DebugString().c_str()));
|
||||
}
|
||||
} else if (!compaction_queue_.empty()) {
|
||||
} else if (!is_prepicked && !compaction_queue_.empty()) {
|
||||
if (HaveManualCompaction(compaction_queue_.front())) {
|
||||
// Can't compact right now, but try again later
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
|
||||
@ -1601,6 +1639,28 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
|
||||
// Clear Instrument
|
||||
ThreadStatusUtil::ResetThreadStatus();
|
||||
} else if (c->column_family_data()->ioptions()->compaction_style ==
|
||||
kCompactionStyleUniversal &&
|
||||
!is_prepicked && c->output_level() > 0 &&
|
||||
c->output_level() ==
|
||||
c->column_family_data()
|
||||
->current()
|
||||
->storage_info()
|
||||
->MaxOutputLevel(
|
||||
immutable_db_options_.allow_ingest_behind) &&
|
||||
env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
|
||||
// Forward universal compactions involving last level to the bottom pool
|
||||
// if it exists, such that long-running compactions can't block short-
|
||||
// lived ones, like L0->L0s.
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
|
||||
CompactionArg* ca = new CompactionArg;
|
||||
ca->db = this;
|
||||
ca->prepicked_compaction = new PrepickedCompaction;
|
||||
ca->prepicked_compaction->compaction = c.release();
|
||||
ca->prepicked_compaction->manual_compaction_state = nullptr;
|
||||
++bg_bottom_compaction_scheduled_;
|
||||
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
|
||||
this, &DBImpl::UnscheduleCallback);
|
||||
} else {
|
||||
int output_level __attribute__((unused)) = c->output_level();
|
||||
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
|
||||
@ -1664,7 +1724,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
}
|
||||
|
||||
if (is_manual) {
|
||||
ManualCompaction* m = manual_compaction;
|
||||
ManualCompactionState* m = manual_compaction;
|
||||
if (!status.ok()) {
|
||||
m->status = status;
|
||||
m->done = true;
|
||||
@ -1707,13 +1767,13 @@ bool DBImpl::HasPendingManualCompaction() {
|
||||
return (!manual_compaction_dequeue_.empty());
|
||||
}
|
||||
|
||||
void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) {
|
||||
void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
|
||||
manual_compaction_dequeue_.push_back(m);
|
||||
}
|
||||
|
||||
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
|
||||
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
|
||||
// Remove from queue
|
||||
std::deque<ManualCompaction*>::iterator it =
|
||||
std::deque<ManualCompactionState*>::iterator it =
|
||||
manual_compaction_dequeue_.begin();
|
||||
while (it != manual_compaction_dequeue_.end()) {
|
||||
if (m == (*it)) {
|
||||
@ -1726,16 +1786,17 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
|
||||
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
|
||||
if (num_running_ingest_file_ > 0) {
|
||||
// We need to wait for other IngestExternalFile() calls to finish
|
||||
// before running a manual compaction.
|
||||
return true;
|
||||
}
|
||||
if (m->exclusive) {
|
||||
return (bg_compaction_scheduled_ > 0);
|
||||
return (bg_bottom_compaction_scheduled_ > 0 ||
|
||||
bg_compaction_scheduled_ > 0);
|
||||
}
|
||||
std::deque<ManualCompaction*>::iterator it =
|
||||
std::deque<ManualCompactionState*>::iterator it =
|
||||
manual_compaction_dequeue_.begin();
|
||||
bool seen = false;
|
||||
while (it != manual_compaction_dequeue_.end()) {
|
||||
@ -1756,7 +1817,7 @@ bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
|
||||
|
||||
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
|
||||
// Remove from priority queue
|
||||
std::deque<ManualCompaction*>::iterator it =
|
||||
std::deque<ManualCompactionState*>::iterator it =
|
||||
manual_compaction_dequeue_.begin();
|
||||
while (it != manual_compaction_dequeue_.end()) {
|
||||
if ((*it)->exclusive) {
|
||||
@ -1774,7 +1835,7 @@ bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
|
||||
|
||||
bool DBImpl::HasExclusiveManualCompaction() {
|
||||
// Remove from priority queue
|
||||
std::deque<ManualCompaction*>::iterator it =
|
||||
std::deque<ManualCompactionState*>::iterator it =
|
||||
manual_compaction_dequeue_.begin();
|
||||
while (it != manual_compaction_dequeue_.end()) {
|
||||
if ((*it)->exclusive) {
|
||||
@ -1785,7 +1846,7 @@ bool DBImpl::HasExclusiveManualCompaction() {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
|
||||
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
|
||||
if ((m->exclusive) || (m1->exclusive)) {
|
||||
return true;
|
||||
}
|
||||
|
@ -112,7 +112,9 @@ Status DBImpl::TEST_WaitForCompact() {
|
||||
// OR flush to finish.
|
||||
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
while ((bg_compaction_scheduled_ || bg_flush_scheduled_) && bg_error_.ok()) {
|
||||
while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
|
||||
bg_flush_scheduled_) &&
|
||||
bg_error_.ok()) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
return bg_error_;
|
||||
|
@ -1370,6 +1370,103 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_P(DBTestUniversalCompaction, FullCompactionInBottomPriThreadPool) {
|
||||
const int kNumFilesTrigger = 3;
|
||||
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
|
||||
for (bool allow_ingest_behind : {false, true}) {
|
||||
Options options = CurrentOptions();
|
||||
options.allow_ingest_behind = allow_ingest_behind;
|
||||
options.compaction_style = kCompactionStyleUniversal;
|
||||
options.num_levels = num_levels_;
|
||||
options.write_buffer_size = 100 << 10; // 100KB
|
||||
options.target_file_size_base = 32 << 10; // 32KB
|
||||
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
|
||||
// Trigger compaction if size amplification exceeds 110%
|
||||
options.compaction_options_universal.max_size_amplification_percent = 110;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
int num_bottom_pri_compactions = 0;
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::BGWorkBottomCompaction",
|
||||
[&](void* arg) { ++num_bottom_pri_compactions; });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Random rnd(301);
|
||||
for (int num = 0; num < kNumFilesTrigger; num++) {
|
||||
ASSERT_EQ(NumSortedRuns(), num);
|
||||
int key_idx = 0;
|
||||
GenerateNewFile(&rnd, &key_idx);
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
if (allow_ingest_behind || num_levels_ > 1) {
|
||||
// allow_ingest_behind increases number of levels while sanitizing.
|
||||
ASSERT_EQ(1, num_bottom_pri_compactions);
|
||||
} else {
|
||||
// for single-level universal, everything's bottom level so nothing should
|
||||
// be executed in bottom-pri thread pool.
|
||||
ASSERT_EQ(0, num_bottom_pri_compactions);
|
||||
}
|
||||
// Verify that size amplification did occur
|
||||
ASSERT_EQ(NumSortedRuns(), 1);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
|
||||
if (num_levels_ == 1) {
|
||||
// for single-level universal, everything's bottom level so nothing should
|
||||
// be executed in bottom-pri thread pool.
|
||||
return;
|
||||
}
|
||||
const int kNumFilesTrigger = 3;
|
||||
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
|
||||
Options options = CurrentOptions();
|
||||
options.compaction_style = kCompactionStyleUniversal;
|
||||
options.num_levels = num_levels_;
|
||||
options.write_buffer_size = 100 << 10; // 100KB
|
||||
options.target_file_size_base = 32 << 10; // 32KB
|
||||
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
|
||||
// Trigger compaction if size amplification exceeds 110%
|
||||
options.compaction_options_universal.max_size_amplification_percent = 110;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{// wait for the full compaction to be picked before adding files intended
|
||||
// for the second one.
|
||||
{"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
|
||||
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
|
||||
// the full (bottom-pri) compaction waits until a partial (low-pri)
|
||||
// compaction has started to verify they can run in parallel.
|
||||
{"DBImpl::BackgroundCompaction:NonTrivial",
|
||||
"DBImpl::BGWorkBottomCompaction"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Random rnd(301);
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
for (int num = 0; num < kNumFilesTrigger; num++) {
|
||||
int key_idx = 0;
|
||||
GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
|
||||
// use no_wait above because that one waits for flush and compaction. We
|
||||
// don't want to wait for compaction because the full compaction is
|
||||
// intentionally blocked while more files are flushed.
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
}
|
||||
if (i == 0) {
|
||||
TEST_SYNC_POINT(
|
||||
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0");
|
||||
}
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
// First compaction should output to bottom level. Second should output to L0
|
||||
// since older L0 files pending compaction prevent it from being placed lower.
|
||||
ASSERT_EQ(NumSortedRuns(), 2);
|
||||
ASSERT_GT(NumTableFilesAtLevel(0), 0);
|
||||
ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
|
||||
::testing::Combine(::testing::Values(1, 3, 5),
|
||||
::testing::Bool()));
|
||||
|
@ -1227,6 +1227,14 @@ int VersionStorageInfo::MaxInputLevel() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
|
||||
if (allow_ingest_behind) {
|
||||
assert(num_levels() > 1);
|
||||
return num_levels() - 2;
|
||||
}
|
||||
return num_levels() - 1;
|
||||
}
|
||||
|
||||
void VersionStorageInfo::EstimateCompactionBytesNeeded(
|
||||
const MutableCFOptions& mutable_cf_options) {
|
||||
// Only implemented for level-based compaction
|
||||
|
@ -147,6 +147,7 @@ class VersionStorageInfo {
|
||||
}
|
||||
|
||||
int MaxInputLevel() const;
|
||||
int MaxOutputLevel(bool allow_ingest_behind) const;
|
||||
|
||||
// Return level number that has idx'th highest score
|
||||
int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; }
|
||||
|
12
env/env_posix.cc
vendored
12
env/env_posix.cc
vendored
@ -761,23 +761,23 @@ class PosixEnv : public Env {
|
||||
|
||||
// Allow increasing the number of worker threads.
|
||||
virtual void SetBackgroundThreads(int num, Priority pri) override {
|
||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
||||
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||
thread_pools_[pri].SetBackgroundThreads(num);
|
||||
}
|
||||
|
||||
virtual int GetBackgroundThreads(Priority pri) override {
|
||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
||||
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||
return thread_pools_[pri].GetBackgroundThreads();
|
||||
}
|
||||
|
||||
// Allow increasing the number of worker threads.
|
||||
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
|
||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
||||
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
|
||||
}
|
||||
|
||||
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override {
|
||||
assert(pool >= Priority::LOW && pool <= Priority::HIGH);
|
||||
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
|
||||
#ifdef OS_LINUX
|
||||
thread_pools_[pool].LowerIOPriority();
|
||||
#endif
|
||||
@ -883,7 +883,7 @@ PosixEnv::PosixEnv()
|
||||
|
||||
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
|
||||
void* tag, void (*unschedFunction)(void* arg)) {
|
||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
||||
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||
thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
|
||||
}
|
||||
|
||||
@ -892,7 +892,7 @@ int PosixEnv::UnSchedule(void* arg, Priority pri) {
|
||||
}
|
||||
|
||||
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
|
||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
||||
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||
return thread_pools_[pri].GetQueueLen();
|
||||
}
|
||||
|
||||
|
14
env/env_test.cc
vendored
14
env/env_test.cc
vendored
@ -125,12 +125,14 @@ static void SetBool(void* ptr) {
|
||||
reinterpret_cast<std::atomic<bool>*>(ptr)->store(true);
|
||||
}
|
||||
|
||||
TEST_P(EnvPosixTestWithParam, RunImmediately) {
|
||||
std::atomic<bool> called(false);
|
||||
env_->Schedule(&SetBool, &called);
|
||||
Env::Default()->SleepForMicroseconds(kDelayMicros);
|
||||
ASSERT_TRUE(called.load());
|
||||
WaitThreadPoolsEmpty();
|
||||
TEST_F(EnvPosixTest, RunImmediately) {
|
||||
for (int pri = Env::BOTTOM; pri < Env::TOTAL; ++pri) {
|
||||
std::atomic<bool> called(false);
|
||||
env_->SetBackgroundThreads(1, static_cast<Env::Priority>(pri));
|
||||
env_->Schedule(&SetBool, &called, static_cast<Env::Priority>(pri));
|
||||
Env::Default()->SleepForMicroseconds(kDelayMicros);
|
||||
ASSERT_TRUE(called.load());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(EnvPosixTestWithParam, UnSchedule) {
|
||||
|
@ -283,7 +283,7 @@ class Env {
|
||||
virtual Status UnlockFile(FileLock* lock) = 0;
|
||||
|
||||
// Priority for scheduling job in thread pool
|
||||
enum Priority { LOW, HIGH, TOTAL };
|
||||
enum Priority { BOTTOM, LOW, HIGH, TOTAL };
|
||||
|
||||
// Priority for requesting bytes in rate limiter scheduler
|
||||
enum IOPriority {
|
||||
|
@ -571,6 +571,7 @@ static void RunConcurrentRead(int run) {
|
||||
fprintf(stderr, "Run %d of %d\n", i, N);
|
||||
}
|
||||
TestState state(seed + 1);
|
||||
Env::Default()->SetBackgroundThreads(1);
|
||||
Env::Default()->Schedule(ConcurrentReader, &state);
|
||||
state.Wait(TestState::RUNNING);
|
||||
for (int k = 0; k < kSize; ++k) {
|
||||
|
@ -363,6 +363,7 @@ static void RunConcurrent(int run) {
|
||||
fprintf(stderr, "Run %d of %d\n", i, N);
|
||||
}
|
||||
TestState state(seed + 1);
|
||||
Env::Default()->SetBackgroundThreads(1);
|
||||
Env::Default()->Schedule(ConcurrentReader, &state);
|
||||
state.Wait(TestState::RUNNING);
|
||||
for (int k = 0; k < kSize; k++) {
|
||||
|
@ -318,6 +318,10 @@ DEFINE_int32(max_background_jobs,
|
||||
"The maximum number of concurrent background jobs that can occur "
|
||||
"in parallel.");
|
||||
|
||||
DEFINE_int32(num_bottom_pri_threads, 0,
|
||||
"The number of threads in the bottom-priority thread pool (used "
|
||||
"by universal compaction only).");
|
||||
|
||||
DEFINE_int32(max_background_compactions,
|
||||
rocksdb::Options().max_background_compactions,
|
||||
"The maximum number of concurrent background compactions"
|
||||
@ -5242,6 +5246,8 @@ int db_bench_tool(int argc, char** argv) {
|
||||
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
|
||||
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes,
|
||||
rocksdb::Env::Priority::HIGH);
|
||||
FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
|
||||
rocksdb::Env::Priority::BOTTOM);
|
||||
|
||||
// Choose a location for the test database if none given with --db=<path>
|
||||
if (FLAGS_db.empty()) {
|
||||
|
@ -123,11 +123,11 @@ private:
|
||||
|
||||
inline
|
||||
ThreadPoolImpl::Impl::Impl()
|
||||
:
|
||||
:
|
||||
low_io_priority_(false),
|
||||
priority_(Env::LOW),
|
||||
env_(nullptr),
|
||||
total_threads_limit_(1),
|
||||
total_threads_limit_(0),
|
||||
queue_len_(),
|
||||
exit_all_threads_(false),
|
||||
wait_for_jobs_to_complete_(false),
|
||||
@ -372,7 +372,7 @@ int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
|
||||
return count;
|
||||
}
|
||||
|
||||
ThreadPoolImpl::ThreadPoolImpl() :
|
||||
ThreadPoolImpl::ThreadPoolImpl() :
|
||||
impl_(new Impl()) {
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user