Rewritten system for scheduling background work

Summary:
When scaling to a higher number of column families, the worst bottleneck was MaybeScheduleFlushOrCompaction(), which looped over all column families while holding the DB mutex. This patch addresses that issue.

The approach is similar to our earlier efforts: instead of a pull model, where we do something for every column family, we use a push model -- when we detect that a column family is ready to be flushed or compacted, we add it to flush_queue_/compaction_queue_. That way MaybeScheduleFlushOrCompaction() no longer needs to loop over every column family.
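
In sketch form, the flush side of the new scheduling path looks like this. This is a simplified illustration, not the exact patch: CF, FlushScheduler and MaybeScheduleFlush are made-up names for the sketch, while flush_queue_, unscheduled_flushes_, SchedulePendingFlush and PopFirstFromFlushQueue mirror the patch; reference counting and the DB mutex that guards every call are elided.

    #include <cassert>
    #include <deque>

    // Sketch: a column family reduced to its scheduling state.
    struct CF {
      bool pending_flush = false;  // true <==> present in flush_queue_
    };

    class FlushScheduler {
     public:
      // Push side: called at the moment a CF becomes ready to flush,
      // instead of rediscovering it later by scanning every CF.
      // (The real SchedulePendingFlush also checks imm()->IsFlushPending().)
      void SchedulePendingFlush(CF* cfd) {
        if (!cfd->pending_flush) {
          cfd->pending_flush = true;
          flush_queue_.push_back(cfd);
          ++unscheduled_flushes_;
        }
      }

      // Scheduling side: MaybeScheduleFlushOrCompaction() now consults a
      // counter -- O(1) -- rather than looping over all column families.
      int MaybeScheduleFlush(int bg_flush_scheduled, int max_background_flushes) {
        int newly_scheduled = 0;
        while (unscheduled_flushes_ > 0 &&
               bg_flush_scheduled + newly_scheduled < max_background_flushes) {
          --unscheduled_flushes_;
          ++newly_scheduled;  // real code: env_->Schedule(&DBImpl::BGWorkFlush, ...)
        }
        return newly_scheduled;
      }

      // Pop side: a background flush thread takes ready work in O(1).
      CF* PopFirstFromFlushQueue() {
        assert(!flush_queue_.empty());
        CF* cfd = flush_queue_.front();
        flush_queue_.pop_front();
        cfd->pending_flush = false;
        return cfd;
      }

     private:
      std::deque<CF*> flush_queue_;
      int unscheduled_flushes_ = 0;
    };

The compaction side is symmetric (compaction_queue_, unscheduled_compactions_, pending_compaction_); the pending flag is what keeps a column family from being queued twice.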

Here are the performance results:

Command:

    ./db_bench --write_buffer_size=268435456 --db_write_buffer_size=268435456 --db=/fast-rocksdb-tmp/rocks_lots_of_cf --use_existing_db=0 --open_files=55000 --statistics=1 --histogram=1 --disable_data_sync=1 --max_write_buffer_number=2 --sync=0 --benchmarks=fillrandom --threads=16 --num_column_families=5000  --disable_wal=1 --max_background_flushes=16 --max_background_compactions=16 --level0_file_num_compaction_trigger=2 --level0_slowdown_writes_trigger=2 --level0_stop_writes_trigger=3 --hard_rate_limit=1 --num=33333333 --writes=33333333

Before the patch:

    fillrandom   :      26.950 micros/op 37105 ops/sec;    4.1 MB/s

After the patch:

    fillrandom   :      17.404 micros/op 57456 ops/sec;    6.4 MB/s

The next bottleneck is VersionSet::AddLiveFiles(), which is painfully slow when we have a lot of files. That fix is coming in the next patch, but when I removed that code, here's what I got:

    fillrandom   :       7.590 micros/op 131758 ops/sec;   14.6 MB/s

Test Plan:
make check

Two stress tests:

A big number of compactions and flushes:

    ./db_stress --threads=30 --ops_per_thread=20000000 --max_key=10000 --column_families=20 --clear_column_family_one_in=10000000 --verify_before_write=0  --reopen=15 --max_background_compactions=10 --max_background_flushes=10 --db=/fast-rocksdb-tmp/db_stress --prefixpercent=0 --iterpercent=0 --writepercent=75 --db_write_buffer_size=2000000

max_background_flushes=0, to verify that this case also works correctly:

    ./db_stress --threads=30 --ops_per_thread=2000000 --max_key=10000 --column_families=20 --clear_column_family_one_in=10000000 --verify_before_write=0  --reopen=3 --max_background_compactions=3 --max_background_flushes=0 --db=/fast-rocksdb-tmp/db_stress --prefixpercent=0 --iterpercent=0 --writepercent=75 --db_write_buffer_size=2000000

Reviewers: ljin, rven, yhchiang, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D30123
Igor Canadi 2014-12-19 20:38:12 +01:00
parent a3001b1d3d
commit fdb6be4e24
5 changed files with 326 additions and 179 deletions

db/column_family.cc

@@ -223,14 +223,11 @@ void SuperVersionUnrefHandle(void* ptr) {
}
} // anonymous namespace
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* _dummy_versions,
Cache* _table_cache,
WriteBuffer* write_buffer,
const ColumnFamilyOptions& cf_options,
const DBOptions* db_options,
const EnvOptions& env_options,
ColumnFamilySet* column_family_set)
ColumnFamilyData::ColumnFamilyData(
uint32_t id, const std::string& name, Version* _dummy_versions,
Cache* _table_cache, WriteBuffer* write_buffer,
const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
const EnvOptions& env_options, ColumnFamilySet* column_family_set)
: id_(id),
name_(name),
dummy_versions_(_dummy_versions),
@@ -250,7 +247,9 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
next_(nullptr),
prev_(nullptr),
log_number_(0),
column_family_set_(column_family_set) {
column_family_set_(column_family_set),
pending_flush_(false),
pending_compaction_(false) {
Ref();
// if _dummy_versions is nullptr, then this is a dummy column family.
@@ -285,10 +284,14 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
new LevelCompactionPicker(ioptions_, &internal_comparator_));
}
Log(InfoLogLevel::INFO_LEVEL,
ioptions_.info_log, "Options for column family \"%s\":\n",
name.c_str());
options_.Dump(ioptions_.info_log);
if (column_family_set_->NumberOfColumnFamilies() < 10) {
Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
"--------------- Options for column family [%s]:\n", name.c_str());
options_.Dump(ioptions_.info_log);
} else {
Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
"\t(skipping printing options)\n");
}
}
RecalculateWriteStallConditions(mutable_cf_options_);
@@ -313,6 +316,11 @@ ColumnFamilyData::~ColumnFamilyData() {
current_->Unref();
}
// It would be wrong if this ColumnFamilyData is in flush_queue_ or
// compaction_queue_ and we destroyed it
assert(!pending_flush_);
assert(!pending_compaction_);
if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
@@ -434,6 +442,10 @@ void ColumnFamilyData::CreateNewMemtable(
mem_->Ref();
}
bool ColumnFamilyData::NeedsCompaction() const {
return compaction_picker_->NeedsCompaction(current_->storage_info());
}
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(

db/column_family.h

@@ -210,8 +210,11 @@ class ColumnFamilyData {
// See documentation in compaction_picker.h
// REQUIRES: DB mutex held
bool NeedsCompaction() const;
// REQUIRES: DB mutex held
Compaction* PickCompaction(const MutableCFOptions& mutable_options,
LogBuffer* log_buffer);
// REQUIRES: DB mutex held
Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, uint32_t output_path_id,
@@ -248,6 +251,7 @@ class ColumnFamilyData {
// if its reference count is zero and needs deletion or nullptr if not
// As argument takes a pointer to allocated SuperVersion to enable
// the clients to allocate SuperVersion outside of mutex.
// IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
port::Mutex* db_mutex,
const MutableCFOptions& mutable_cf_options);
@@ -261,6 +265,12 @@ class ColumnFamilyData {
bool triggered_flush_slowdown,
bool triggered_flush_stop);
// Protected by DB mutex
void set_pending_flush(bool value) { pending_flush_ = value; }
void set_pending_compaction(bool value) { pending_compaction_ = value; }
bool pending_flush() { return pending_flush_; }
bool pending_compaction() { return pending_compaction_; }
private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
@@ -328,6 +338,13 @@ class ColumnFamilyData {
ColumnFamilySet* column_family_set_;
std::unique_ptr<WriteControllerToken> write_controller_token_;
// If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
bool pending_flush_;
// If true --> this ColumnFamily is currently present in
// DBImpl::compaction_queue_
bool pending_compaction_;
};
// ColumnFamilySet has interesting thread-safety requirements

db/compaction_picker.cc

@@ -695,15 +695,6 @@ Compaction* LevelCompactionPicker::PickCompaction(
Compaction* c = nullptr;
int level = -1;
// Compute the compactions needed. It is better to do it here
// and also in LogAndApply(), otherwise the values could be stale.
std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
CompactionOptionsFIFO dummy_compaction_options_fifo;
vstorage->ComputeCompactionScore(
mutable_cf_options, dummy_compaction_options_fifo, size_being_compacted);
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
//
@@ -766,6 +757,21 @@ Compaction* LevelCompactionPicker::PickCompaction(
compactions_in_progress_[level].insert(c);
c->mutable_cf_options_ = mutable_cf_options;
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
// being compacted). Since we just changed compaction score, we recalculate it
// here
{ // this piece of code recomputes compaction score
std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
CompactionOptionsFIFO dummy_compaction_options_fifo;
vstorage->ComputeCompactionScore(mutable_cf_options,
dummy_compaction_options_fifo,
size_being_compacted);
}
return c;
}

db/db_impl.cc

@@ -89,6 +89,7 @@ void DumpRocksDBBuildVersion(Logger * log);
struct DBImpl::WriteContext {
autovector<SuperVersion*> superversions_to_free_;
autovector<log::Writer*> logs_to_free_;
bool schedule_bg_work_ = false;
~WriteContext() {
for (auto& sv : superversions_to_free_) {
@@ -205,8 +206,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
max_total_in_memory_state_(0),
is_snapshot_supported_(true),
write_buffer_(options.db_write_buffer_size),
tmp_batch_(),
bg_schedule_needed_(false),
unscheduled_flushes_(0),
unscheduled_compactions_(0),
bg_compaction_scheduled_(0),
bg_manual_only_(0),
bg_flush_scheduled_(0),
@@ -272,6 +273,19 @@ DBImpl::~DBImpl() {
listeners_.clear();
flush_scheduler_.Clear();
while (!flush_queue_.empty()) {
auto cfd = PopFirstFromFlushQueue();
if (cfd->Unref()) {
delete cfd;
}
}
while (!compaction_queue_.empty()) {
auto cfd = PopFirstFromCompactionQueue();
if (cfd->Unref()) {
delete cfd;
}
}
if (default_cf_handle_ != nullptr) {
// we need to delete handle outside of lock because it does its own locking
mutex_.Unlock();
@@ -1643,10 +1657,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
// SetNewMemtableAndNewLogFile() will release and reacquire mutex
// during execution
s = SetNewMemtableAndNewLogFile(cfd, &context);
cfd->imm()->FlushRequested();
MaybeScheduleFlushOrCompaction();
write_thread_.ExitWriteThread(&w, &w, s);
cfd->imm()->FlushRequested();
// schedule flush
SchedulePendingFlush(cfd);
MaybeScheduleFlushOrCompaction();
}
if (s.ok() && flush_options.wait) {
@@ -1671,52 +1688,90 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
void DBImpl::MaybeScheduleFlushOrCompaction() {
mutex_.AssertHeld();
bg_schedule_needed_ = false;
if (bg_work_gate_closed_) {
// gate closed for backgrond work
// gate closed for background work
return;
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else {
bool is_flush_pending = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->imm()->IsFlushPending()) {
is_flush_pending = true;
}
}
if (is_flush_pending) {
// memtable flush needed
if (bg_flush_scheduled_ < db_options_.max_background_flushes) {
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
} else if (db_options_.max_background_flushes > 0) {
bg_schedule_needed_ = true;
}
}
bool is_compaction_needed = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->compaction_picker()->NeedsCompaction(
cfd->current()->storage_info())) {
is_compaction_needed = true;
break;
}
}
return;
} else if (bg_manual_only_) {
// manual only
return;
}
// Schedule BGWorkCompaction if there's a compaction pending (or a memtable
// flush, but the HIGH pool is not enabled)
// Do it only if max_background_compactions hasn't been reached and
// bg_manual_only_ == 0
if (!bg_manual_only_ &&
(is_compaction_needed ||
(is_flush_pending && db_options_.max_background_flushes == 0))) {
if (bg_compaction_scheduled_ < db_options_.max_background_compactions) {
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
} else {
bg_schedule_needed_ = true;
}
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < db_options_.max_background_flushes) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
}
if (db_options_.max_background_flushes == 0 &&
bg_compaction_scheduled_ < db_options_.max_background_compactions &&
unscheduled_flushes_ > 0) {
// special case where flush is executed by compaction thread
// (if max_background_flushes == 0).
// Compaction thread will execute all the flushes
unscheduled_flushes_ = 0;
if (unscheduled_compactions_ > 0) {
// bg compaction will execute one compaction
unscheduled_compactions_--;
}
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
}
while (bg_compaction_scheduled_ < db_options_.max_background_compactions &&
unscheduled_compactions_ > 0) {
bg_compaction_scheduled_++;
unscheduled_compactions_--;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
}
}
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
assert(!cfd->pending_compaction());
cfd->Ref();
compaction_queue_.push_back(cfd);
cfd->set_pending_compaction(true);
}
ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
assert(!compaction_queue_.empty());
auto cfd = *compaction_queue_.begin();
compaction_queue_.pop_front();
assert(cfd->pending_compaction());
cfd->set_pending_compaction(false);
return cfd;
}
void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) {
assert(!cfd->pending_flush());
cfd->Ref();
flush_queue_.push_back(cfd);
cfd->set_pending_flush(true);
}
ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
assert(!flush_queue_.empty());
auto cfd = *flush_queue_.begin();
flush_queue_.pop_front();
assert(cfd->pending_flush());
cfd->set_pending_flush(false);
return cfd;
}
void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) {
if (!cfd->pending_flush() && cfd->imm()->IsFlushPending()) {
AddToFlushQueue(cfd);
++unscheduled_flushes_;
}
}
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
if (!cfd->pending_compaction() && cfd->NeedsCompaction()) {
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
}
}
@@ -1743,33 +1798,41 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
return bg_error_;
}
// call_status is failure if at least one flush was a failure. even if
// flushing one column family reports a failure, we will continue flushing
// other column families. however, call_status will be a failure in that case.
Status call_status;
// refcounting in iteration
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref();
Status flush_status;
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
while (flush_status.ok() && cfd->imm()->IsFlushPending()) {
LogToBuffer(
log_buffer,
"BackgroundCallFlush doing FlushMemTableToOutputFile with column "
"family [%s], flush slots available %d",
cfd->GetName().c_str(),
db_options_.max_background_flushes - bg_flush_scheduled_);
flush_status = FlushMemTableToOutputFile(
cfd, mutable_cf_options, madeProgress, job_context, log_buffer);
ColumnFamilyData* cfd = nullptr;
while (!flush_queue_.empty()) {
// This cfd is already referenced
cfd = PopFirstFromFlushQueue();
if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
// can't flush this CF, try next one
if (cfd->Unref()) {
delete cfd;
}
continue;
}
if (call_status.ok() && !flush_status.ok()) {
call_status = flush_status;
}
cfd->Unref();
// found a flush!
break;
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
return call_status;
Status status;
if (cfd != nullptr) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
LogToBuffer(
log_buffer,
"Calling FlushMemTableToOutputFile with column "
"family [%s], flush slots available %d, compaction slots available %d",
cfd->GetName().c_str(),
db_options_.max_background_flushes - bg_flush_scheduled_,
db_options_.max_background_compactions - bg_compaction_scheduled_);
status = FlushMemTableToOutputFile(cfd, mutable_cf_options, madeProgress,
job_context, log_buffer);
if (cfd->Unref()) {
delete cfd;
}
}
return status;
}
void DBImpl::BackgroundCallFlush() {
@@ -1829,13 +1892,8 @@ void DBImpl::BackgroundCallFlush() {
}
bg_flush_scheduled_--;
// Any time the mutex is released After finding the work to do, another
// thread might execute MaybeScheduleFlushOrCompaction(). It is possible
// that there is a pending job but it is not scheduled because of the
// max thread limit.
if (madeProgress || bg_schedule_needed_) {
MaybeScheduleFlushOrCompaction();
}
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
RecordFlushIOStats();
bg_cv_.SignalAll();
// IMPORTANT: there should be no code after calling SignalAll. This call may
@@ -1909,17 +1967,8 @@ void DBImpl::BackgroundCallCompaction() {
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
// Previous compaction may have produced too many files in a level,
// So reschedule another compaction if we made progress in the
// last compaction.
//
// Also, any time the mutex is released After finding the work to do,
// another thread might execute MaybeScheduleFlushOrCompaction(). It is
// possible that there is a pending job but it is not scheduled because of
// the max thread limit.
if (madeProgress || bg_schedule_needed_) {
MaybeScheduleFlushOrCompaction();
}
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) {
// signal if
// * madeProgress -- need to wakeup DelayWrite
@@ -1964,35 +2013,28 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
}
// FLUSH preempts compaction
Status flush_stat;
for (auto cfd : *versions_->GetColumnFamilySet()) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
while (cfd->imm()->IsFlushPending()) {
LogToBuffer(
log_buffer,
"BackgroundCompaction doing FlushMemTableToOutputFile, "
"compaction slots available %d",
db_options_.max_background_compactions - bg_compaction_scheduled_);
cfd->Ref();
flush_stat = FlushMemTableToOutputFile(
cfd, mutable_cf_options, madeProgress, job_context, log_buffer);
cfd->Unref();
if (!flush_stat.ok()) {
if (is_manual) {
manual_compaction_->status = flush_stat;
manual_compaction_->done = true;
manual_compaction_->in_progress = false;
manual_compaction_ = nullptr;
}
return flush_stat;
// TODO(icanadi) we should only do this if max_background_flushes == 0
// BackgroundFlush() will only execute a single flush. We keep calling it as
// long as there's more flushes to be done
while (!flush_queue_.empty()) {
LogToBuffer(
log_buffer,
"BackgroundCompaction calling BackgroundFlush. flush slots available "
"%d, compaction slots available %d",
db_options_.max_background_flushes - bg_flush_scheduled_,
db_options_.max_background_compactions - bg_compaction_scheduled_);
auto flush_status = BackgroundFlush(madeProgress, job_context, log_buffer);
if (!flush_status.ok()) {
if (is_manual) {
manual_compaction_->status = flush_status;
manual_compaction_->done = true;
manual_compaction_->in_progress = false;
manual_compaction_ = nullptr;
}
return flush_status;
}
}
// Compaction makes a copy of the latest MutableCFOptions. It should be used
// throughout the compaction procedure to make sure consistency. It will
// eventually be installed into SuperVersion
unique_ptr<Compaction> c;
InternalKey manual_end_storage;
InternalKey* manual_end = &manual_end_storage;
@@ -2014,22 +2056,53 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
((m->done || manual_end == nullptr)
? "(end)"
: manual_end->DebugString().c_str()));
} else {
// no need to refcount in iteration since it's always under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
// Pick up latest mutable CF Options and use it throughout the
// compaction job
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
if (!mutable_cf_options->disable_auto_compactions) {
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
// compaction is not necessary. Need to make sure mutex is held
// until we make a copy in the following code
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
if (c != nullptr) {
// update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
c->inputs(0)->size());
break;
} else if (!compaction_queue_.empty()) {
// cfd is referenced here
auto cfd = PopFirstFromCompactionQueue();
// We unreference here because the following code will take a Ref() on
// this cfd if it is going to use it (Compaction class holds a
// reference).
// This will all happen under a mutex so we don't have to be afraid of
// somebody else deleting it.
if (cfd->Unref()) {
delete cfd;
// This was the last reference of the column family, so no need to
// compact.
return Status::OK();
}
// Pick up latest mutable CF Options and use it throughout the
// compaction job
// Compaction makes a copy of the latest MutableCFOptions. It should be used
// throughout the compaction procedure to make sure consistency. It will
// eventually be installed into SuperVersion
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
// compaction is not necessary. Need to make sure mutex is held
// until we make a copy in the following code
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
if (c != nullptr) {
// update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
c->inputs(0)->size());
// There are three things that can change compaction score:
// 1) When flush or compaction finish. This case is covered by
// InstallSuperVersion()
// 2) When MutableCFOptions changes. This case is also covered by
// InstallSuperVersion(), because this is when the new options take
// effect.
// 3) When we Pick a new compaction, we "remove" those files being
// compacted from the calculation, which then influences compaction
// score. Here we check if we need the new compaction even without the
// files that are currently being compacted. If we need another
// compaction, we might be able to execute it in parallel, so we add it
// to the queue and schedule a new thread.
if (cfd->NeedsCompaction()) {
// Yes, we need more compactions!
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
MaybeScheduleFlushOrCompaction();
}
}
}
@@ -2085,8 +2158,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->ReleaseCompactionFiles(status);
*madeProgress = true;
} else {
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
auto yield_callback = [&]() {
return CallFlushDuringCompaction(c->column_family_data(),
*c->mutable_cf_options(), job_context,
@@ -2275,7 +2346,7 @@ void DBImpl::InstallSuperVersionBackground(
SuperVersion* DBImpl::InstallSuperVersion(
ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, bool dont_schedule_bg_work) {
mutex_.AssertHeld();
// Update max_total_in_memory_state_
@@ -2289,10 +2360,15 @@ SuperVersion* DBImpl::InstallSuperVersion(
auto* old = cfd->InstallSuperVersion(
new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);
// We want to schedule potential flush or compactions since new options may
// have been picked up in this new version. New options may cause flush
// compaction trigger condition to change.
MaybeScheduleFlushOrCompaction();
// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions. dont_schedule_bg_work is true when scheduling from write
// thread and we don't want to add additional overhead. Callers promise to
// call SchedulePendingFlush() and MaybeScheduleFlushOrCompaction() eventually
if (!dont_schedule_bg_work) {
SchedulePendingFlush(cfd);
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
// Update max_total_in_memory_state_
max_total_in_memory_state_ =
@@ -2848,9 +2924,10 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
break;
}
cfd->imm()->FlushRequested();
SchedulePendingFlush(cfd);
context.schedule_bg_work_ = true;
}
}
MaybeScheduleFlushOrCompaction();
} else if (UNLIKELY(write_buffer_.ShouldFlush())) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Flushing all column families. Write buffer is using %" PRIu64
@@ -2865,6 +2942,8 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
break;
}
cfd->imm()->FlushRequested();
SchedulePendingFlush(cfd);
context.schedule_bg_work_ = true;
}
}
MaybeScheduleFlushOrCompaction();
@@ -2986,6 +3065,10 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
}
write_thread_.ExitWriteThread(&w, last_writer, status);
if (context.schedule_bg_work_) {
MaybeScheduleFlushOrCompaction();
}
mutex_.Unlock();
if (status.IsTimedOut()) {
@@ -3023,11 +3106,11 @@ Status DBImpl::DelayWrite(uint64_t expiration_time) {
}
Status DBImpl::ScheduleFlushes(WriteContext* context) {
bool schedule_bg_work = false;
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
schedule_bg_work = true;
auto status = SetNewMemtableAndNewLogFile(cfd, context);
SchedulePendingFlush(cfd);
context->schedule_bg_work_ = true;
if (cfd->Unref()) {
delete cfd;
}
@@ -3035,9 +3118,6 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
return status;
}
}
if (schedule_bg_work) {
MaybeScheduleFlushOrCompaction();
}
return Status::OK();
}
@@ -3113,7 +3193,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
new_mem->Ref();
cfd->SetMemtable(new_mem);
context->superversions_to_free_.push_back(
InstallSuperVersion(cfd, new_superversion, mutable_cf_options));
InstallSuperVersion(cfd, new_superversion, mutable_cf_options, true));
return s;
}
@@ -3380,12 +3460,6 @@ Status DBImpl::DeleteFile(std::string name) {
PurgeObsoleteFiles(job_context);
}
job_context.Clean();
{
MutexLock l(&mutex_);
// schedule flush if file deletion means we freed the space for flushes to
// continue
MaybeScheduleFlushOrCompaction();
}
return status;
}
@@ -3620,7 +3694,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_));
impl->DeleteObsoleteFiles();
impl->MaybeScheduleFlushOrCompaction();
s = impl->db_directory_->Fsync();
}
}

db/db_impl.h

@@ -362,6 +362,8 @@ class DBImpl : public DB {
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
void MaybeScheduleFlushOrCompaction();
void SchedulePendingFlush(ColumnFamilyData* cfd);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
static void BGWorkCompaction(void* db);
static void BGWorkFlush(void* db);
void BackgroundCallCompaction();
@@ -393,6 +395,12 @@ class DBImpl : public DB {
// hold the data set.
Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1);
// helper functions for adding and removing from flush & compaction queues
void AddToCompactionQueue(ColumnFamilyData* cfd);
ColumnFamilyData* PopFirstFromCompactionQueue();
void AddToFlushQueue(ColumnFamilyData* cfd);
ColumnFamilyData* PopFirstFromFlushQueue();
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;
@@ -460,9 +468,32 @@ class DBImpl : public DB {
// State is protected with db mutex.
std::list<uint64_t> pending_outputs_;
// At least one compaction or flush job is pending but not yet scheduled
// because of the max background thread limit.
bool bg_schedule_needed_;
// flush_queue_ and compaction_queue_ hold column families that we need to
// flush and compact, respectively.
// A column family is inserted into flush_queue_ when it satisfies condition
// cfd->imm()->IsFlushPending()
// A column family is inserted into compaction_queue_ when it satisfied
// condition cfd->NeedsCompaction()
// Column families in this list are all Ref()-erenced
// TODO(icanadi) Provide some kind of ReferencedColumnFamily class that will
// do RAII on ColumnFamilyData
// Column families are in this queue when they need to be flushed or
// compacted. Consumers of these queues are flush and compaction threads. When
// column family is put on this queue, we increase unscheduled_flushes_ and
// unscheduled_compactions_. When these variables are bigger than zero, that
// means we need to schedule background threads for compaction and thread.
// Once the background threads are scheduled, we decrease unscheduled_flushes_
// and unscheduled_compactions_. That way we keep track of number of
// compaction and flush threads we need to schedule. This scheduling is done
// in MaybeScheduleFlushOrCompaction()
// invariant(column family present in flush_queue_ <==>
// ColumnFamilyData::pending_flush_ == true)
std::deque<ColumnFamilyData*> flush_queue_;
// invariant(column family present in compaction_queue_ <==>
// ColumnFamilyData::pending_compaction_ == true)
std::deque<ColumnFamilyData*> compaction_queue_;
int unscheduled_flushes_;
int unscheduled_compactions_;
// count how many background compactions are running or have been scheduled
int bg_compaction_scheduled_;
@@ -553,9 +584,17 @@ class DBImpl : public DB {
ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options);
SuperVersion* InstallSuperVersion(
ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options);
// All ColumnFamily state changes go through this function. Here we analyze
// the new state and we schedule background work if we detect that the new
// state needs flush or compaction.
// If dont_schedule_bg_work == true, then caller asks us to not schedule flush
// or compaction here, but it also promises to schedule needed background
// work. We use this to scheduling background compactions when we are in the
// write thread, which is very performance critical. Caller schedules
// background work as soon as it exits the write thread
SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options,
bool dont_schedule_bg_work = false);
// Find Super version and reference it. Based on options, it might return
// the thread local cached one.