diff --git a/db/column_family.cc b/db/column_family.cc index e4fb0a77d..4e8a82dac 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -989,6 +989,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( GetL0ThresholdSpeedupCompaction( mutable_cf_options.level0_file_num_compaction_trigger, mutable_cf_options.level0_slowdown_writes_trigger)) { + fprintf(stdout, "JJJ2\n"); write_controller_token_ = write_controller->GetCompactionPressureToken(); ROCKS_LOG_INFO( diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index 5ca2c41ea..5ffb25dba 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -372,6 +372,7 @@ Compaction* UniversalCompactionBuilder::PickCompaction() { const int kLevel0 = 0; score_ = vstorage_->CompactionScore(kLevel0); sorted_runs_ = CalculateSortedRuns(*vstorage_); + fprintf(stdout, "JJJ1\n"); if (sorted_runs_.size() == 0 || (vstorage_->FilesMarkedForPeriodicCompaction().empty() && diff --git a/db/db_impl/compacted_db_impl.cc b/db/db_impl/compacted_db_impl.cc index b65455437..e1c061c27 100644 --- a/db/db_impl/compacted_db_impl.cc +++ b/db/db_impl/compacted_db_impl.cc @@ -171,7 +171,9 @@ Status CompactedDBImpl::Open(const Options& options, std::unique_ptr db(new CompactedDBImpl(db_options, dbname)); Status s = db->Init(options); if (s.ok()) { - db->StartPeriodicWorkScheduler(); + s = db->StartPeriodicWorkScheduler(); + } + if (s.ok()) { ROCKS_LOG_INFO(db->immutable_db_options_.info_log, "Opened the db as fully compacted mode"); LogFlush(db->immutable_db_options_.info_log); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index c540e82bb..93d62c063 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -768,7 +768,7 @@ void DBImpl::PrintStatistics() { } } -void DBImpl::StartPeriodicWorkScheduler() { +Status DBImpl::StartPeriodicWorkScheduler() { #ifndef ROCKSDB_LITE #ifndef NDEBUG @@ -778,7 +778,7 @@ void DBImpl::StartPeriodicWorkScheduler() { "DBImpl::StartPeriodicWorkScheduler:DisableScheduler", &disable_scheduler); if (disable_scheduler) { - return; + return Status::OK(); } #endif // !NDEBUG @@ -789,10 +789,11 @@ void DBImpl::StartPeriodicWorkScheduler() { &periodic_work_scheduler_); } - periodic_work_scheduler_->Register( + return periodic_work_scheduler_->Register( this, mutable_db_options_.stats_dump_period_sec, mutable_db_options_.stats_persist_period_sec); #endif // !ROCKSDB_LITE + return Status::OK(); } // esitmate the total size of stats_history_ @@ -1226,7 +1227,7 @@ Status DBImpl::SetDBOptions( mutable_db_options_.stats_persist_period_sec) { mutex_.Unlock(); periodic_work_scheduler_->Unregister(this); - periodic_work_scheduler_->Register( + s = periodic_work_scheduler_->Register( this, new_options.stats_dump_period_sec, new_options.stats_persist_period_sec); mutex_.Lock(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index d08652d06..164d8ef3c 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1872,7 +1872,7 @@ class DBImpl : public DB { LogBuffer* log_buffer); // Schedule background tasks - void StartPeriodicWorkScheduler(); + Status StartPeriodicWorkScheduler(); void PrintStatistics(); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index c55448077..4dc01bc53 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -285,6 +285,7 @@ Status DBImpl::FlushMemTableToOutputFile( assert(storage_info); VersionStorageInfo::LevelSummaryStorage tmp; + fprintf(stdout, "JJJ4\n"); ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", column_family_name.c_str(), storage_info->LevelSummary(&tmp)); @@ -730,6 +731,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( assert(storage_info); VersionStorageInfo::LevelSummaryStorage tmp; + fprintf(stdout, "JJJ3\n"); ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", column_family_name.c_str(), storage_info->LevelSummary(&tmp)); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index dbbedf049..101da5fa1 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1956,8 +1956,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, persist_options_status.ToString().c_str()); } if (s.ok()) { - impl->StartPeriodicWorkScheduler(); - } else { + s = impl->StartPeriodicWorkScheduler(); + } + if (!s.ok()) { for (auto* h : *handles) { delete h; } diff --git a/db/periodic_work_scheduler.cc b/db/periodic_work_scheduler.cc index 677eec90c..904b847f1 100644 --- a/db/periodic_work_scheduler.cc +++ b/db/periodic_work_scheduler.cc @@ -16,31 +16,41 @@ PeriodicWorkScheduler::PeriodicWorkScheduler( timer = std::unique_ptr(new Timer(clock.get())); } -void PeriodicWorkScheduler::Register(DBImpl* dbi, - unsigned int stats_dump_period_sec, - unsigned int stats_persist_period_sec) { +Status PeriodicWorkScheduler::Register(DBImpl* dbi, + unsigned int stats_dump_period_sec, + unsigned int stats_persist_period_sec) { MutexLock l(&timer_mu_); static std::atomic initial_delay(0); timer->Start(); if (stats_dump_period_sec > 0) { - timer->Add([dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"), - initial_delay.fetch_add(1) % - static_cast(stats_dump_period_sec) * - kMicrosInSecond, - static_cast(stats_dump_period_sec) * kMicrosInSecond); + bool succeeded = timer->Add( + [dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"), + initial_delay.fetch_add(1) % + static_cast(stats_dump_period_sec) * kMicrosInSecond, + static_cast(stats_dump_period_sec) * kMicrosInSecond); + if (!succeeded) { + return Status::Aborted("Unable to add periodic task DumpStats"); + } } if (stats_persist_period_sec > 0) { - timer->Add( + bool succeeded = timer->Add( [dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"), initial_delay.fetch_add(1) % static_cast(stats_persist_period_sec) * kMicrosInSecond, static_cast(stats_persist_period_sec) * kMicrosInSecond); + if (!succeeded) { + return Status::Aborted("Unable to add periodic task PersistStats"); + } } - timer->Add([dbi]() { dbi->FlushInfoLog(); }, - GetTaskName(dbi, "flush_info_log"), - initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec * - kMicrosInSecond, - kDefaultFlushInfoLogPeriodSec * kMicrosInSecond); + bool succeeded = timer->Add( + [dbi]() { dbi->FlushInfoLog(); }, GetTaskName(dbi, "flush_info_log"), + initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec * + kMicrosInSecond, + kDefaultFlushInfoLogPeriodSec * kMicrosInSecond); + if (!succeeded) { + return Status::Aborted("Unable to add periodic task PersistStats"); + } + return Status::OK(); } void PeriodicWorkScheduler::Unregister(DBImpl* dbi) { diff --git a/db/periodic_work_scheduler.h b/db/periodic_work_scheduler.h index fe89ff567..e494c7eee 100644 --- a/db/periodic_work_scheduler.h +++ b/db/periodic_work_scheduler.h @@ -30,8 +30,8 @@ class PeriodicWorkScheduler { PeriodicWorkScheduler& operator=(const PeriodicWorkScheduler&) = delete; PeriodicWorkScheduler& operator=(PeriodicWorkScheduler&&) = delete; - void Register(DBImpl* dbi, unsigned int stats_dump_period_sec, - unsigned int stats_persist_period_sec); + Status Register(DBImpl* dbi, unsigned int stats_dump_period_sec, + unsigned int stats_persist_period_sec); void Unregister(DBImpl* dbi); diff --git a/util/timer.h b/util/timer.h index 736d0bf0a..6571dc7de 100644 --- a/util/timer.h +++ b/util/timer.h @@ -48,36 +48,38 @@ class Timer { ~Timer() { Shutdown(); } // Add a new function to run. - // fn_name has to be identical, otherwise, the new one overrides the existing - // one, regardless if the function is pending removed (invalid) or not. + // fn_name has to be identical, otherwise it will fail to add and return false // start_after_us is the initial delay. // repeat_every_us is the interval between ending time of the last call and // starting time of the next call. For example, repeat_every_us = 2000 and // the function takes 1000us to run. If it starts at time [now]us, then it // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us. // repeat_every_us == 0 means do not repeat. - void Add(std::function fn, - const std::string& fn_name, - uint64_t start_after_us, - uint64_t repeat_every_us) { - std::unique_ptr fn_info(new FunctionInfo( - std::move(fn), fn_name, clock_->NowMicros() + start_after_us, - repeat_every_us)); - { - InstrumentedMutexLock l(&mutex_); - auto it = map_.find(fn_name); - if (it == map_.end()) { - heap_.push(fn_info.get()); - map_.emplace(std::make_pair(fn_name, std::move(fn_info))); - } else { - // If it already exists, overriding it. - it->second->fn = std::move(fn_info->fn); - it->second->valid = true; - it->second->next_run_time_us = clock_->NowMicros() + start_after_us; - it->second->repeat_every_us = repeat_every_us; - } + bool Add(std::function fn, const std::string& fn_name, + uint64_t start_after_us, uint64_t repeat_every_us) { + auto fn_info = std::make_unique(std::move(fn), fn_name, 0, + repeat_every_us); + InstrumentedMutexLock l(&mutex_); + // Assign time within mutex to make sure the next_run_time is larger than + // the current running one + fn_info->next_run_time_us = clock_->NowMicros() + start_after_us; + // the new task start time should never before the current task executing + // time, as the executing task can only be running if it's next_run_time_us + // is due (<= clock_->NowMicros()). + if (executing_task_ && + fn_info->next_run_time_us < heap_.top()->next_run_time_us) { + return false; + } + auto it = map_.find(fn_name); + if (it == map_.end()) { + heap_.push(fn_info.get()); + map_.try_emplace(fn_name, std::move(fn_info)); + } else { + // timer doesn't support duplicated function name + return false; } cond_var_.SignalAll(); + return true; } void Cancel(const std::string& fn_name) { @@ -116,7 +118,7 @@ class Timer { } running_ = true; - thread_.reset(new port::Thread(&Timer::Run, this)); + thread_ = std::make_unique(&Timer::Run, this); return true; } @@ -140,8 +142,8 @@ class Timer { bool HasPendingTask() const { InstrumentedMutexLock l(&mutex_); - for (auto it = map_.begin(); it != map_.end(); it++) { - if (it->second->IsValid()) { + for (const auto& fn_info : map_) { + if (fn_info.second->IsValid()) { return true; } } @@ -155,7 +157,7 @@ class Timer { // here to bump current time and trigger Timer. See timer_test for example. // // Note: only support one caller of this method. - void TEST_WaitForRun(std::function callback = nullptr) { + void TEST_WaitForRun(const std::function& callback = nullptr) { InstrumentedMutexLock l(&mutex_); // It act as a spin lock while (executing_task_ || @@ -177,8 +179,8 @@ class Timer { size_t TEST_GetPendingTaskNum() const { InstrumentedMutexLock l(&mutex_); size_t ret = 0; - for (auto it = map_.begin(); it != map_.end(); it++) { - if (it->second->IsValid()) { + for (const auto& fn_info : map_) { + if (fn_info.second->IsValid()) { ret++; } } @@ -220,10 +222,13 @@ class Timer { executing_task_ = false; cond_var_.SignalAll(); - // Remove the work from the heap once it is done executing. + // Remove the work from the heap once it is done executing, make sure + // it's the same function after executing the work while mutex is + // released. // Note that we are just removing the pointer from the heap. Its // memory is still managed in the map (as it holds a unique ptr). // So current_fn is still a valid ptr. + assert(heap_.top() == current_fn); heap_.pop(); // current_fn may be cancelled already. @@ -234,6 +239,10 @@ class Timer { // Schedule new work into the heap with new time. heap_.push(current_fn); + } else { + // if current_fn is cancelled or no need to repeat, remove it from the + // map to avoid leak. + map_.erase(current_fn->name); } } else { cond_var_.TimedWait(current_fn->next_run_time_us); @@ -280,10 +289,10 @@ class Timer { // calls `Cancel()`. bool valid; - FunctionInfo(std::function&& _fn, const std::string& _name, + FunctionInfo(std::function&& _fn, std::string _name, const uint64_t _next_run_time_us, uint64_t _repeat_every_us) : fn(std::move(_fn)), - name(_name), + name(std::move(_name)), next_run_time_us(_next_run_time_us), repeat_every_us(_repeat_every_us), valid(true) {} diff --git a/util/timer_test.cc b/util/timer_test.cc index 9256fcd45..a04c098ea 100644 --- a/util/timer_test.cc +++ b/util/timer_test.cc @@ -273,33 +273,32 @@ TEST_F(TimerTest, AddSameFuncName) { ASSERT_TRUE(timer.Start()); int func_counter1 = 0; - timer.Add([&] { func_counter1++; }, "duplicated_func", kInitDelayUs, - kRepeat1Us); + ASSERT_TRUE(timer.Add([&] { func_counter1++; }, "duplicated_func", + kInitDelayUs, kRepeat1Us)); int func2_counter = 0; - timer.Add([&] { func2_counter++; }, "func2", kInitDelayUs, kRepeat2Us); + ASSERT_TRUE( + timer.Add([&] { func2_counter++; }, "func2", kInitDelayUs, kRepeat2Us)); - // New function with the same name should override the existing one + // New function with the same name should fail to add int func_counter2 = 0; - timer.Add([&] { func_counter2++; }, "duplicated_func", kInitDelayUs, - kRepeat1Us); + ASSERT_FALSE(timer.Add([&] { func_counter2++; }, "duplicated_func", + kInitDelayUs, kRepeat1Us)); ASSERT_EQ(0, func_counter1); ASSERT_EQ(0, func2_counter); - ASSERT_EQ(0, func_counter2); timer.TEST_WaitForRun( [&] { mock_clock_->SleepForMicroseconds(kInitDelayUs); }); - ASSERT_EQ(0, func_counter1); + ASSERT_EQ(1, func_counter1); ASSERT_EQ(1, func2_counter); - ASSERT_EQ(1, func_counter2); timer.TEST_WaitForRun([&] { mock_clock_->SleepForMicroseconds(kRepeat1Us); }); - ASSERT_EQ(0, func_counter1); + ASSERT_EQ(2, func_counter1); ASSERT_EQ(2, func2_counter); - ASSERT_EQ(2, func_counter2); + ASSERT_EQ(0, func_counter2); ASSERT_TRUE(timer.Shutdown()); }