diff --git a/db/flush_job.cc b/db/flush_job.cc index bc4824af6..43dc87d68 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -192,19 +192,6 @@ void FlushJob::PickMemTable() { // path 0 for level 0 file. meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); - // If mempurge feature is activated, keep track of any potential - // memtables coming from a previous mempurge operation. - // Used for mempurge policy. - if (db_options_.experimental_mempurge_threshold > 0.0) { - contains_mempurge_outcome_ = false; - for (MemTable* mt : mems_) { - if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) { - contains_mempurge_outcome_ = true; - break; - } - } - } - base_ = cfd_->current(); base_->Ref(); // it is likely that we do not need this reference } @@ -246,8 +233,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, (!mems_.empty()) && MemPurgeDecider()) { mempurge_s = MemPurge(); if (!mempurge_s.ok()) { - // Mempurge is typically aborted when the new_mem output memtable - // is filled at more than XX % capacity (currently: 60%). + // Mempurge is typically aborted when the output + // bytes cannot be contained onto a single output memtable. if (mempurge_s.IsAborted()) { ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n", mempurge_s.ToString().c_str()); @@ -567,16 +554,9 @@ Status FlushJob::MemPurge() { !(new_mem->ShouldFlushNow())) { db_mutex_->Lock(); uint64_t new_mem_id = mems_[0]->GetID(); - // Copy lowest memtable ID - // House keeping work. - for (MemTable* mt : mems_) { - new_mem_id = mt->GetID() < new_mem_id ? mt->GetID() : new_mem_id; - // Note: if m is not a previous mempurge output memtable, - // nothing happens. - cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID()); - } + new_mem->SetID(new_mem_id); - cfd_->imm()->AddMemPurgeOutputID(new_mem_id); + // This addition will not trigger another flush, because // we do not call SchedulePendingFlush(). cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free); @@ -631,7 +611,20 @@ bool FlushJob::MemPurgeDecider() { // Payload and useful_payload (in bytes). // The useful payload ratio of a given MemTable // is estimated to be useful_payload/payload. - uint64_t payload = 0, useful_payload = 0; + uint64_t payload = 0, useful_payload = 0, entry_size = 0; + + // Local variables used repetitively inside the for-loop + // when iterating over the sampled entries. + Slice key_slice, value_slice; + ParsedInternalKey res; + SnapshotImpl min_snapshot; + std::string vget; + Status mget_s, parse_s; + MergeContext merge_context; + SequenceNumber max_covering_tombstone_seq = 0, sqno = 0, + min_seqno_snapshot = 0; + bool get_res, can_be_useful_payload, not_in_next_mems; + // If estimated_useful_payload is > threshold, // then flush to storage, else MemPurge. double estimated_useful_payload = 0.0; @@ -643,106 +636,136 @@ bool FlushJob::MemPurgeDecider() { ro.total_order_seek = true; // Iterate over each memtable of the set. - for (MemTable* mt : mems_) { - // If the memtable is the output of a previous mempurge, - // its approximate useful payload ratio is already calculated. - if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) { - // We make the assumption that this memtable is already - // free of garbage (garbage underestimation). - estimated_useful_payload += mt->ApproximateMemoryUsage(); - } else { - // Else sample from the table. - uint64_t nentries = mt->num_entries(); - // Corrected Cochran formula for small populations - // (converges to n0 for large populations). - uint64_t target_sample_size = - static_cast(ceil(n0 / (1.0 + (n0 / nentries)))); - std::unordered_set sentries = {}; - // Populate sample entries set. - mt->UniqueRandomSample(target_sample_size, &sentries); + for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_); + mem_iter++) { + MemTable* mt = *mem_iter; - // Estimate the garbage ratio by comparing if - // each sample corresponds to a valid entry. - for (const char* ss : sentries) { - ParsedInternalKey res; - Slice entry_slice = GetLengthPrefixedSlice(ss); - Status parse_s = - ParseInternalKey(entry_slice, &res, true /*log_err_key*/); - if (!parse_s.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, - "Memtable Decider: ParseInternalKey did not parse " - "entry_slice %s" - "successfully.", - entry_slice.data()); - } - LookupKey lkey(res.user_key, kMaxSequenceNumber); - std::string vget; - Status s; - MergeContext merge_context; - SequenceNumber max_covering_tombstone_seq = 0, sqno = 0; + // Else sample from the table. + uint64_t nentries = mt->num_entries(); + // Corrected Cochran formula for small populations + // (converges to n0 for large populations). + uint64_t target_sample_size = + static_cast(ceil(n0 / (1.0 + (n0 / nentries)))); + std::unordered_set sentries = {}; + // Populate sample entries set. + mt->UniqueRandomSample(target_sample_size, &sentries); - // Pick the oldest existing snapshot that is more recent - // than the sequence number of the sampled entry. - SequenceNumber min_seqno_snapshot = kMaxSequenceNumber; - SnapshotImpl min_snapshot; - for (SequenceNumber seq_num : existing_snapshots_) { - if (seq_num > res.sequence && seq_num < min_seqno_snapshot) { - min_seqno_snapshot = seq_num; - } - } - min_snapshot.number_ = min_seqno_snapshot; - ro.snapshot = - min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr; + // Estimate the garbage ratio by comparing if + // each sample corresponds to a valid entry. + for (const char* ss : sentries) { + key_slice = GetLengthPrefixedSlice(ss); + parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/); + if (!parse_s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Memtable Decider: ParseInternalKey did not parse " + "key_slice %s successfully.", + key_slice.data()); + } - // Estimate if the sample entry is valid or not. - bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context, - &max_covering_tombstone_seq, &sqno, ro); - if (!gres) { - ROCKS_LOG_WARN( - db_options_.info_log, - "Memtable Get returned false when Get(sampled entry). " - "Yet each sample entry should exist somewhere in the memtable, " - "unrelated to whether it has been deleted or not."); - } - payload += entry_slice.size(); + // Size of the entry is "key size (+ value size if KV entry)" + entry_size = key_slice.size(); + if (res.type == kTypeValue) { + value_slice = + GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); + entry_size += value_slice.size(); + } - // TODO(bjlemaire): evaluate typeMerge. - // This is where the sampled entry is estimated to be - // garbage or not. Note that this is a garbage *estimation* - // because we do not include certain items such as - // CompactionFitlers triggered at flush, or if the same delete - // has been inserted twice or more in the memtable. - if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) { - useful_payload += entry_slice.size(); - } else if (((res.type == kTypeDeletion) || - (res.type == kTypeSingleDeletion)) && - s.IsNotFound() && gres) { - useful_payload += entry_slice.size(); + // Count entry bytes as payload. + payload += entry_size; + + LookupKey lkey(res.user_key, kMaxSequenceNumber); + + // Paranoia: zero out these values just in case. + max_covering_tombstone_seq = 0; + sqno = 0; + + // Pick the oldest existing snapshot that is more recent + // than the sequence number of the sampled entry. + min_seqno_snapshot = kMaxSequenceNumber; + for (SequenceNumber seq_num : existing_snapshots_) { + if (seq_num > res.sequence && seq_num < min_seqno_snapshot) { + min_seqno_snapshot = seq_num; } } - if (payload > 0) { - // We used the estimated useful payload ratio - // to evaluate how much of the total memtable is useful bytes. - estimated_useful_payload += - (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload); - ROCKS_LOG_INFO( - db_options_.info_log, - "Mempurge sampling - found garbage ratio from sampling: %f.\n", - (payload - useful_payload) * 1.0 / payload); - } else { + min_snapshot.number_ = min_seqno_snapshot; + ro.snapshot = + min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr; + + // Estimate if the sample entry is valid or not. + get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context, + &max_covering_tombstone_seq, &sqno, ro); + if (!get_res) { ROCKS_LOG_WARN( db_options_.info_log, - "Mempurge kSampling policy: null payload measured, and collected " - "sample size is %zu\n.", - sentries.size()); + "Memtable Get returned false when Get(sampled entry). " + "Yet each sample entry should exist somewhere in the memtable, " + "unrelated to whether it has been deleted or not."); + } + + // TODO(bjlemaire): evaluate typeMerge. + // This is where the sampled entry is estimated to be + // garbage or not. Note that this is a garbage *estimation* + // because we do not include certain items such as + // CompactionFitlers triggered at flush, or if the same delete + // has been inserted twice or more in the memtable. + + // Evaluate if the entry can be useful payload + // Situation #1: entry is a KV entry, was found in the memtable mt + // and the sequence numbers match. + can_be_useful_payload = (res.type == kTypeValue) && get_res && + mget_s.ok() && (sqno == res.sequence); + + // Situation #2: entry is a delete entry, was found in the memtable mt + // (because gres==true) and no valid KV entry is found. + // (note: duplicate delete entries are also taken into + // account here, because the sequence number 'sqno' + // in memtable->Get(&sqno) operation is set to be equal + // to the most recent delete entry as well). + can_be_useful_payload |= + ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) && + mget_s.IsNotFound() && get_res && (sqno == res.sequence); + + // If there is a chance that the entry is useful payload + // Verify that the entry does not appear in the following memtables + // (memtables with greater memtable ID/larger sequence numbers). + if (can_be_useful_payload) { + not_in_next_mems = true; + for (auto next_mem_iter = mem_iter + 1; + next_mem_iter != std::end(mems_); next_mem_iter++) { + if ((*next_mem_iter) + ->Get(lkey, &vget, nullptr, &mget_s, &merge_context, + &max_covering_tombstone_seq, &sqno, ro)) { + not_in_next_mems = false; + break; + } + } + if (not_in_next_mems) { + useful_payload += entry_size; + } } } + if (payload > 0) { + // We use the estimated useful payload ratio to + // evaluate how many of the memtable bytes are useful bytes. + estimated_useful_payload += + (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload); + + ROCKS_LOG_INFO( + db_options_.info_log, + "Mempurge sampling - found garbage ratio from sampling: %f.\n", + (payload - useful_payload) * 1.0 / payload); + } else { + ROCKS_LOG_WARN(db_options_.info_log, + "Mempurge sampling: null payload measured, and collected " + "sample size is %zu\n.", + sentries.size()); + } } - // We convert the total number of useful paylaod bytes + // We convert the total number of useful payload bytes // into the proportion of memtable necessary to store all these bytes. // We compare this proportion with the threshold value. - return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) < - threshold; + return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) < + threshold); } Status FlushJob::WriteLevel0Table() { @@ -954,15 +977,6 @@ Status FlushJob::WriteLevel0Table() { stats.num_output_files_blob = static_cast(blobs.size()); - if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) { - // The db_mutex is held at this point. - for (MemTable* mt : mems_) { - // Note: if m is not a previous mempurge output memtable, - // nothing happens here. - cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID()); - } - } - RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); cfd_->internal_stats()->AddCFStats( diff --git a/db/flush_job.h b/db/flush_job.h index 9050657de..81b4e86dd 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -191,9 +191,6 @@ class FlushJob { const std::string full_history_ts_low_; BlobFileCompletionCallback* blob_callback_; - - // Used when experimental_mempurge_threshold > 0.0. - bool contains_mempurge_outcome_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 3927a3f03..e522d2207 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -5,10 +5,12 @@ // #include "db/memtable_list.h" +#include #include #include #include #include + #include "db/db_impl/db_impl.h" #include "db/memtable.h" #include "db/range_tombstone_fragmenter.h" @@ -340,6 +342,14 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); const auto& memlist = current_->memlist_; bool atomic_flush = false; + + // Note: every time MemTableList::Add(mem) is called, it adds the new mem + // at the FRONT of the memlist (memlist.push_front(mem)). Therefore, by + // iterating through the memlist starting at the end, the vector + // ret is filled with memtables already sorted in increasing MemTable ID. + // However, when the mempurge feature is activated, new memtables with older + // IDs will be added to the memlist. Therefore we std::sort(ret) at the end to + // return a vector of memtables sorted by increasing memtable ID. for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { MemTable* m = *it; if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) { @@ -361,6 +371,15 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, if (!atomic_flush || num_flush_not_started_ == 0) { flush_requested_ = false; // start-flush request is complete } + + // Sort the list of memtables by increasing memtable ID. + // This is useful when the mempurge feature is activated + // and the memtables are not guaranteed to be sorted in + // the memlist vector. + std::sort(ret->begin(), ret->end(), + [](const MemTable* m1, const MemTable* m2) -> bool { + return m1->GetID() < m2->GetID(); + }); } void MemTableList::RollbackMemtableFlush(const autovector& mems, diff --git a/db/memtable_list.h b/db/memtable_list.h index 6dde85016..b73b7c203 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -390,20 +390,6 @@ class MemTableList { // not freed, but put into a vector for future deref and reclamation. void RemoveOldMemTables(uint64_t log_number, autovector* to_delete); - void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); } - - void RemoveMemPurgeOutputID(uint64_t mid) { - if (mempurged_ids_.find(mid) != mempurged_ids_.end()) { - mempurged_ids_.erase(mid); - } - } - - bool IsMemPurgeOutput(uint64_t mid) { - if (mempurged_ids_.find(mid) == mempurged_ids_.end()) { - return false; - } - return true; - } private: friend Status InstallMemtableAtomicFlushResults( @@ -450,10 +436,6 @@ class MemTableList { // Cached value of current_->HasHistory(). std::atomic current_has_history_; - - // Store the IDs of the memtables installed in this - // list that result from a mempurge operation. - std::unordered_set mempurged_ids_; }; // Installs memtable atomic flush results. diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 6974831fd..934a0085a 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -198,8 +198,8 @@ class MemTableRep { // Returns a vector of unique random memtable entries of approximate // size 'target_sample_size' (this size is not strictly enforced). - virtual void UniqueRandomSample(const uint64_t& num_entries, - const uint64_t& target_sample_size, + virtual void UniqueRandomSample(const uint64_t num_entries, + const uint64_t target_sample_size, std::unordered_set* entries) { (void)num_entries; (void)target_sample_size; diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index abe7144ab..d7f78672f 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -97,8 +97,8 @@ public: return (end_count >= start_count) ? (end_count - start_count) : 0; } - void UniqueRandomSample(const uint64_t& num_entries, - const uint64_t& target_sample_size, + void UniqueRandomSample(const uint64_t num_entries, + const uint64_t target_sample_size, std::unordered_set* entries) override { entries->clear(); // Avoid divide-by-0.