Introduce bottom-pri thread pool for large universal compactions
Summary: When we had a single thread pool for compactions, a thread could be busy for a long time (minutes) executing a compaction involving the bottom level. In multi-instance setups, the entire thread pool could be consumed by such bottom-level compactions. Then, top-level compactions (e.g., a few L0 files) would be blocked for a long time ("head-of-line blocking"). Such top-level compactions are critical to prevent compaction stalls as they can quickly reduce the number of L0 files / sorted runs. This diff introduces a bottom-priority queue for universal compactions including the bottom level. This alleviates the head-of-line blocking situation for fast, top-level compactions. - Added `Env::Priority::BOTTOM` thread pool. This feature is only enabled if the user explicitly configures it to have a positive number of threads. - Changed `ThreadPoolImpl`'s default thread limit from one to zero. This change is invisible to users as we call `IncBackgroundThreadsIfNeeded` on the low-pri/high-pri pools during `DB::Open` with values of at least one. It is necessary, though, for bottom-pri to start with zero threads so the feature is disabled by default. - Separated `ManualCompaction` into two parts in `PrepickedCompaction`. `PrepickedCompaction` is used for any compaction that's picked outside of its execution thread, either manual or automatic. - Forward universal compactions involving last level to the bottom pool (worker thread's entry point is `BGWorkBottomCompaction`). - Track `bg_bottom_compaction_scheduled_` so we can wait for bottom-level compactions to finish. We don't count them against the background job limits. So users of this feature will get an extra compaction for free. Closes https://github.com/facebook/rocksdb/pull/2580 Differential Revision: D5422916 Pulled By: ajkr fbshipit-source-id: a74bd11f1ea4933df3739b16808bb21fcd512333
This commit is contained in:
parent
0b814ba92d
commit
cc01985db0
@ -3,6 +3,7 @@
|
|||||||
### New Features
|
### New Features
|
||||||
* Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
|
* Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
|
||||||
* Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.
|
* Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.
|
||||||
|
* Universal compactions including the bottom level can be executed in a dedicated thread pool. This alleviates head-of-line blocking in the compaction queue, which causes write stalling, particularly in multi-instance use cases. Users can enable this feature via `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)`, where `N > 0`.
|
||||||
|
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
* Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`.
|
* Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`.
|
||||||
|
@ -168,6 +168,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
|||||||
last_batch_group_size_(0),
|
last_batch_group_size_(0),
|
||||||
unscheduled_flushes_(0),
|
unscheduled_flushes_(0),
|
||||||
unscheduled_compactions_(0),
|
unscheduled_compactions_(0),
|
||||||
|
bg_bottom_compaction_scheduled_(0),
|
||||||
bg_compaction_scheduled_(0),
|
bg_compaction_scheduled_(0),
|
||||||
num_running_compactions_(0),
|
num_running_compactions_(0),
|
||||||
bg_flush_scheduled_(0),
|
bg_flush_scheduled_(0),
|
||||||
@ -242,7 +243,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Wait for background work to finish
|
// Wait for background work to finish
|
||||||
while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
|
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
|
||||||
|
bg_flush_scheduled_) {
|
||||||
bg_cv_.Wait();
|
bg_cv_.Wait();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -252,15 +254,18 @@ DBImpl::~DBImpl() {
|
|||||||
// marker. After this we do a variant of the waiting and unschedule work
|
// marker. After this we do a variant of the waiting and unschedule work
|
||||||
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
|
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
|
||||||
CancelAllBackgroundWork(false);
|
CancelAllBackgroundWork(false);
|
||||||
|
int bottom_compactions_unscheduled =
|
||||||
|
env_->UnSchedule(this, Env::Priority::BOTTOM);
|
||||||
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
|
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
|
||||||
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
|
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
|
bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
|
||||||
bg_compaction_scheduled_ -= compactions_unscheduled;
|
bg_compaction_scheduled_ -= compactions_unscheduled;
|
||||||
bg_flush_scheduled_ -= flushes_unscheduled;
|
bg_flush_scheduled_ -= flushes_unscheduled;
|
||||||
|
|
||||||
// Wait for background work to finish
|
// Wait for background work to finish
|
||||||
while (bg_compaction_scheduled_ || bg_flush_scheduled_ ||
|
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
|
||||||
bg_purge_scheduled_) {
|
bg_flush_scheduled_ || bg_purge_scheduled_) {
|
||||||
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
|
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
|
||||||
bg_cv_.Wait();
|
bg_cv_.Wait();
|
||||||
}
|
}
|
||||||
|
38
db/db_impl.h
38
db/db_impl.h
@ -658,6 +658,7 @@ class DBImpl : public DB {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct PrepickedCompaction;
|
||||||
struct PurgeFileInfo;
|
struct PurgeFileInfo;
|
||||||
|
|
||||||
// Recover the descriptor from persistent storage. May do a significant
|
// Recover the descriptor from persistent storage. May do a significant
|
||||||
@ -799,14 +800,19 @@ class DBImpl : public DB {
|
|||||||
void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
|
void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
|
||||||
uint32_t path_id, int job_id);
|
uint32_t path_id, int job_id);
|
||||||
static void BGWorkCompaction(void* arg);
|
static void BGWorkCompaction(void* arg);
|
||||||
|
// Runs a pre-chosen universal compaction involving bottom level in a
|
||||||
|
// separate, bottom-pri thread pool.
|
||||||
|
static void BGWorkBottomCompaction(void* arg);
|
||||||
static void BGWorkFlush(void* db);
|
static void BGWorkFlush(void* db);
|
||||||
static void BGWorkPurge(void* arg);
|
static void BGWorkPurge(void* arg);
|
||||||
static void UnscheduleCallback(void* arg);
|
static void UnscheduleCallback(void* arg);
|
||||||
void BackgroundCallCompaction(void* arg);
|
void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
||||||
|
Env::Priority bg_thread_pri);
|
||||||
void BackgroundCallFlush();
|
void BackgroundCallFlush();
|
||||||
void BackgroundCallPurge();
|
void BackgroundCallPurge();
|
||||||
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
|
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
|
||||||
LogBuffer* log_buffer, void* m = 0);
|
LogBuffer* log_buffer,
|
||||||
|
PrepickedCompaction* prepicked_compaction);
|
||||||
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
|
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
|
||||||
LogBuffer* log_buffer);
|
LogBuffer* log_buffer);
|
||||||
|
|
||||||
@ -1059,6 +1065,10 @@ class DBImpl : public DB {
|
|||||||
int unscheduled_flushes_;
|
int unscheduled_flushes_;
|
||||||
int unscheduled_compactions_;
|
int unscheduled_compactions_;
|
||||||
|
|
||||||
|
// count how many background compactions are running or have been scheduled in
|
||||||
|
// the BOTTOM pool
|
||||||
|
int bg_bottom_compaction_scheduled_;
|
||||||
|
|
||||||
// count how many background compactions are running or have been scheduled
|
// count how many background compactions are running or have been scheduled
|
||||||
int bg_compaction_scheduled_;
|
int bg_compaction_scheduled_;
|
||||||
|
|
||||||
@ -1075,7 +1085,7 @@ class DBImpl : public DB {
|
|||||||
int bg_purge_scheduled_;
|
int bg_purge_scheduled_;
|
||||||
|
|
||||||
// Information for a manual compaction
|
// Information for a manual compaction
|
||||||
struct ManualCompaction {
|
struct ManualCompactionState {
|
||||||
ColumnFamilyData* cfd;
|
ColumnFamilyData* cfd;
|
||||||
int input_level;
|
int input_level;
|
||||||
int output_level;
|
int output_level;
|
||||||
@ -1091,13 +1101,21 @@ class DBImpl : public DB {
|
|||||||
InternalKey* manual_end; // how far we are compacting
|
InternalKey* manual_end; // how far we are compacting
|
||||||
InternalKey tmp_storage; // Used to keep track of compaction progress
|
InternalKey tmp_storage; // Used to keep track of compaction progress
|
||||||
InternalKey tmp_storage1; // Used to keep track of compaction progress
|
InternalKey tmp_storage1; // Used to keep track of compaction progress
|
||||||
Compaction* compaction;
|
|
||||||
};
|
};
|
||||||
std::deque<ManualCompaction*> manual_compaction_dequeue_;
|
struct PrepickedCompaction {
|
||||||
|
// background compaction takes ownership of `compaction`.
|
||||||
|
Compaction* compaction;
|
||||||
|
// caller retains ownership of `manual_compaction_state` as it is reused
|
||||||
|
// across background compactions.
|
||||||
|
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
|
||||||
|
};
|
||||||
|
std::deque<ManualCompactionState*> manual_compaction_dequeue_;
|
||||||
|
|
||||||
struct CompactionArg {
|
struct CompactionArg {
|
||||||
|
// caller retains ownership of `db`.
|
||||||
DBImpl* db;
|
DBImpl* db;
|
||||||
ManualCompaction* m;
|
// background compaction takes ownership of `prepicked_compaction`.
|
||||||
|
PrepickedCompaction* prepicked_compaction;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Have we encountered a background error in paranoid mode?
|
// Have we encountered a background error in paranoid mode?
|
||||||
@ -1231,11 +1249,11 @@ class DBImpl : public DB {
|
|||||||
|
|
||||||
bool HasPendingManualCompaction();
|
bool HasPendingManualCompaction();
|
||||||
bool HasExclusiveManualCompaction();
|
bool HasExclusiveManualCompaction();
|
||||||
void AddManualCompaction(ManualCompaction* m);
|
void AddManualCompaction(ManualCompactionState* m);
|
||||||
void RemoveManualCompaction(ManualCompaction* m);
|
void RemoveManualCompaction(ManualCompactionState* m);
|
||||||
bool ShouldntRunManualCompaction(ManualCompaction* m);
|
bool ShouldntRunManualCompaction(ManualCompactionState* m);
|
||||||
bool HaveManualCompaction(ColumnFamilyData* cfd);
|
bool HaveManualCompaction(ColumnFamilyData* cfd);
|
||||||
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
|
bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
|
||||||
|
|
||||||
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
|
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
|
||||||
|
|
||||||
|
@ -612,7 +612,8 @@ Status DBImpl::CompactFilesImpl(
|
|||||||
Status DBImpl::PauseBackgroundWork() {
|
Status DBImpl::PauseBackgroundWork() {
|
||||||
InstrumentedMutexLock guard_lock(&mutex_);
|
InstrumentedMutexLock guard_lock(&mutex_);
|
||||||
bg_compaction_paused_++;
|
bg_compaction_paused_++;
|
||||||
while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) {
|
while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
|
||||||
|
bg_flush_scheduled_ > 0) {
|
||||||
bg_cv_.Wait();
|
bg_cv_.Wait();
|
||||||
}
|
}
|
||||||
bg_work_paused_++;
|
bg_work_paused_++;
|
||||||
@ -808,7 +809,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
|||||||
|
|
||||||
bool scheduled = false;
|
bool scheduled = false;
|
||||||
bool manual_conflict = false;
|
bool manual_conflict = false;
|
||||||
ManualCompaction manual;
|
ManualCompactionState manual;
|
||||||
manual.cfd = cfd;
|
manual.cfd = cfd;
|
||||||
manual.input_level = input_level;
|
manual.input_level = input_level;
|
||||||
manual.output_level = output_level;
|
manual.output_level = output_level;
|
||||||
@ -858,7 +859,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
|||||||
AddManualCompaction(&manual);
|
AddManualCompaction(&manual);
|
||||||
TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
|
TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
|
||||||
if (exclusive) {
|
if (exclusive) {
|
||||||
while (bg_compaction_scheduled_ > 0) {
|
while (bg_bottom_compaction_scheduled_ > 0 ||
|
||||||
|
bg_compaction_scheduled_ > 0) {
|
||||||
TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
|
TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
|
||||||
ROCKS_LOG_INFO(
|
ROCKS_LOG_INFO(
|
||||||
immutable_db_options_.info_log,
|
immutable_db_options_.info_log,
|
||||||
@ -878,14 +880,14 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
|||||||
while (!manual.done) {
|
while (!manual.done) {
|
||||||
assert(HasPendingManualCompaction());
|
assert(HasPendingManualCompaction());
|
||||||
manual_conflict = false;
|
manual_conflict = false;
|
||||||
|
Compaction* compaction;
|
||||||
if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
|
if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
|
||||||
scheduled ||
|
scheduled ||
|
||||||
((manual.manual_end = &manual.tmp_storage1)&&(
|
((manual.manual_end = &manual.tmp_storage1) &&
|
||||||
(manual.compaction = manual.cfd->CompactRange(
|
((compaction = manual.cfd->CompactRange(
|
||||||
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
|
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
|
||||||
manual.output_level, manual.output_path_id, manual.begin,
|
manual.output_level, manual.output_path_id, manual.begin,
|
||||||
manual.end, &manual.manual_end, &manual_conflict)) ==
|
manual.end, &manual.manual_end, &manual_conflict)) == nullptr) &&
|
||||||
nullptr) &&
|
|
||||||
manual_conflict)) {
|
manual_conflict)) {
|
||||||
// exclusive manual compactions should not see a conflict during
|
// exclusive manual compactions should not see a conflict during
|
||||||
// CompactRange
|
// CompactRange
|
||||||
@ -898,14 +900,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
|||||||
manual.incomplete = false;
|
manual.incomplete = false;
|
||||||
}
|
}
|
||||||
} else if (!scheduled) {
|
} else if (!scheduled) {
|
||||||
if (manual.compaction == nullptr) {
|
if (compaction == nullptr) {
|
||||||
manual.done = true;
|
manual.done = true;
|
||||||
bg_cv_.SignalAll();
|
bg_cv_.SignalAll();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ca = new CompactionArg;
|
ca = new CompactionArg;
|
||||||
ca->db = this;
|
ca->db = this;
|
||||||
ca->m = &manual;
|
ca->prepicked_compaction = new PrepickedCompaction;
|
||||||
|
ca->prepicked_compaction->manual_compaction_state = &manual;
|
||||||
|
ca->prepicked_compaction->compaction = compaction;
|
||||||
manual.incomplete = false;
|
manual.incomplete = false;
|
||||||
bg_compaction_scheduled_++;
|
bg_compaction_scheduled_++;
|
||||||
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
||||||
@ -1047,7 +1051,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
|
|||||||
unscheduled_compactions_ > 0) {
|
unscheduled_compactions_ > 0) {
|
||||||
CompactionArg* ca = new CompactionArg;
|
CompactionArg* ca = new CompactionArg;
|
||||||
ca->db = this;
|
ca->db = this;
|
||||||
ca->m = nullptr;
|
ca->prepicked_compaction = nullptr;
|
||||||
bg_compaction_scheduled_++;
|
bg_compaction_scheduled_++;
|
||||||
unscheduled_compactions_--;
|
unscheduled_compactions_--;
|
||||||
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
||||||
@ -1152,7 +1156,23 @@ void DBImpl::BGWorkCompaction(void* arg) {
|
|||||||
delete reinterpret_cast<CompactionArg*>(arg);
|
delete reinterpret_cast<CompactionArg*>(arg);
|
||||||
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
|
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
|
||||||
TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
|
TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
|
||||||
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m);
|
auto prepicked_compaction =
|
||||||
|
static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
|
||||||
|
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
|
||||||
|
prepicked_compaction, Env::Priority::LOW);
|
||||||
|
delete prepicked_compaction;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBImpl::BGWorkBottomCompaction(void* arg) {
|
||||||
|
CompactionArg ca = *(static_cast<CompactionArg*>(arg));
|
||||||
|
delete static_cast<CompactionArg*>(arg);
|
||||||
|
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
|
||||||
|
TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
|
||||||
|
auto* prepicked_compaction = ca.prepicked_compaction;
|
||||||
|
assert(prepicked_compaction && prepicked_compaction->compaction &&
|
||||||
|
!prepicked_compaction->manual_compaction_state);
|
||||||
|
ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
|
||||||
|
delete prepicked_compaction;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBImpl::BGWorkPurge(void* db) {
|
void DBImpl::BGWorkPurge(void* db) {
|
||||||
@ -1165,8 +1185,11 @@ void DBImpl::BGWorkPurge(void* db) {
|
|||||||
void DBImpl::UnscheduleCallback(void* arg) {
|
void DBImpl::UnscheduleCallback(void* arg) {
|
||||||
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
|
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
|
||||||
delete reinterpret_cast<CompactionArg*>(arg);
|
delete reinterpret_cast<CompactionArg*>(arg);
|
||||||
if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) {
|
if (ca.prepicked_compaction != nullptr) {
|
||||||
delete ca.m->compaction;
|
if (ca.prepicked_compaction->compaction != nullptr) {
|
||||||
|
delete ca.prepicked_compaction->compaction;
|
||||||
|
}
|
||||||
|
delete ca.prepicked_compaction;
|
||||||
}
|
}
|
||||||
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
|
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
|
||||||
}
|
}
|
||||||
@ -1293,9 +1316,9 @@ void DBImpl::BackgroundCallFlush() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBImpl::BackgroundCallCompaction(void* arg) {
|
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
||||||
|
Env::Priority bg_thread_pri) {
|
||||||
bool made_progress = false;
|
bool made_progress = false;
|
||||||
ManualCompaction* m = reinterpret_cast<ManualCompaction*>(arg);
|
|
||||||
JobContext job_context(next_job_id_.fetch_add(1), true);
|
JobContext job_context(next_job_id_.fetch_add(1), true);
|
||||||
TEST_SYNC_POINT("BackgroundCallCompaction:0");
|
TEST_SYNC_POINT("BackgroundCallCompaction:0");
|
||||||
MaybeDumpStats();
|
MaybeDumpStats();
|
||||||
@ -1313,9 +1336,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
|
|||||||
auto pending_outputs_inserted_elem =
|
auto pending_outputs_inserted_elem =
|
||||||
CaptureCurrentFileNumberInPendingOutputs();
|
CaptureCurrentFileNumberInPendingOutputs();
|
||||||
|
|
||||||
assert(bg_compaction_scheduled_);
|
assert((bg_thread_pri == Env::Priority::BOTTOM &&
|
||||||
Status s =
|
bg_bottom_compaction_scheduled_) ||
|
||||||
BackgroundCompaction(&made_progress, &job_context, &log_buffer, m);
|
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
|
||||||
|
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
|
||||||
|
prepicked_compaction);
|
||||||
TEST_SYNC_POINT("BackgroundCallCompaction:1");
|
TEST_SYNC_POINT("BackgroundCallCompaction:1");
|
||||||
if (!s.ok() && !s.IsShutdownInProgress()) {
|
if (!s.ok() && !s.IsShutdownInProgress()) {
|
||||||
// Wait a little bit before retrying background compaction in
|
// Wait a little bit before retrying background compaction in
|
||||||
@ -1361,17 +1386,24 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
|
|||||||
|
|
||||||
assert(num_running_compactions_ > 0);
|
assert(num_running_compactions_ > 0);
|
||||||
num_running_compactions_--;
|
num_running_compactions_--;
|
||||||
|
if (bg_thread_pri == Env::Priority::LOW) {
|
||||||
bg_compaction_scheduled_--;
|
bg_compaction_scheduled_--;
|
||||||
|
} else {
|
||||||
|
assert(bg_thread_pri == Env::Priority::BOTTOM);
|
||||||
|
bg_bottom_compaction_scheduled_--;
|
||||||
|
}
|
||||||
|
|
||||||
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
|
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
|
||||||
|
|
||||||
// See if there's more work to be done
|
// See if there's more work to be done
|
||||||
MaybeScheduleFlushOrCompaction();
|
MaybeScheduleFlushOrCompaction();
|
||||||
if (made_progress || bg_compaction_scheduled_ == 0 ||
|
if (made_progress ||
|
||||||
|
(bg_compaction_scheduled_ == 0 &&
|
||||||
|
bg_bottom_compaction_scheduled_ == 0) ||
|
||||||
HasPendingManualCompaction()) {
|
HasPendingManualCompaction()) {
|
||||||
// signal if
|
// signal if
|
||||||
// * made_progress -- need to wakeup DelayWrite
|
// * made_progress -- need to wakeup DelayWrite
|
||||||
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
|
// * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
|
||||||
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction
|
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction
|
||||||
// If none of this is true, there is no need to signal since nobody is
|
// If none of this is true, there is no need to signal since nobody is
|
||||||
// waiting for it
|
// waiting for it
|
||||||
@ -1386,14 +1418,23 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
|
|||||||
|
|
||||||
Status DBImpl::BackgroundCompaction(bool* made_progress,
|
Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||||
JobContext* job_context,
|
JobContext* job_context,
|
||||||
LogBuffer* log_buffer, void* arg) {
|
LogBuffer* log_buffer,
|
||||||
ManualCompaction* manual_compaction =
|
PrepickedCompaction* prepicked_compaction) {
|
||||||
reinterpret_cast<ManualCompaction*>(arg);
|
ManualCompactionState* manual_compaction =
|
||||||
|
prepicked_compaction == nullptr
|
||||||
|
? nullptr
|
||||||
|
: prepicked_compaction->manual_compaction_state;
|
||||||
*made_progress = false;
|
*made_progress = false;
|
||||||
mutex_.AssertHeld();
|
mutex_.AssertHeld();
|
||||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
|
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
|
||||||
|
|
||||||
bool is_manual = (manual_compaction != nullptr);
|
bool is_manual = (manual_compaction != nullptr);
|
||||||
|
unique_ptr<Compaction> c;
|
||||||
|
if (prepicked_compaction != nullptr &&
|
||||||
|
prepicked_compaction->compaction != nullptr) {
|
||||||
|
c.reset(prepicked_compaction->compaction);
|
||||||
|
}
|
||||||
|
bool is_prepicked = is_manual || c;
|
||||||
|
|
||||||
// (manual_compaction->in_progress == false);
|
// (manual_compaction->in_progress == false);
|
||||||
bool trivial_move_disallowed =
|
bool trivial_move_disallowed =
|
||||||
@ -1410,7 +1451,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
|||||||
manual_compaction->status = status;
|
manual_compaction->status = status;
|
||||||
manual_compaction->done = true;
|
manual_compaction->done = true;
|
||||||
manual_compaction->in_progress = false;
|
manual_compaction->in_progress = false;
|
||||||
delete manual_compaction->compaction;
|
|
||||||
manual_compaction = nullptr;
|
manual_compaction = nullptr;
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
@ -1421,13 +1461,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
|||||||
manual_compaction->in_progress = true;
|
manual_compaction->in_progress = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
unique_ptr<Compaction> c;
|
|
||||||
// InternalKey manual_end_storage;
|
// InternalKey manual_end_storage;
|
||||||
// InternalKey* manual_end = &manual_end_storage;
|
// InternalKey* manual_end = &manual_end_storage;
|
||||||
if (is_manual) {
|
if (is_manual) {
|
||||||
ManualCompaction* m = manual_compaction;
|
ManualCompactionState* m = manual_compaction;
|
||||||
assert(m->in_progress);
|
assert(m->in_progress);
|
||||||
c.reset(std::move(m->compaction));
|
|
||||||
if (!c) {
|
if (!c) {
|
||||||
m->done = true;
|
m->done = true;
|
||||||
m->manual_end = nullptr;
|
m->manual_end = nullptr;
|
||||||
@ -1449,7 +1487,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
|||||||
? "(end)"
|
? "(end)"
|
||||||
: m->manual_end->DebugString().c_str()));
|
: m->manual_end->DebugString().c_str()));
|
||||||
}
|
}
|
||||||
} else if (!compaction_queue_.empty()) {
|
} else if (!is_prepicked && !compaction_queue_.empty()) {
|
||||||
if (HaveManualCompaction(compaction_queue_.front())) {
|
if (HaveManualCompaction(compaction_queue_.front())) {
|
||||||
// Can't compact right now, but try again later
|
// Can't compact right now, but try again later
|
||||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
|
TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
|
||||||
@ -1601,6 +1639,28 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
|||||||
|
|
||||||
// Clear Instrument
|
// Clear Instrument
|
||||||
ThreadStatusUtil::ResetThreadStatus();
|
ThreadStatusUtil::ResetThreadStatus();
|
||||||
|
} else if (c->column_family_data()->ioptions()->compaction_style ==
|
||||||
|
kCompactionStyleUniversal &&
|
||||||
|
!is_prepicked && c->output_level() > 0 &&
|
||||||
|
c->output_level() ==
|
||||||
|
c->column_family_data()
|
||||||
|
->current()
|
||||||
|
->storage_info()
|
||||||
|
->MaxOutputLevel(
|
||||||
|
immutable_db_options_.allow_ingest_behind) &&
|
||||||
|
env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
|
||||||
|
// Forward universal compactions involving last level to the bottom pool
|
||||||
|
// if it exists, such that long-running compactions can't block short-
|
||||||
|
// lived ones, like L0->L0s.
|
||||||
|
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
|
||||||
|
CompactionArg* ca = new CompactionArg;
|
||||||
|
ca->db = this;
|
||||||
|
ca->prepicked_compaction = new PrepickedCompaction;
|
||||||
|
ca->prepicked_compaction->compaction = c.release();
|
||||||
|
ca->prepicked_compaction->manual_compaction_state = nullptr;
|
||||||
|
++bg_bottom_compaction_scheduled_;
|
||||||
|
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
|
||||||
|
this, &DBImpl::UnscheduleCallback);
|
||||||
} else {
|
} else {
|
||||||
int output_level __attribute__((unused)) = c->output_level();
|
int output_level __attribute__((unused)) = c->output_level();
|
||||||
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
|
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
|
||||||
@ -1664,7 +1724,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (is_manual) {
|
if (is_manual) {
|
||||||
ManualCompaction* m = manual_compaction;
|
ManualCompactionState* m = manual_compaction;
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
m->status = status;
|
m->status = status;
|
||||||
m->done = true;
|
m->done = true;
|
||||||
@ -1707,13 +1767,13 @@ bool DBImpl::HasPendingManualCompaction() {
|
|||||||
return (!manual_compaction_dequeue_.empty());
|
return (!manual_compaction_dequeue_.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) {
|
void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
|
||||||
manual_compaction_dequeue_.push_back(m);
|
manual_compaction_dequeue_.push_back(m);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
|
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
|
||||||
// Remove from queue
|
// Remove from queue
|
||||||
std::deque<ManualCompaction*>::iterator it =
|
std::deque<ManualCompactionState*>::iterator it =
|
||||||
manual_compaction_dequeue_.begin();
|
manual_compaction_dequeue_.begin();
|
||||||
while (it != manual_compaction_dequeue_.end()) {
|
while (it != manual_compaction_dequeue_.end()) {
|
||||||
if (m == (*it)) {
|
if (m == (*it)) {
|
||||||
@ -1726,16 +1786,17 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
|
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
|
||||||
if (num_running_ingest_file_ > 0) {
|
if (num_running_ingest_file_ > 0) {
|
||||||
// We need to wait for other IngestExternalFile() calls to finish
|
// We need to wait for other IngestExternalFile() calls to finish
|
||||||
// before running a manual compaction.
|
// before running a manual compaction.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (m->exclusive) {
|
if (m->exclusive) {
|
||||||
return (bg_compaction_scheduled_ > 0);
|
return (bg_bottom_compaction_scheduled_ > 0 ||
|
||||||
|
bg_compaction_scheduled_ > 0);
|
||||||
}
|
}
|
||||||
std::deque<ManualCompaction*>::iterator it =
|
std::deque<ManualCompactionState*>::iterator it =
|
||||||
manual_compaction_dequeue_.begin();
|
manual_compaction_dequeue_.begin();
|
||||||
bool seen = false;
|
bool seen = false;
|
||||||
while (it != manual_compaction_dequeue_.end()) {
|
while (it != manual_compaction_dequeue_.end()) {
|
||||||
@ -1756,7 +1817,7 @@ bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
|
|||||||
|
|
||||||
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
|
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
|
||||||
// Remove from priority queue
|
// Remove from priority queue
|
||||||
std::deque<ManualCompaction*>::iterator it =
|
std::deque<ManualCompactionState*>::iterator it =
|
||||||
manual_compaction_dequeue_.begin();
|
manual_compaction_dequeue_.begin();
|
||||||
while (it != manual_compaction_dequeue_.end()) {
|
while (it != manual_compaction_dequeue_.end()) {
|
||||||
if ((*it)->exclusive) {
|
if ((*it)->exclusive) {
|
||||||
@ -1774,7 +1835,7 @@ bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
|
|||||||
|
|
||||||
bool DBImpl::HasExclusiveManualCompaction() {
|
bool DBImpl::HasExclusiveManualCompaction() {
|
||||||
// Remove from priority queue
|
// Remove from priority queue
|
||||||
std::deque<ManualCompaction*>::iterator it =
|
std::deque<ManualCompactionState*>::iterator it =
|
||||||
manual_compaction_dequeue_.begin();
|
manual_compaction_dequeue_.begin();
|
||||||
while (it != manual_compaction_dequeue_.end()) {
|
while (it != manual_compaction_dequeue_.end()) {
|
||||||
if ((*it)->exclusive) {
|
if ((*it)->exclusive) {
|
||||||
@ -1785,7 +1846,7 @@ bool DBImpl::HasExclusiveManualCompaction() {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
|
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
|
||||||
if ((m->exclusive) || (m1->exclusive)) {
|
if ((m->exclusive) || (m1->exclusive)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,9 @@ Status DBImpl::TEST_WaitForCompact() {
|
|||||||
// OR flush to finish.
|
// OR flush to finish.
|
||||||
|
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
while ((bg_compaction_scheduled_ || bg_flush_scheduled_) && bg_error_.ok()) {
|
while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
|
||||||
|
bg_flush_scheduled_) &&
|
||||||
|
bg_error_.ok()) {
|
||||||
bg_cv_.Wait();
|
bg_cv_.Wait();
|
||||||
}
|
}
|
||||||
return bg_error_;
|
return bg_error_;
|
||||||
|
@ -1370,6 +1370,103 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
|
|||||||
Destroy(options);
|
Destroy(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(DBTestUniversalCompaction, FullCompactionInBottomPriThreadPool) {
|
||||||
|
const int kNumFilesTrigger = 3;
|
||||||
|
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
|
||||||
|
for (bool allow_ingest_behind : {false, true}) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.allow_ingest_behind = allow_ingest_behind;
|
||||||
|
options.compaction_style = kCompactionStyleUniversal;
|
||||||
|
options.num_levels = num_levels_;
|
||||||
|
options.write_buffer_size = 100 << 10; // 100KB
|
||||||
|
options.target_file_size_base = 32 << 10; // 32KB
|
||||||
|
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
|
||||||
|
// Trigger compaction if size amplification exceeds 110%
|
||||||
|
options.compaction_options_universal.max_size_amplification_percent = 110;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
int num_bottom_pri_compactions = 0;
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"DBImpl::BGWorkBottomCompaction",
|
||||||
|
[&](void* arg) { ++num_bottom_pri_compactions; });
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
Random rnd(301);
|
||||||
|
for (int num = 0; num < kNumFilesTrigger; num++) {
|
||||||
|
ASSERT_EQ(NumSortedRuns(), num);
|
||||||
|
int key_idx = 0;
|
||||||
|
GenerateNewFile(&rnd, &key_idx);
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
|
||||||
|
if (allow_ingest_behind || num_levels_ > 1) {
|
||||||
|
// allow_ingest_behind increases number of levels while sanitizing.
|
||||||
|
ASSERT_EQ(1, num_bottom_pri_compactions);
|
||||||
|
} else {
|
||||||
|
// for single-level universal, everything's bottom level so nothing should
|
||||||
|
// be executed in bottom-pri thread pool.
|
||||||
|
ASSERT_EQ(0, num_bottom_pri_compactions);
|
||||||
|
}
|
||||||
|
// Verify that size amplification did occur
|
||||||
|
ASSERT_EQ(NumSortedRuns(), 1);
|
||||||
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
|
||||||
|
if (num_levels_ == 1) {
|
||||||
|
// for single-level universal, everything's bottom level so nothing should
|
||||||
|
// be executed in bottom-pri thread pool.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const int kNumFilesTrigger = 3;
|
||||||
|
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.compaction_style = kCompactionStyleUniversal;
|
||||||
|
options.num_levels = num_levels_;
|
||||||
|
options.write_buffer_size = 100 << 10; // 100KB
|
||||||
|
options.target_file_size_base = 32 << 10; // 32KB
|
||||||
|
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
|
||||||
|
// Trigger compaction if size amplification exceeds 110%
|
||||||
|
options.compaction_options_universal.max_size_amplification_percent = 110;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||||
|
{// wait for the full compaction to be picked before adding files intended
|
||||||
|
// for the second one.
|
||||||
|
{"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
|
||||||
|
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
|
||||||
|
// the full (bottom-pri) compaction waits until a partial (low-pri)
|
||||||
|
// compaction has started to verify they can run in parallel.
|
||||||
|
{"DBImpl::BackgroundCompaction:NonTrivial",
|
||||||
|
"DBImpl::BGWorkBottomCompaction"}});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
Random rnd(301);
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
for (int num = 0; num < kNumFilesTrigger; num++) {
|
||||||
|
int key_idx = 0;
|
||||||
|
GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
|
||||||
|
// use no_wait above because that one waits for flush and compaction. We
|
||||||
|
// don't want to wait for compaction because the full compaction is
|
||||||
|
// intentionally blocked while more files are flushed.
|
||||||
|
dbfull()->TEST_WaitForFlushMemTable();
|
||||||
|
}
|
||||||
|
if (i == 0) {
|
||||||
|
TEST_SYNC_POINT(
|
||||||
|
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
|
||||||
|
// First compaction should output to bottom level. Second should output to L0
|
||||||
|
// since older L0 files pending compaction prevent it from being placed lower.
|
||||||
|
ASSERT_EQ(NumSortedRuns(), 2);
|
||||||
|
ASSERT_GT(NumTableFilesAtLevel(0), 0);
|
||||||
|
ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0);
|
||||||
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
}
|
||||||
|
|
||||||
INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
|
INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
|
||||||
::testing::Combine(::testing::Values(1, 3, 5),
|
::testing::Combine(::testing::Values(1, 3, 5),
|
||||||
::testing::Bool()));
|
::testing::Bool()));
|
||||||
|
@ -1227,6 +1227,14 @@ int VersionStorageInfo::MaxInputLevel() const {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
|
||||||
|
if (allow_ingest_behind) {
|
||||||
|
assert(num_levels() > 1);
|
||||||
|
return num_levels() - 2;
|
||||||
|
}
|
||||||
|
return num_levels() - 1;
|
||||||
|
}
|
||||||
|
|
||||||
void VersionStorageInfo::EstimateCompactionBytesNeeded(
|
void VersionStorageInfo::EstimateCompactionBytesNeeded(
|
||||||
const MutableCFOptions& mutable_cf_options) {
|
const MutableCFOptions& mutable_cf_options) {
|
||||||
// Only implemented for level-based compaction
|
// Only implemented for level-based compaction
|
||||||
|
@ -147,6 +147,7 @@ class VersionStorageInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int MaxInputLevel() const;
|
int MaxInputLevel() const;
|
||||||
|
int MaxOutputLevel(bool allow_ingest_behind) const;
|
||||||
|
|
||||||
// Return level number that has idx'th highest score
|
// Return level number that has idx'th highest score
|
||||||
int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; }
|
int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; }
|
||||||
|
12
env/env_posix.cc
vendored
12
env/env_posix.cc
vendored
@ -761,23 +761,23 @@ class PosixEnv : public Env {
|
|||||||
|
|
||||||
// Allow increasing the number of worker threads.
|
// Allow increasing the number of worker threads.
|
||||||
virtual void SetBackgroundThreads(int num, Priority pri) override {
|
virtual void SetBackgroundThreads(int num, Priority pri) override {
|
||||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||||
thread_pools_[pri].SetBackgroundThreads(num);
|
thread_pools_[pri].SetBackgroundThreads(num);
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual int GetBackgroundThreads(Priority pri) override {
|
virtual int GetBackgroundThreads(Priority pri) override {
|
||||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||||
return thread_pools_[pri].GetBackgroundThreads();
|
return thread_pools_[pri].GetBackgroundThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allow increasing the number of worker threads.
|
// Allow increasing the number of worker threads.
|
||||||
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
|
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
|
||||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||||
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
|
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override {
|
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override {
|
||||||
assert(pool >= Priority::LOW && pool <= Priority::HIGH);
|
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
|
||||||
#ifdef OS_LINUX
|
#ifdef OS_LINUX
|
||||||
thread_pools_[pool].LowerIOPriority();
|
thread_pools_[pool].LowerIOPriority();
|
||||||
#endif
|
#endif
|
||||||
@ -883,7 +883,7 @@ PosixEnv::PosixEnv()
|
|||||||
|
|
||||||
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
|
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
|
||||||
void* tag, void (*unschedFunction)(void* arg)) {
|
void* tag, void (*unschedFunction)(void* arg)) {
|
||||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||||
thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
|
thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -892,7 +892,7 @@ int PosixEnv::UnSchedule(void* arg, Priority pri) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
|
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
|
||||||
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
|
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
|
||||||
return thread_pools_[pri].GetQueueLen();
|
return thread_pools_[pri].GetQueueLen();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
8
env/env_test.cc
vendored
8
env/env_test.cc
vendored
@ -125,12 +125,14 @@ static void SetBool(void* ptr) {
|
|||||||
reinterpret_cast<std::atomic<bool>*>(ptr)->store(true);
|
reinterpret_cast<std::atomic<bool>*>(ptr)->store(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_P(EnvPosixTestWithParam, RunImmediately) {
|
TEST_F(EnvPosixTest, RunImmediately) {
|
||||||
|
for (int pri = Env::BOTTOM; pri < Env::TOTAL; ++pri) {
|
||||||
std::atomic<bool> called(false);
|
std::atomic<bool> called(false);
|
||||||
env_->Schedule(&SetBool, &called);
|
env_->SetBackgroundThreads(1, static_cast<Env::Priority>(pri));
|
||||||
|
env_->Schedule(&SetBool, &called, static_cast<Env::Priority>(pri));
|
||||||
Env::Default()->SleepForMicroseconds(kDelayMicros);
|
Env::Default()->SleepForMicroseconds(kDelayMicros);
|
||||||
ASSERT_TRUE(called.load());
|
ASSERT_TRUE(called.load());
|
||||||
WaitThreadPoolsEmpty();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_P(EnvPosixTestWithParam, UnSchedule) {
|
TEST_P(EnvPosixTestWithParam, UnSchedule) {
|
||||||
|
@ -283,7 +283,7 @@ class Env {
|
|||||||
virtual Status UnlockFile(FileLock* lock) = 0;
|
virtual Status UnlockFile(FileLock* lock) = 0;
|
||||||
|
|
||||||
// Priority for scheduling job in thread pool
|
// Priority for scheduling job in thread pool
|
||||||
enum Priority { LOW, HIGH, TOTAL };
|
enum Priority { BOTTOM, LOW, HIGH, TOTAL };
|
||||||
|
|
||||||
// Priority for requesting bytes in rate limiter scheduler
|
// Priority for requesting bytes in rate limiter scheduler
|
||||||
enum IOPriority {
|
enum IOPriority {
|
||||||
|
@ -571,6 +571,7 @@ static void RunConcurrentRead(int run) {
|
|||||||
fprintf(stderr, "Run %d of %d\n", i, N);
|
fprintf(stderr, "Run %d of %d\n", i, N);
|
||||||
}
|
}
|
||||||
TestState state(seed + 1);
|
TestState state(seed + 1);
|
||||||
|
Env::Default()->SetBackgroundThreads(1);
|
||||||
Env::Default()->Schedule(ConcurrentReader, &state);
|
Env::Default()->Schedule(ConcurrentReader, &state);
|
||||||
state.Wait(TestState::RUNNING);
|
state.Wait(TestState::RUNNING);
|
||||||
for (int k = 0; k < kSize; ++k) {
|
for (int k = 0; k < kSize; ++k) {
|
||||||
|
@ -363,6 +363,7 @@ static void RunConcurrent(int run) {
|
|||||||
fprintf(stderr, "Run %d of %d\n", i, N);
|
fprintf(stderr, "Run %d of %d\n", i, N);
|
||||||
}
|
}
|
||||||
TestState state(seed + 1);
|
TestState state(seed + 1);
|
||||||
|
Env::Default()->SetBackgroundThreads(1);
|
||||||
Env::Default()->Schedule(ConcurrentReader, &state);
|
Env::Default()->Schedule(ConcurrentReader, &state);
|
||||||
state.Wait(TestState::RUNNING);
|
state.Wait(TestState::RUNNING);
|
||||||
for (int k = 0; k < kSize; k++) {
|
for (int k = 0; k < kSize; k++) {
|
||||||
|
@ -318,6 +318,10 @@ DEFINE_int32(max_background_jobs,
|
|||||||
"The maximum number of concurrent background jobs that can occur "
|
"The maximum number of concurrent background jobs that can occur "
|
||||||
"in parallel.");
|
"in parallel.");
|
||||||
|
|
||||||
|
DEFINE_int32(num_bottom_pri_threads, 0,
|
||||||
|
"The number of threads in the bottom-priority thread pool (used "
|
||||||
|
"by universal compaction only).");
|
||||||
|
|
||||||
DEFINE_int32(max_background_compactions,
|
DEFINE_int32(max_background_compactions,
|
||||||
rocksdb::Options().max_background_compactions,
|
rocksdb::Options().max_background_compactions,
|
||||||
"The maximum number of concurrent background compactions"
|
"The maximum number of concurrent background compactions"
|
||||||
@ -5242,6 +5246,8 @@ int db_bench_tool(int argc, char** argv) {
|
|||||||
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
|
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
|
||||||
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes,
|
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes,
|
||||||
rocksdb::Env::Priority::HIGH);
|
rocksdb::Env::Priority::HIGH);
|
||||||
|
FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
|
||||||
|
rocksdb::Env::Priority::BOTTOM);
|
||||||
|
|
||||||
// Choose a location for the test database if none given with --db=<path>
|
// Choose a location for the test database if none given with --db=<path>
|
||||||
if (FLAGS_db.empty()) {
|
if (FLAGS_db.empty()) {
|
||||||
|
@ -127,7 +127,7 @@ ThreadPoolImpl::Impl::Impl()
|
|||||||
low_io_priority_(false),
|
low_io_priority_(false),
|
||||||
priority_(Env::LOW),
|
priority_(Env::LOW),
|
||||||
env_(nullptr),
|
env_(nullptr),
|
||||||
total_threads_limit_(1),
|
total_threads_limit_(0),
|
||||||
queue_len_(),
|
queue_len_(),
|
||||||
exit_all_threads_(false),
|
exit_all_threads_(false),
|
||||||
wait_for_jobs_to_complete_(false),
|
wait_for_jobs_to_complete_(false),
|
||||||
|
Loading…
Reference in New Issue
Block a user