Make flush part of compaction process

This will enable user to use only 1 background thread.
This commit is contained in:
Igor Canadi 2014-04-07 13:46:51 -07:00
parent 2a0917b28e
commit e48348d196

View File

@ -276,9 +276,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
if (result.max_open_files != -1) { if (result.max_open_files != -1) {
ClipToRange(&result.max_open_files, 20, 1000000); ClipToRange(&result.max_open_files, 20, 1000000);
} }
if (result.max_background_flushes == 0) {
result.max_background_flushes = 1;
}
if (result.info_log == nullptr) { if (result.info_log == nullptr) {
Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env, Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env,
@ -1895,9 +1892,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} }
if (is_flush_pending) { if (is_flush_pending) {
// memtable flush needed // memtable flush needed
// max_background_compactions should not be 0, because that means
// flush will never get executed
assert(options_.max_background_flushes != 0);
if (bg_flush_scheduled_ < options_.max_background_flushes) { if (bg_flush_scheduled_ < options_.max_background_flushes) {
bg_flush_scheduled_++; bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
@ -1914,10 +1908,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} }
} }
// Schedule BGWorkCompaction if there's a compaction pending // Schedule BGWorkCompaction if there's a compaction pending (or a memtable
// flush, but the HIGH pool is not enabled)
// Do it only if max_background_compactions hasn't been reached and, in case // Do it only if max_background_compactions hasn't been reached and, in case
// bg_manual_only_ > 0, if it's a manual compaction. // bg_manual_only_ > 0, if it's a manual compaction.
if ((manual_compaction_ || is_compaction_needed) && if ((manual_compaction_ || is_compaction_needed ||
(is_flush_pending && options_.max_background_flushes == 0)) &&
(!bg_manual_only_ || manual_compaction_)) { (!bg_manual_only_ || manual_compaction_)) {
if (bg_compaction_scheduled_ < options_.max_background_compactions) { if (bg_compaction_scheduled_ < options_.max_background_compactions) {
bg_compaction_scheduled_++; bg_compaction_scheduled_++;
@ -2134,14 +2130,57 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
*madeProgress = false; *madeProgress = false;
mutex_.AssertHeld(); mutex_.AssertHeld();
unique_ptr<Compaction> c;
bool is_manual = (manual_compaction_ != nullptr) && bool is_manual = (manual_compaction_ != nullptr) &&
(manual_compaction_->in_progress == false); (manual_compaction_->in_progress == false);
if (is_manual) {
// another thread cannot pick up the same work
manual_compaction_->in_progress = true;
}
// FLUSH preempts compaction
autovector<ColumnFamilyData*> to_delete;
Status flush_stat;
for (auto cfd : *versions_->GetColumnFamilySet()) {
while (cfd->imm()->IsFlushPending()) {
LogToBuffer(
log_buffer,
"BackgroundCompaction doing FlushMemTableToOutputFile, "
"compaction slots available %d",
options_.max_background_compactions - bg_compaction_scheduled_);
cfd->Ref();
flush_stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state,
log_buffer);
if (cfd->Unref()) {
to_delete.push_back(cfd);
}
if (!flush_stat.ok()) {
if (is_manual) {
manual_compaction_->status = flush_stat;
manual_compaction_->done = true;
manual_compaction_->in_progress = false;
manual_compaction_ = nullptr;
}
break;
}
}
if (!flush_stat.ok()) {
break;
}
}
for (auto cfd : to_delete) {
delete cfd;
}
if (!flush_stat.ok()) {
return flush_stat;
}
unique_ptr<Compaction> c;
InternalKey manual_end_storage; InternalKey manual_end_storage;
InternalKey* manual_end = &manual_end_storage; InternalKey* manual_end = &manual_end_storage;
if (is_manual) { if (is_manual) {
ManualCompaction* m = manual_compaction_; ManualCompaction* m = manual_compaction_;
m->in_progress = true; assert(m->in_progress);
c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin, c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin,
m->end, &manual_end)); m->end, &manual_end));
if (!c) { if (!c) {
@ -2510,7 +2549,23 @@ Status DBImpl::ProcessKeyValueCompaction(
compaction_filter = compaction_filter_from_factory.get(); compaction_filter = compaction_filter_from_factory.get();
} }
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { while (input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) {
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
Slice key; Slice key;
Slice value; Slice value;
// If is_compaction_v2 is on, kv-pairs are reset to the prefix batch. // If is_compaction_v2 is on, kv-pairs are reset to the prefix batch.