Add APIs PauseBackgroundWork() and ContinueBackgroundWork()

Summary:
To support a new MongoDB capability, we need to make sure that we don't do any IO for a short period of time. For background, see:
* https://jira.mongodb.org/browse/SERVER-20704
* https://jira.mongodb.org/browse/SERVER-18899

To implement that, I add a new API calls PauseBackgroundWork() and ContinueBackgroundWork() which reuse the capability we already have in place for RefitLevel() function.

Test Plan: Added a new test in db_test. Made sure that test fails when PauseBackgroundWork() is commented out.

Reviewers: IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D47901
This commit is contained in:
Igor Canadi 2015-10-02 13:17:34 -07:00
parent a39897369a
commit 115427ef63
5 changed files with 98 additions and 35 deletions

View File

@ -260,7 +260,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
wal_manager_(db_options_, env_options_),
#endif // ROCKSDB_LITE
event_logger_(db_options_.info_log.get()),
bg_work_gate_closed_(false),
bg_work_paused_(0),
refitting_level_(false),
opened_successfully_(false) {
env_->GetAbsolutePath(dbname, &db_absolute_path_);
@ -1548,8 +1548,14 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
}
if (options.change_level) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[RefitLevel] waiting for background threads to stop");
s = PauseBackgroundWork();
if (s.ok()) {
s = ReFitLevel(cfd, final_output_level, options.target_level);
}
ContinueBackgroundWork();
}
LogFlush(db_options_.info_log);
{
@ -1747,6 +1753,25 @@ Status DBImpl::CompactFilesImpl(
}
#endif // ROCKSDB_LITE
Status DBImpl::PauseBackgroundWork() {
InstrumentedMutexLock guard_lock(&mutex_);
bg_work_paused_++;
while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) {
bg_cv_.Wait();
}
return Status::OK();
}
Status DBImpl::ContinueBackgroundWork() {
InstrumentedMutexLock guard_lock(&mutex_);
assert(bg_work_paused_ > 0);
bg_work_paused_--;
if (bg_work_paused_ == 0) {
MaybeScheduleFlushOrCompaction();
}
return Status::OK();
}
void DBImpl::NotifyOnCompactionCompleted(
ColumnFamilyData* cfd, Compaction *c, const Status &st,
const CompactionJobStats& compaction_job_stats,
@ -1857,14 +1882,18 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
return minimum_level;
}
// REQUIREMENT: block all background work by calling PauseBackgroundWork()
// before calling this function
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
assert(level < cfd->NumberLevels());
if (target_level >= cfd->NumberLevels()) {
return Status::InvalidArgument("Target level exceeds number of levels");
}
SuperVersion* superversion_to_free = nullptr;
SuperVersion* new_superversion = new SuperVersion();
std::unique_ptr<SuperVersion> superversion_to_free;
std::unique_ptr<SuperVersion> new_superversion(new SuperVersion());
Status status;
InstrumentedMutexLock guard_lock(&mutex_);
@ -1872,40 +1901,26 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
if (refitting_level_) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[ReFitLevel] another thread is refitting");
delete new_superversion;
return Status::NotSupported("another thread is refitting");
}
refitting_level_ = true;
// wait for all background threads to stop
bg_work_gate_closed_ = true;
while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[RefitLevel] waiting for background threads to stop: %d %d",
bg_compaction_scheduled_, bg_flush_scheduled_);
bg_cv_.Wait();
}
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
// move to a smaller level
int to_level = target_level;
if (target_level < 0) {
to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
}
Status status;
auto* vstorage = cfd->current()->storage_info();
if (to_level > level) {
if (level == 0) {
delete new_superversion;
return Status::NotSupported(
"Cannot change from level 0 to other levels.");
}
// Check levels are empty for a trivial move
for (int l = level + 1; l <= to_level; l++) {
if (vstorage->NumLevelFiles(l) > 0) {
delete new_superversion;
return Status::NotSupported(
"Levels between source and target are not empty for a move.");
}
@ -1913,8 +1928,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
}
if (to_level != level) {
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[%s] Before refitting:\n%s",
cfd->GetName().c_str(), cfd->current()->DebugString().data());
"[%s] Before refitting:\n%s", cfd->GetName().c_str(),
cfd->current()->DebugString().data());
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
@ -1926,14 +1941,13 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
f->marked_for_compaction);
}
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[%s] Apply version edit:\n%s",
cfd->GetName().c_str(), edit.DebugString().data());
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
edit.DebugString().data());
status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir());
superversion_to_free = InstallSuperVersionAndScheduleWork(
cfd, new_superversion, mutable_cf_options);
new_superversion = nullptr;
superversion_to_free.reset(InstallSuperVersionAndScheduleWork(
cfd, new_superversion.release(), mutable_cf_options));
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[%s] LogAndApply: %s\n", cfd->GetName().c_str(),
@ -1941,16 +1955,13 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
if (status.ok()) {
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[%s] After refitting:\n%s",
cfd->GetName().c_str(), cfd->current()->DebugString().data());
"[%s] After refitting:\n%s", cfd->GetName().c_str(),
cfd->current()->DebugString().data());
}
}
refitting_level_ = false;
bg_work_gate_closed_ = false;
delete superversion_to_free;
delete new_superversion;
return status;
}
@ -2203,8 +2214,8 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// Compaction may introduce data race to DB open
return;
}
if (bg_work_gate_closed_) {
// gate closed for background work
if (bg_work_paused_ > 0) {
// we paused the background work
return;
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions

View File

@ -139,6 +139,9 @@ class DBImpl : public DB {
const int output_level,
const int output_path_id = -1) override;
virtual Status PauseBackgroundWork() override;
virtual Status ContinueBackgroundWork() override;
using DB::SetOptions;
Status SetOptions(
ColumnFamilyHandle* column_family,
@ -746,8 +749,8 @@ class DBImpl : public DB {
// Unified interface for logging events
EventLogger event_logger_;
// A value of true temporarily disables scheduling of background work
bool bg_work_gate_closed_;
// A value of >0 temporarily disables scheduling of background work
int bg_work_paused_;
// Guard against multiple concurrent refitting
bool refitting_level_;

View File

@ -5664,6 +5664,14 @@ class ModelDB: public DB {
return Status::NotSupported("Not supported operation.");
}
Status PauseBackgroundWork() override {
return Status::NotSupported("Not supported operation.");
}
Status ContinueBackgroundWork() override {
return Status::NotSupported("Not supported operation.");
}
using DB::NumberLevels;
virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
return 1;
@ -9634,6 +9642,33 @@ TEST_F(DBTest, AddExternalSstFileMultiThreaded) {
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
::testing::Values(1, 4));
TEST_F(DBTest, PauseBackgroundWorkTest) {
Options options;
options.write_buffer_size = 100000; // Small write buffer
options = CurrentOptions(options);
Reopen(options);
std::vector<std::thread> threads;
std::atomic<bool> done;
db_->PauseBackgroundWork();
threads.emplace_back([&]() {
Random rnd(301);
for (int i = 0; i < 10000; ++i) {
Put(RandomString(&rnd, 10), RandomString(&rnd, 10));
}
done.store(true);
});
env_->SleepForMicroseconds(200000);
// make sure the thread is not done
ASSERT_EQ(false, done.load());
db_->ContinueBackgroundWork();
for (auto& t : threads) {
t.join();
}
// now it's done
ASSERT_EQ(true, done.load());
}
} // namespace rocksdb
#endif

View File

@ -524,6 +524,13 @@ class DB {
return CompactFiles(compact_options, DefaultColumnFamily(),
input_file_names, output_level, output_path_id);
}
// This function will wait until all currently running background processes
// finish. After it returns, no background process will be run until
// UnblockBackgroundWork is called
virtual Status PauseBackgroundWork() = 0;
virtual Status ContinueBackgroundWork() = 0;
// Number of levels used for this DB.
virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0;
virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); }

View File

@ -169,6 +169,13 @@ class StackableDB : public DB {
output_level, output_path_id);
}
virtual Status PauseBackgroundWork() override {
return db_->PauseBackgroundWork();
}
virtual Status ContinueBackgroundWork() override {
return db_->ContinueBackgroundWork();
}
using DB::NumberLevels;
virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
return db_->NumberLevels(column_family);