diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index ed1665b68..b3e435472 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -695,6 +695,7 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
   options.experimental_allow_mempurge = true;
+  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
   ASSERT_OK(TryReopen(options));
   uint32_t mempurge_count = 0;
   uint32_t sst_count = 0;
@@ -808,7 +809,7 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
   // Assert that at least one flush to storage has been performed
   ASSERT_GT(sst_count, EXPECTED_SST_COUNT);
   // (which will consequently increase the number of mempurges recorded too).
-  ASSERT_EQ(mempurge_count, mempurge_count_record);
+  ASSERT_GE(mempurge_count, mempurge_count_record);

   // Assert that there is no data corruption, even with
   // a flush to storage.
@@ -842,6 +843,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
   options.experimental_allow_mempurge = true;
+  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
   ASSERT_OK(TryReopen(options));

   uint32_t mempurge_count = 0;
@@ -1045,6 +1047,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
   options.experimental_allow_mempurge = true;
+  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
   ASSERT_OK(TryReopen(options));

   uint32_t mempurge_count = 0;
@@ -1116,10 +1119,11 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
   options.inplace_update_support = false;
   options.allow_concurrent_memtable_write = true;

-  // Enforce size of a single MemTable to 1MB.
+  // Enforce size of a single MemTable to 128KB.
   options.write_buffer_size = 128 << 10;
   // Activate the MemPurge prototype.
   options.experimental_allow_mempurge = true;
+  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
   ASSERT_OK(TryReopen(options));

   const size_t KVSIZE = 10;
@@ -1158,7 +1162,7 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
     // more than would fit in maximum allowed memtables.
     Random rnd(719);
     const size_t NUM_REPEAT = 100;
-    const size_t RAND_KEY_LENGTH = 8192;
+    const size_t RAND_KEY_LENGTH = 4096;
     const size_t RAND_VALUES_LENGTH = 1024;
     std::vector<std::string> values_default(KVSIZE), values_pikachu(KVSIZE);

@@ -1235,7 +1239,9 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
     const uint32_t EXPECTED_SST_COUNT = 0;

     EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
-    EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
+    if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) {
+      EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
+    }

     ReopenWithColumnFamilies({"default", "pikachu"}, options);
     // Check that there was no data corruption anywhere,
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 09d348cc8..9ce20b2b4 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -192,6 +192,19 @@ void FlushJob::PickMemTable() {
   // path 0 for level 0 file.
   meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

+  // If the mempurge feature is activated, keep track of any
+  // memtables coming from a previous mempurge operation.
+  // Used by the mempurge policy.
+  if (db_options_.experimental_allow_mempurge) {
+    contains_mempurge_outcome_ = false;
+    for (MemTable* mt : mems_) {
+      if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
+        contains_mempurge_outcome_ = true;
+        break;
+      }
+    }
+  }
+
   base_ = cfd_->current();
   base_->Ref();  // it is likely that we do not need this reference
 }
@@ -230,7 +243,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
   Status mempurge_s = Status::NotFound("No MemPurge.");
   if (db_options_.experimental_allow_mempurge &&
       (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
-      (!mems_.empty())) {
+      (!mems_.empty()) && MemPurgeDecider()) {
     mempurge_s = MemPurge();
     if (!mempurge_s.ok()) {
       // Mempurge is typically aborted when the new_mem output memtable
@@ -339,7 +352,15 @@ Status FlushJob::MemPurge() {
   db_mutex_->Unlock();
   assert(!mems_.empty());

+  // Measure purging time.
+  const uint64_t start_micros = clock_->NowMicros();
+  const uint64_t start_cpu_micros = clock_->CPUNanos() / 1000;
+
   MemTable* new_mem = nullptr;
+  // For performance/log investigation purposes:
+  // look at how much useful payload we harvest in the new_mem.
+  // This value is then printed to the DB log.
+  double new_mem_capacity = 0.0;

   // Create two iterators, one for the memtable data (contains
   // info from puts + deletes), and one for the memtable
@@ -392,8 +413,8 @@ Status FlushJob::MemPurge() {
     // or at least range tombstones, copy over the info
     // to the new memtable.
     if (iter->Valid() || !range_del_agg->IsEmpty()) {
-      // Arbitrary heuristic: maxSize is 60% cpacity.
-      size_t maxSize = ((mutable_cf_options_.write_buffer_size + 6U) / 10U);
+      // maxSize is the size of a memtable.
+      size_t maxSize = mutable_cf_options_.write_buffer_size;
       std::unique_ptr<CompactionFilter> compaction_filter;
       if (ioptions->compaction_filter_factory != nullptr &&
           ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
@@ -480,6 +501,7 @@ Status FlushJob::MemPurge() {
         // and destroy new_mem.
         if (new_mem->ApproximateMemoryUsage() > maxSize) {
           s = Status::Aborted("Mempurge filled more than one memtable.");
+          new_mem_capacity = 1.0;
           break;
         }
       }
@@ -524,6 +546,7 @@ Status FlushJob::MemPurge() {
         // and destroy new_mem.
         if (new_mem->ApproximateMemoryUsage() > maxSize) {
           s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
+          new_mem_capacity = 1.0;
           break;
         }
       }
@@ -538,19 +561,35 @@ Status FlushJob::MemPurge() {
       new_mem->SetFirstSequenceNumber(new_first_seqno);

       // The new_mem is added to the list of immutable memtables
-      // only if it filled at less than 60% capacity (arbitrary heuristic).
-      if (new_mem->ApproximateMemoryUsage() < maxSize) {
+      // only if it filled at less than 100% capacity and isn't flagged
+      // as in need of being flushed.
+      if (new_mem->ApproximateMemoryUsage() < maxSize &&
+          !(new_mem->ShouldFlushNow())) {
         db_mutex_->Lock();
+        uint64_t new_mem_id = mems_[0]->GetID();
+        // Carry over the lowest memtable ID
+        // (housekeeping work).
+        for (MemTable* mt : mems_) {
+          new_mem_id = mt->GetID() < new_mem_id ? mt->GetID() : new_mem_id;
+          // Note: if mt is not a previous mempurge output memtable,
+          // nothing happens.
+          cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
+        }
+        new_mem->SetID(new_mem_id);
+        cfd_->imm()->AddMemPurgeOutputID(new_mem_id);
         cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free,
                          false /* -> trigger_flush=false:
                                 * adding this memtable
                                 * will not trigger a flush.
                                 */);
+        new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 /
+                           mutable_cf_options_.write_buffer_size;
         new_mem->Ref();
         db_mutex_->Unlock();
       } else {
         s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
+        new_mem_capacity = 1.0;
         if (new_mem) {
           job_context_->memtables_to_free.push_back(new_mem);
         }
@@ -572,10 +611,32 @@ Status FlushJob::MemPurge() {
   } else {
     TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
   }
+  const uint64_t micros = clock_->NowMicros() - start_micros;
+  const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "[%s] [JOB %d] Mempurge lasted %" PRIu64
+                 " microseconds, and %" PRIu64
+                 " cpu "
+                 "microseconds. Status is %s ok. Perc capacity: %f\n",
+                 cfd_->GetName().c_str(), job_context_->job_id, micros,
+                 cpu_micros, s.ok() ? "" : "not", new_mem_capacity);
   return s;
 }

+bool FlushJob::MemPurgeDecider() {
+  MemPurgePolicy policy = db_options_.experimental_mempurge_policy;
+  if (policy == MemPurgePolicy::kAlways) {
+    return true;
+  } else if (policy == MemPurgePolicy::kAlternate) {
+    // Note: if at least one of the flushed memtables is
+    // an output of a previous mempurge process, then flush
+    // to storage.
+    return !(contains_mempurge_outcome_);
+  }
+  return false;
+}
+
 Status FlushJob::WriteLevel0Table() {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_FLUSH_WRITE_L0);
@@ -762,8 +823,16 @@ Status FlushJob::WriteLevel0Table() {
   // Note that here we treat flush as level 0 compaction in internal stats
   InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
-  stats.micros = clock_->NowMicros() - start_micros;
-  stats.cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+  const uint64_t micros = clock_->NowMicros() - start_micros;
+  const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+  stats.micros = micros;
+  stats.cpu_micros = cpu_micros;
+
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "[%s] [JOB %d] Flush lasted %" PRIu64
+                 " microseconds, and %" PRIu64 " cpu microseconds.\n",
+                 cfd_->GetName().c_str(), job_context_->job_id, micros,
+                 cpu_micros);

   if (has_output) {
     stats.bytes_written = meta_.fd.GetFileSize();
@@ -777,12 +846,22 @@ Status FlushJob::WriteLevel0Table() {
   stats.num_output_files_blob = static_cast<uint64_t>(blobs.size());

+  if (db_options_.experimental_allow_mempurge && s.ok()) {
+    // The db_mutex is held at this point.
+    for (MemTable* mt : mems_) {
+      // Note: if mt is not a previous mempurge output memtable,
+      // nothing happens here.
+      cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
+    }
+  }
+
   RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
   cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_,
                                              stats);
   cfd_->internal_stats()->AddCFStats(
       InternalStats::BYTES_FLUSHED,
       stats.bytes_written + stats.bytes_written_blob);
   RecordFlushIOStats();
+
   return s;
 }

diff --git a/db/flush_job.h b/db/flush_job.h
index b0a6bf2de..694dd71d2 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -123,6 +123,7 @@ class FlushJob {
   // recommend all users not to set this flag as true given that the MemPurge
   // process has not matured yet.
   Status MemPurge();
+  bool MemPurgeDecider();
 #ifndef ROCKSDB_LITE
   std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
 #endif  // !ROCKSDB_LITE
@@ -190,6 +191,9 @@ class FlushJob {
   const std::string full_history_ts_low_;

   BlobFileCompletionCallback* blob_callback_;
+
+  // Used when experimental_allow_mempurge is set to true.
+  bool contains_mempurge_outcome_;
 };

 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/memtable.h b/db/memtable.h
index 6f908ae5b..93060941a 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -471,6 +471,9 @@ class MemTable {
   }
 #endif  // !ROCKSDB_LITE

+  // Returns a heuristic flush decision
+  bool ShouldFlushNow();
+
  private:
   enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

@@ -558,9 +561,6 @@ class MemTable {
   std::unique_ptr<FlushJobInfo> flush_job_info_;
 #endif  // !ROCKSDB_LITE

-  // Returns a heuristic flush decision
-  bool ShouldFlushNow();
-
   // Updates flush_state_ using ShouldFlushNow()
   void UpdateFlushState();

diff --git a/db/memtable_list.h b/db/memtable_list.h
index 22967c8c1..ada6469a1 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -389,6 +389,24 @@ class MemTableList {
   // not freed, but put into a vector for future deref and reclamation.
   void RemoveOldMemTables(uint64_t log_number,
                           autovector<MemTable*>* to_delete);

+  void AddMemPurgeOutputID(uint64_t mid) {
+    if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
+      mempurged_ids_.insert(mid);
+    }
+  }
+
+  void RemoveMemPurgeOutputID(uint64_t mid) {
+    if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
+      mempurged_ids_.erase(mid);
+    }
+  }
+
+  bool IsMemPurgeOutput(uint64_t mid) {
+    if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
+      return false;
+    }
+    return true;
+  }
+
  private:
   friend Status InstallMemtableAtomicFlushResults(
@@ -433,6 +451,10 @@ class MemTableList {

   // Cached value of current_->HasHistory().
   std::atomic<bool> current_has_history_;
+
+  // Store the IDs of the memtables installed in this
+  // list that result from a mempurge operation.
+  std::unordered_set<uint64_t> mempurged_ids_;
 };

 // Installs memtable atomic flush results.
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 220f90425..d7ecb5b3d 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -369,6 +369,11 @@ struct DbPath {

 extern const char* kHostnameForDbHostId;

+enum class MemPurgePolicy : char {
+  kAlternate = 0x00,
+  kAlways = 0x01,
+};
+
 enum class CompactionServiceJobStatus : char {
   kSuccess,
   kFailure,
@@ -785,6 +790,11 @@ struct DBOptions {
   // If true, allows for memtable purge instead of flush to storage.
   // (experimental).
   bool experimental_allow_mempurge = false;
+  // If experimental_allow_mempurge is true, dictates the MemPurge
+  // policy.
+  // Default: kAlternate
+  // (experimental).
+  MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate;

   // Amount of data to build up in memtables across all column
   // families before writing to disk.
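For reference, a minimal sketch (not part of the patch) of how an application would opt in to the policy option defined above; the database path and surrounding setup are illustrative, only the option names and enum values come from this diff:

#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Opt in to the experimental MemPurge prototype (disabled by default).
  options.experimental_allow_mempurge = true;
  // kAlways attempts an in-memory purge on every eligible flush (i.e. a
  // flush triggered by a full write buffer); kAlternate, the default,
  // falls back to a regular flush whenever one of the memtables being
  // flushed is itself the output of a previous mempurge.
  options.experimental_mempurge_policy = rocksdb::MemPurgePolicy::kAlways;

  rocksdb::DB* db = nullptr;
  // "/tmp/mempurge_demo" is an arbitrary example path.
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/mempurge_demo", &db);
  assert(s.ok());
  // Update-heavy workloads (frequent overwrites or deletes of the same
  // keys) are the case MemPurge targets.
  delete db;
  return 0;
}

The kAlternate behavior, implemented by FlushJob::MemPurgeDecider() in flush_job.cc above, keeps the same data from being re-purged indefinitely without ever reaching storage.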
diff --git a/options/db_options.cc b/options/db_options.cc
index 53533c252..9a364d57b 100644
--- a/options/db_options.cc
+++ b/options/db_options.cc
@@ -47,6 +47,11 @@ static std::unordered_map<std::string, InfoLogLevel> info_log_level_string_map =
     {"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL},
     {"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}};

+static std::unordered_map<std::string, MemPurgePolicy>
+    experimental_mempurge_policy_string_map = {
+        {"kAlternate", MemPurgePolicy::kAlternate},
+        {"kAlways", MemPurgePolicy::kAlways}};
+
 static std::unordered_map<std::string, OptionTypeInfo>
     db_mutable_options_type_info = {
         {"allow_os_buffer",
@@ -196,6 +201,10 @@
          {offsetof(struct ImmutableDBOptions, experimental_allow_mempurge),
           OptionType::kBoolean, OptionVerificationType::kNormal,
           OptionTypeFlags::kNone}},
+        {"experimental_mempurge_policy",
+         OptionTypeInfo::Enum<MemPurgePolicy>(
+             offsetof(struct ImmutableDBOptions, experimental_mempurge_policy),
+             &experimental_mempurge_policy_string_map)},
         {"is_fd_close_on_exec",
         {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
          OptionType::kBoolean, OptionVerificationType::kNormal,
@@ -546,6 +555,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
       is_fd_close_on_exec(options.is_fd_close_on_exec),
       advise_random_on_open(options.advise_random_on_open),
       experimental_allow_mempurge(options.experimental_allow_mempurge),
+      experimental_mempurge_policy(options.experimental_mempurge_policy),
       db_write_buffer_size(options.db_write_buffer_size),
       write_buffer_manager(options.write_buffer_manager),
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
@@ -682,6 +692,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
   ROCKS_LOG_HEADER(log,
                    "            Options.experimental_allow_mempurge: %d",
                    experimental_allow_mempurge);
+  ROCKS_LOG_HEADER(log,
+                   "           Options.experimental_mempurge_policy: %d",
+                   static_cast<int>(experimental_mempurge_policy));
   ROCKS_LOG_HEADER(
       log, "                  Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
       db_write_buffer_size);
diff --git a/options/db_options.h b/options/db_options.h
index e609563d2..188e688c3 100644
--- a/options/db_options.h
+++ b/options/db_options.h
@@ -55,6 +55,7 @@ struct ImmutableDBOptions {
   bool is_fd_close_on_exec;
   bool advise_random_on_open;
   bool experimental_allow_mempurge;
+  MemPurgePolicy experimental_mempurge_policy;
   size_t db_write_buffer_size;
   std::shared_ptr<WriteBufferManager> write_buffer_manager;
   DBOptions::AccessHint access_hint_on_compaction_start;
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index c91814663..354453c23 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -1000,6 +1000,19 @@ static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
   return ROCKSDB_NAMESPACE::kSnappyCompression;  // default value
 }

+static enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
+    const char* mpolicy) {
+  assert(mpolicy);
+  if (!strcasecmp(mpolicy, "kAlways")) {
+    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
+  } else if (!strcasecmp(mpolicy, "kAlternate")) {
+    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
+  }
+
+  fprintf(stdout, "Cannot parse mempurge policy '%s'\n", mpolicy);
+  return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
+}
+
 static std::string ColumnFamilyName(size_t i) {
   if (i == 0) {
     return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
@@ -1137,6 +1150,9 @@ DEFINE_bool(allow_concurrent_memtable_write, true,
 DEFINE_bool(experimental_allow_mempurge, false,
             "Allow memtable garbage collection.");

+DEFINE_string(experimental_mempurge_policy, "kAlternate",
+              "Specify memtable garbage collection policy.");
+
 DEFINE_bool(inplace_update_support,
             ROCKSDB_NAMESPACE::Options().inplace_update_support,
             "Support in-place memtable update for smaller or same-size values");
@@ -4211,6 +4227,8 @@ class Benchmark {
     options.allow_concurrent_memtable_write =
         FLAGS_allow_concurrent_memtable_write;
     options.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
+    options.experimental_mempurge_policy =
+        StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
     options.inplace_update_support = FLAGS_inplace_update_support;
     options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
     options.enable_write_thread_adaptive_yield =
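Usage note (illustrative): with the flag wired into db_bench as above, the new policy can be exercised with a run such as
  ./db_bench --benchmarks=overwrite --experimental_allow_mempurge=true --experimental_mempurge_policy=kAlways
and an unrecognized policy string makes StringToMemPurgePolicy() print a warning and fall back to the kAlternate default.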