diff --git a/db/builder.cc b/db/builder.cc index b37e7dd7d..16e5744d1 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -204,6 +204,8 @@ Status BuildTable( const Slice& value = c_iter.value(); const ParsedInternalKey& ikey = c_iter.ikey(); // Generate a rolling 64-bit hash of the key and values + // Note : + // Here "key" integrates 'sequence_number'+'kType'+'user key'. s = output_validator.Add(key, value); if (!s.ok()) { break; diff --git a/db/c.cc b/db/c.cc index 79fa5181d..3c719e6ff 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3028,6 +3028,11 @@ unsigned char rocksdb_options_get_advise_random_on_open( return opt->rep.advise_random_on_open; } +void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt, + unsigned char v) { + opt->rep.experimental_allow_mempurge = v; +} + void rocksdb_options_set_access_hint_on_compaction_start( rocksdb_options_t* opt, int v) { switch(v) { diff --git a/db/column_family.cc b/db/column_family.cc index c168f2b1d..178186379 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -443,14 +443,26 @@ bool SuperVersion::Unref() { return previous_refs == 1; } -void SuperVersion::Cleanup() { +void SuperVersion::Cleanup(const bool noImmMemoryContribution) { assert(refs.load(std::memory_order_relaxed) == 0); + // Since this SuperVersion object is being deleted, + // decrement reference to the immutable MemtableList + // this SV object was pointing to. imm->Unref(&to_delete); MemTable* m = mem->Unref(); if (m != nullptr) { - auto* memory_usage = current->cfd()->imm()->current_memory_usage(); - assert(*memory_usage >= m->ApproximateMemoryUsage()); - *memory_usage -= m->ApproximateMemoryUsage(); + // Typically, if the m memtable was not made + // immutable, and therefore was not added to the + // imm list, it does not contribute to the imm + // memory footprint (and actually is not part of + // the 'imm' MemtableList at all). + // At the moment, noImmMemoryContribution is only + // used by the experimental 'MemPurge' prototype. + if (!noImmMemoryContribution) { + auto* memory_usage = current->cfd()->imm()->current_memory_usage(); + assert(*memory_usage >= m->ApproximateMemoryUsage()); + *memory_usage -= m->ApproximateMemoryUsage(); + } to_delete.push_back(m); } current->Unref(); @@ -1260,7 +1272,7 @@ void ColumnFamilyData::InstallSuperVersion( void ColumnFamilyData::InstallSuperVersion( SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, bool noImmMemoryContribution) { SuperVersion* new_superversion = sv_context->new_superversion.release(); new_superversion->db_mutex = db_mutex; new_superversion->mutable_cf_options = mutable_cf_options; @@ -1290,7 +1302,7 @@ void ColumnFamilyData::InstallSuperVersion( new_superversion->write_stall_condition, GetName(), ioptions()); } if (old_superversion->Unref()) { - old_superversion->Cleanup(); + old_superversion->Cleanup(noImmMemoryContribution); sv_context->superversions_to_free.push_back(old_superversion); } } diff --git a/db/column_family.h b/db/column_family.h index 7ad560e44..cfba13171 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -222,7 +222,10 @@ struct SuperVersion { // Cleanup unrefs mem, imm and current. Also, it stores all memtables // that needs to be deleted in to_delete vector. 
Unrefing those // objects needs to be done in the mutex - void Cleanup(); + // The 'noImmMemoryContribution' is set to true if the memtable being + // dereferenced in this SuperVersion was not added to the Immutable + // memtable list. + void Cleanup(bool noImmMemoryContribution = false); void Init(ColumnFamilyData* new_cfd, MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current); @@ -454,7 +457,8 @@ class ColumnFamilyData { // IMPORTANT: Only call this from DBImpl::InstallSuperVersion() void InstallSuperVersion(SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options); + const MutableCFOptions& mutable_cf_options, + bool noImmMemoryContribution = false); void InstallSuperVersion(SuperVersionContext* sv_context, InstrumentedMutex* db_mutex); diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index b5d3026d8..ef062f510 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -24,6 +24,10 @@ namespace ROCKSDB_NAMESPACE { +// This is a static filter used for filtering +// kvs during the compaction process. +static std::string NEW_VALUE = "NewValue"; + class DBFlushTest : public DBTestBase { public: DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {} @@ -658,6 +662,373 @@ TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) { Close(); } +TEST_F(DBFlushTest, MemPurgeBasic) { + Options options = CurrentOptions(); + + // The following options are used to enforce several values that + // may already exist as default values to make this test resilient + // to default value updates in the future. + options.statistics = CreateDBStatistics(); + + // Record all statistics. + options.statistics->set_stats_level(StatsLevel::kAll); + + // create the DB if it's not already present + options.create_if_missing = true; + + // Useful for now as we are trying to compare uncompressed data savings on + // flush(). + options.compression = kNoCompression; + + // Prevent memtable in place updates. Should already be disabled + // (from Wiki: + // In place updates can be enabled by toggling on the bool + // inplace_update_support flag. However, this flag is by default set to + // false + // because this thread-safe in-place update support is not compatible + // with concurrent memtable writes. Note that the bool + // allow_concurrent_memtable_write is set to true by default ) + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). + options.write_buffer_size = 64 << 20; + // Activate the MemPurge prototype. + options.experimental_allow_mempurge = true; + ASSERT_OK(TryReopen(options)); + uint32_t mempurge_count = 0; + uint32_t flush_count = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::string KEY1 = "IamKey1"; + std::string KEY2 = "IamKey2"; + std::string KEY3 = "IamKey3"; + std::string KEY4 = "IamKey4"; + std::string KEY5 = "IamKey5"; + std::string VALUE1 = "IamValue1"; + std::string VALUE2 = "IamValue2"; + const std::string NOT_FOUND = "NOT_FOUND"; + + // Check simple operations (put-delete). 
+ ASSERT_OK(Put(KEY1, VALUE1)); + ASSERT_OK(Put(KEY2, VALUE2)); + ASSERT_OK(Delete(KEY1)); + ASSERT_OK(Put(KEY2, VALUE1)); + ASSERT_OK(Put(KEY1, VALUE2)); + ASSERT_OK(Flush()); + + ASSERT_EQ(Get(KEY1), VALUE2); + ASSERT_EQ(Get(KEY2), VALUE1); + + ASSERT_OK(Delete(KEY1)); + ASSERT_EQ(Get(KEY1), NOT_FOUND); + ASSERT_OK(Flush()); + ASSERT_EQ(Get(KEY1), NOT_FOUND); + + // Heavy overwrite workload, + // more than would fit in maximum allowed memtables. + Random rnd(719); + const size_t NUM_REPEAT = 100000; + const size_t RAND_VALUES_LENGTH = 512; + std::string p_v1, p_v2, p_v3, p_v4, p_v5; + // Insertion of of K-V pairs, multiple times. + // Also insert DeleteRange + for (size_t i = 0; i < NUM_REPEAT; i++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v5 = rnd.RandomString(RAND_VALUES_LENGTH); + + ASSERT_OK(Put(KEY1, p_v1)); + ASSERT_OK(Put(KEY2, p_v2)); + ASSERT_OK(Put(KEY3, p_v3)); + ASSERT_OK(Put(KEY4, p_v4)); + ASSERT_OK(Put(KEY5, p_v5)); + + ASSERT_EQ(Get(KEY1), p_v1); + ASSERT_EQ(Get(KEY2), p_v2); + ASSERT_EQ(Get(KEY3), p_v3); + ASSERT_EQ(Get(KEY4), p_v4); + ASSERT_EQ(Get(KEY5), p_v5); + } + + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there was no flush to storage. + const uint32_t EXPECTED_FLUSH_COUNT = 0; + + EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT); + + Close(); +} + +TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { + Options options = CurrentOptions(); + + options.statistics = CreateDBStatistics(); + options.statistics->set_stats_level(StatsLevel::kAll); + options.create_if_missing = true; + options.compression = kNoCompression; + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). + options.write_buffer_size = 64 << 20; + // Activate the MemPurge prototype. + options.experimental_allow_mempurge = true; + ASSERT_OK(TryReopen(options)); + + uint32_t mempurge_count = 0; + uint32_t flush_count = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::string KEY1 = "ThisIsKey1"; + std::string KEY2 = "ThisIsKey2"; + std::string KEY3 = "ThisIsKey3"; + std::string KEY4 = "ThisIsKey4"; + std::string KEY5 = "ThisIsKey5"; + const std::string NOT_FOUND = "NOT_FOUND"; + + Random rnd(117); + const size_t NUM_REPEAT = 200; + const size_t RAND_VALUES_LENGTH = 512; + bool atLeastOneFlush = false; + std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5; + int count = 0; + const int EXPECTED_COUNT_FORLOOP = 3; + const int EXPECTED_COUNT_END = 4; + + ReadOptions ropt; + ropt.pin_data = true; + ropt.total_order_seek = true; + Iterator* iter = nullptr; + // Insertion of of K-V pairs, multiple times. + // Also insert DeleteRange + for (size_t i = 0; i < NUM_REPEAT; i++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. 
+ p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3b = rnd.RandomString(RAND_VALUES_LENGTH); + p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v5 = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEY1, p_v1)); + ASSERT_OK(Put(KEY2, p_v2)); + ASSERT_OK(Put(KEY3, p_v3)); + ASSERT_OK(Put(KEY4, p_v4)); + ASSERT_OK(Put(KEY5, p_v5)); + ASSERT_OK(Delete(KEY2)); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2, + KEY4)); + ASSERT_OK(Put(KEY3, p_v3b)); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1, + KEY3)); + ASSERT_OK(Delete(KEY1)); + + // Flush (MemPurge) with a probability of 50%. + if (rnd.OneIn(2)) { + ASSERT_OK(Flush()); + atLeastOneFlush = true; + } + + ASSERT_EQ(Get(KEY1), NOT_FOUND); + ASSERT_EQ(Get(KEY2), NOT_FOUND); + ASSERT_EQ(Get(KEY3), p_v3b); + ASSERT_EQ(Get(KEY4), p_v4); + ASSERT_EQ(Get(KEY5), p_v5); + + iter = db_->NewIterator(ropt); + iter->SeekToFirst(); + count = 0; + for (; iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + key = (iter->key()).ToString(false); + value = (iter->value()).ToString(false); + if (key.compare(KEY3) == 0) + ASSERT_EQ(value, p_v3b); + else if (key.compare(KEY4) == 0) + ASSERT_EQ(value, p_v4); + else if (key.compare(KEY5) == 0) + ASSERT_EQ(value, p_v5); + else + ASSERT_EQ(value, NOT_FOUND); + count++; + } + + // Expected count here is 3: KEY3, KEY4, KEY5. + ASSERT_EQ(count, EXPECTED_COUNT_FORLOOP); + if (iter) { + delete iter; + } + } + + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there was no flush to storage. + const uint32_t EXPECTED_FLUSH_COUNT = 0; + + if (atLeastOneFlush) { + EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); + } else { + // Note that there isn't enough values added to + // automatically trigger a flush/MemPurge in the background. + // Therefore we can make the assumption that if we never + // called "Flush()", no mempurge happened. + EXPECT_EQ(mempurge_count, EXPECTED_FLUSH_COUNT); + } + EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT); + + // Additional test for the iterator+memPurge. + ASSERT_OK(Put(KEY2, p_v2)); + iter = db_->NewIterator(ropt); + iter->SeekToFirst(); + ASSERT_OK(Put(KEY4, p_v4)); + count = 0; + for (; iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + key = (iter->key()).ToString(false); + value = (iter->value()).ToString(false); + if (key.compare(KEY2) == 0) + ASSERT_EQ(value, p_v2); + else if (key.compare(KEY3) == 0) + ASSERT_EQ(value, p_v3b); + else if (key.compare(KEY4) == 0) + ASSERT_EQ(value, p_v4); + else if (key.compare(KEY5) == 0) + ASSERT_EQ(value, p_v5); + else + ASSERT_EQ(value, NOT_FOUND); + count++; + } + // Expected count here is 4: KEY2, KEY3, KEY4, KEY5. + ASSERT_EQ(count, EXPECTED_COUNT_END); + if (iter) delete iter; + + Close(); +} + +// Create a Compaction Fitler that will be invoked +// at flush time and will update the value of a KV pair +// if the key string is "lower" than the filter_key_ string. 
+class ConditionalUpdateFilter : public CompactionFilter {
+ public:
+  explicit ConditionalUpdateFilter(const std::string* filtered_key)
+      : filtered_key_(filtered_key) {}
+  bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
+              std::string* new_value, bool* value_changed) const override {
+    // If key < filtered_key_, update the value of the KV-pair.
+    if (key.compare(*filtered_key_) < 0) {
+      assert(new_value != nullptr);
+      *new_value = NEW_VALUE;
+      *value_changed = true;
+    }
+    return false /*do not remove this KV pair*/;
+  }
+
+  const char* Name() const override { return "ConditionalUpdateFilter"; }
+
+ private:
+  const std::string* filtered_key_;
+};
+
+class ConditionalUpdateFilterFactory : public CompactionFilterFactory {
+ public:
+  explicit ConditionalUpdateFilterFactory(const Slice& filtered_key)
+      : filtered_key_(filtered_key.ToString()) {}
+
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& /*context*/) override {
+    return std::unique_ptr<CompactionFilter>(
+        new ConditionalUpdateFilter(&filtered_key_));
+  }
+
+  const char* Name() const override { return "ConditionalUpdateFilterFactory"; }
+
+  bool ShouldFilterTableFileCreation(
+      TableFileCreationReason reason) const override {
+    // This compaction filter will be invoked
+    // at flush time (and therefore at MemPurge time).
+    return (reason == TableFileCreationReason::kFlush);
+  }
+
+ private:
+  std::string filtered_key_;
+};
+
+TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
+  Options options = CurrentOptions();
+
+  std::string KEY1 = "ThisIsKey1";
+  std::string KEY2 = "ThisIsKey2";
+  std::string KEY3 = "ThisIsKey3";
+  std::string KEY4 = "ThisIsKey4";
+  std::string KEY5 = "ThisIsKey5";
+  const std::string NOT_FOUND = "NOT_FOUND";
+
+  options.statistics = CreateDBStatistics();
+  options.statistics->set_stats_level(StatsLevel::kAll);
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+
+  // Create a ConditionalUpdate compaction filter
+  // that will update all the values of the KV pairs
+  // whose keys are "lower" than KEY4.
+  options.compaction_filter_factory =
+      std::make_shared<ConditionalUpdateFilterFactory>(KEY4);
+
+  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
+  options.write_buffer_size = 64 << 20;
+  // Activate the MemPurge prototype.
+  options.experimental_allow_mempurge = true;
+  ASSERT_OK(TryReopen(options));
+
+  Random rnd(53);
+  const size_t NUM_REPEAT = 25;
+  const size_t RAND_VALUES_LENGTH = 128;
+  std::string p_v1, p_v2, p_v3, p_v4, p_v5;
+
+  // Insertion of K-V pairs, multiple times,
+  // each round followed by a Delete of KEY1.
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(KEY1, p_v1));
+    ASSERT_OK(Put(KEY2, p_v2));
+    ASSERT_OK(Put(KEY3, p_v3));
+    ASSERT_OK(Put(KEY4, p_v4));
+    ASSERT_OK(Put(KEY5, p_v5));
+
+    ASSERT_OK(Delete(KEY1));
+
+    ASSERT_OK(Flush());
+
+    // Verify that the ConditionalUpdateFilter
+    // updated the values of KEY2 and KEY3, and not KEY4 and KEY5.
+    ASSERT_EQ(Get(KEY1), NOT_FOUND);
+    ASSERT_EQ(Get(KEY2), NEW_VALUE);
+    ASSERT_EQ(Get(KEY3), NEW_VALUE);
+    ASSERT_EQ(Get(KEY4), p_v4);
+    ASSERT_EQ(Get(KEY5), p_v5);
+  }
+}
+
 TEST_P(DBFlushDirectIOTest, DirectIO) {
   Options options;
   options.create_if_missing = true;
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 3d4d0a2e7..89e1719a6 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "db/column_family.h"
+#include "db/compaction/compaction_iterator.h"
 #include "db/compaction/compaction_job.h"
 #include "db/dbformat.h"
 #include "db/error_handler.h"
@@ -53,6 +54,7 @@
 #include "rocksdb/trace_reader_writer.h"
 #include "rocksdb/transaction_log.h"
 #include "rocksdb/write_buffer_manager.h"
+#include "table/merging_iterator.h"
 #include "table/scoped_arena_iterator.h"
 #include "util/autovector.h"
 #include "util/hash.h"
@@ -1610,6 +1612,23 @@ class DBImpl : public DB {
 
   Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
 
+  // Memtable Garbage Collection algorithm: a MemPurge takes the memtable
+  // and filters (or "purges") the outdated bytes out of it. The output
+  // (the remaining bytes, or "useful payload") is then transferred into
+  // the new memtable "new_mem". This process is typically intended for
+  // workloads with heavy overwrites, to save on the IO cost of
+  // expensive flush operations.
+  // "MemPurge" is an experimental feature still at a very early stage
+  // of development. At the moment it is only compatible with the Get, Put
+  // and Delete operations, as well as Iterators and CompactionFilters.
+  // For this early version, "MemPurge" is enabled by setting the
+  // options.experimental_allow_mempurge flag to "true". When this is
+  // the case, ALL flush operations are replaced by MemPurge operations
+  // (for prototype stress-testing purposes). Therefore, we strongly
+  // recommend that users not set this flag to true, given that the
+  // MemPurge process has not matured yet.
+  Status MemPurge(ColumnFamilyData* cfd, MemTable* new_mem);
+
   void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
 
   // Force current memtable contents to be flushed.
@@ -1835,7 +1854,7 @@ class DBImpl : public DB {
   // state needs flush or compaction.
   void InstallSuperVersionAndScheduleWork(
       ColumnFamilyData* cfd, SuperVersionContext* sv_context,
-      const MutableCFOptions& mutable_cf_options);
+      const MutableCFOptions& mutable_cf_options, bool fromMemPurge = false);
 
   bool GetIntPropertyInternal(ColumnFamilyData* cfd,
                               const DBPropertyInfo& property_info,
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 9c85aa773..69a98c934 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -151,7 +151,6 @@ Status DBImpl::FlushMemTableToOutputFile(
   assert(cfd);
   assert(cfd->imm()->NumNotFlushed() != 0);
   assert(cfd->imm()->IsFlushPending());
-
   FlushJob flush_job(
       dbname_, cfd, immutable_db_options_, mutable_cf_options,
       port::kMaxUint64 /* memtable_id */, file_options_for_compaction_,
@@ -3437,7 +3436,7 @@ void DBImpl::BuildCompactionJobInfo(
 
 void DBImpl::InstallSuperVersionAndScheduleWork(
     ColumnFamilyData* cfd, SuperVersionContext* sv_context,
-    const MutableCFOptions& mutable_cf_options) {
+    const MutableCFOptions& mutable_cf_options, bool fromMemPurge) {
   mutex_.AssertHeld();
 
   // Update max_total_in_memory_state_
@@ -3452,7 +3451,8 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
   if (UNLIKELY(sv_context->new_superversion == nullptr)) {
     sv_context->NewSuperVersion();
   }
-  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
+  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options,
+                           fromMemPurge);
 
   // There may be a small data race here. The snapshot tricking bottommost
   // compaction may already be released here. But assuming there will always be
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 2bd6f7124..acccbff23 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1737,6 +1737,184 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
 }
 #endif  // ROCKSDB_LITE
 
+Status DBImpl::MemPurge(ColumnFamilyData* cfd, MemTable* new_mem) {
+  Status s;
+  assert(new_mem != nullptr);
+
+  JobContext job_context(next_job_id_.fetch_add(1), true);
+  std::vector<SequenceNumber> snapshot_seqs;
+  SequenceNumber earliest_write_conflict_snapshot;
+  SnapshotChecker* snapshot_checker;
+  GetSnapshotContext(&job_context, &snapshot_seqs,
+                     &earliest_write_conflict_snapshot, &snapshot_checker);
+
+  // Grab current memtable
+  MemTable* m = cfd->mem();
+  SequenceNumber first_seqno = m->GetFirstSequenceNumber();
+  SequenceNumber earliest_seqno = m->GetEarliestSequenceNumber();
+
+  // Create two iterators, one for the memtable data (contains
+  // info from puts + deletes), and one for the memtable
+  // Range Tombstones (from DeleteRanges).
+  ReadOptions ro;
+  ro.total_order_seek = true;
+  Arena arena;
+  std::vector<InternalIterator*> memtables(1, m->NewIterator(ro, &arena));
+  std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
+      range_del_iters;
+  auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+  if (range_del_iter != nullptr) {
+    range_del_iters.emplace_back(range_del_iter);
+  }
+  ScopedArenaIterator iter(
+      NewMergingIterator(&(cfd->internal_comparator()), memtables.data(),
+                         static_cast<int>(memtables.size()), &arena));
+
+  auto* ioptions = cfd->ioptions();
+
+  // Place the iterator at the first (i.e., most recent) key entry.
+  iter->SeekToFirst();
+
+  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
+      new CompactionRangeDelAggregator(&(cfd->internal_comparator()),
+                                       snapshot_seqs));
+  for (auto& rd_iter : range_del_iters) {
+    range_del_agg->AddTombstones(std::move(rd_iter));
+  }
+
+  // If there is valid data in the memtable,
+  // or at least range tombstones, copy over the info
+  // to the new memtable.
+  if (iter->Valid() || !range_del_agg->IsEmpty()) {
+    std::unique_ptr<CompactionFilter> compaction_filter;
+    if (ioptions->compaction_filter_factory != nullptr &&
+        ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
+            TableFileCreationReason::kFlush)) {
+      CompactionFilter::Context ctx;
+      ctx.is_full_compaction = false;
+      ctx.is_manual_compaction = false;
+      ctx.column_family_id = cfd->GetID();
+      ctx.reason = TableFileCreationReason::kFlush;
+      compaction_filter =
+          ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
+      if (compaction_filter != nullptr &&
+          !compaction_filter->IgnoreSnapshots()) {
+        s = Status::NotSupported(
+            "CompactionFilter::IgnoreSnapshots() = false is not supported "
+            "anymore.");
+        return s;
+      }
+    }
+
+    Env* env = immutable_db_options_.env;
+    assert(env);
+    MergeHelper merge(
+        env, (cfd->internal_comparator()).user_comparator(),
+        (ioptions->merge_operator).get(), compaction_filter.get(),
+        ioptions->logger, true /* internal key corruption is not ok */,
+        snapshot_seqs.empty() ? 0 : snapshot_seqs.back(), snapshot_checker);
+    CompactionIterator c_iter(
+        iter.get(), (cfd->internal_comparator()).user_comparator(), &merge,
+        kMaxSequenceNumber, &snapshot_seqs, earliest_write_conflict_snapshot,
+        snapshot_checker, env, ShouldReportDetailedTime(env, ioptions->stats),
+        true /* internal key corruption is not ok */, range_del_agg.get(),
+        nullptr, ioptions->allow_data_in_errors,
+        /*compaction=*/nullptr, compaction_filter.get(),
+        /*shutting_down=*/nullptr,
+        /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
+        /*manual_compaction_canceled=*/nullptr, immutable_db_options_.info_log,
+        &(cfd->GetFullHistoryTsLow()));
+
+    c_iter.SeekToFirst();
+
+    mutex_.AssertHeld();
+
+    // Set the earliest sequence number in the new memtable
+    // to be equal to the earliest sequence number of the
+    // memtable being flushed (see later whether this number
+    // needs to be updated!).
+    new_mem->SetEarliestSequenceNumber(earliest_seqno);
+    // Likewise for the first sequence number.
+    new_mem->SetFirstSequenceNumber(first_seqno);
+    SequenceNumber new_first_seqno = kMaxSequenceNumber;
+
+    // Key transfer
+    for (; c_iter.Valid(); c_iter.Next()) {
+      const ParsedInternalKey ikey = c_iter.ikey();
+      const Slice value = c_iter.value();
+      new_first_seqno =
+          ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;
+
+      // Should we update "OldestKeyTime"?
+      s = new_mem->Add(
+          ikey.sequence, ikey.type, ikey.user_key, value,
+          nullptr,   // KV protection info set as nullptr since it
+                     // should only be useful for the first add to
+                     // the original memtable.
+          false,     // : allow concurrent_memtable_writes_
+                     //   Not seen as necessary for now.
+          nullptr,   // get_post_process_info(m) must be nullptr
+                     // when concurrent_memtable_writes is switched off.
+          nullptr);  // hint, only used when concurrent_memtable_writes_
+                     // is switched on.
+      if (!s.ok()) {
+        break;
+      }
+    }
+
+    // Check status and propagate
+    // potential error status from c_iter
+    if (!s.ok()) {
+      c_iter.status().PermitUncheckedError();
+    } else if (!c_iter.status().ok()) {
+      s = c_iter.status();
+    }
+
+    // Range tombstone transfer.
+ if (s.ok()) { + auto range_del_it = range_del_agg->NewIterator(); + for (range_del_it->SeekToFirst(); range_del_it->Valid(); + range_del_it->Next()) { + auto tombstone = range_del_it->Tombstone(); + new_first_seqno = + tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno; + s = new_mem->Add( + tombstone.seq_, // Sequence number + kTypeRangeDeletion, // KV type + tombstone.start_key_, // Key is start key. + tombstone.end_key_, // Value is end key. + nullptr, // KV protection info set as nullptr since it + // should only be useful for the first add to + // the original memtable. + false, // : allow concurrent_memtable_writes_ + // Not seen as necessary for now. + nullptr, // get_post_process_info(m) must be nullptr + // when concurrent_memtable_writes is switched off. + nullptr); // hint, only used when concurrent_memtable_writes_ + // is switched on. + + if (!s.ok()) { + break; + } + } + } + // Rectify the first sequence number, which (unlike the earliest seq + // number) needs to be present in the new memtable. + new_mem->SetFirstSequenceNumber(new_first_seqno); + } + // Note: if the mempurge was ineffective, meaning that there was no + // garbage to remove, and this new_mem needs to be flushed again, + // the new_mem->Add would have updated the flush status when it + // called "UpdateFlushState()" internally at the last Add() call. + // Therefore if the new mem needs to be flushed again, we mark + // the return status as "aborted", which will trigger the regular + // flush operation. + if (s.ok() && new_mem->ShouldScheduleFlush()) { + s = Status::Aborted(Slice("No garbage collected.")); + } + return s; +} + // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the 2nd writer queue if @@ -1911,6 +2089,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { for (auto cf : empty_cfs) { if (cf->IsEmpty()) { cf->SetLogNumber(logfile_number_); + // MEMPURGE: No need to change this, because new adds + // should still receive new sequence numbers. cf->mem()->SetCreationSeq(versions_->LastSequence()); } // cf may become non-empty. } @@ -1933,11 +2113,51 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } cfd->mem()->SetNextLogNumber(logfile_number_); - cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); + + // By default, it is assumed that the 'old' memtable + // will be added to the Imm memtable list and will therefore + // contribute to the Imm memory footprint. + bool noImmMemoryContribution = false; + // If MemPurge activated, purge and delete current memtable. + if (immutable_db_options_.experimental_allow_mempurge && + (new_mem != nullptr) && + ((cfd->GetFlushReason() == FlushReason::kOthers) || + (cfd->GetFlushReason() == FlushReason::kManualFlush))) { + Status mempurge_s = MemPurge(cfd, new_mem); + if (mempurge_s.ok()) { + // If mempurge worked successfully, + // create sync point and decrement current memtable reference. + TEST_SYNC_POINT("DBImpl::MemPurge"); + cfd->mem()->Unref(); + // If the MemPurge is successful, the 'old' (purged) memtable + // is not added to the Imm memtable list and therefore + // does not contribute to the Imm memory cost anymore. + noImmMemoryContribution = true; + } else { + // If mempurge failed, go back to regular mem->imm->flush workflow. 
+ if (new_mem) { + delete new_mem; + } + SuperVersion* new_superversion = + context->superversion_context.new_superversion.release(); + if (new_superversion != nullptr) { + delete new_superversion; + } + SequenceNumber seq = versions_->LastSequence(); + new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); + context->superversion_context.NewSuperVersion(); + cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); + } + } else { + // Else make the memtable immutable and proceed as usual. + cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); + } new_mem->Ref(); cfd->SetMemtable(new_mem); InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, - mutable_cf_options); + mutable_cf_options, + noImmMemoryContribution); + #ifndef ROCKSDB_LITE mutex_.Unlock(); // Notify client that memtable is sealed, now that we have successfully diff --git a/db/flush_job.cc b/db/flush_job.cc index 10d6ed108..409024a08 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -440,6 +440,7 @@ Status FlushJob::WriteLevel0Table() { } } if (tboptions.reason == TableFileCreationReason::kFlush) { + TEST_SYNC_POINT("DBImpl::FlushJob:Flush"); RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH, memtable_payload_bytes); RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH, diff --git a/db/memtable.h b/db/memtable.h index 54155f9b5..6f908ae5b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -341,6 +341,14 @@ class MemTable { return first_seqno_.load(std::memory_order_relaxed); } + // Returns the sequence number of the first element that was inserted + // into the memtable. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + void SetFirstSequenceNumber(SequenceNumber first_seqno) { + return first_seqno_.store(first_seqno, std::memory_order_relaxed); + } + // Returns the sequence number that is guaranteed to be smaller than or equal // to the sequence number of any key that could be inserted into this // memtable. It can then be assumed that any write with a larger(or equal) @@ -352,6 +360,15 @@ class MemTable { return earliest_seqno_.load(std::memory_order_relaxed); } + // Sets the sequence number that is guaranteed to be smaller than or equal + // to the sequence number of any key that could be inserted into this + // memtable. It can then be assumed that any write with a larger(or equal) + // sequence number will be present in this memtable or a later memtable. + // Used only for MemPurge operation + void SetEarliestSequenceNumber(SequenceNumber earliest_seqno) { + return earliest_seqno_.store(earliest_seqno, std::memory_order_relaxed); + } + // DB's latest sequence ID when the memtable is created. This number // may be updated to a more recent one before any key is inserted. SequenceNumber GetCreationSeq() const { return creation_seq_; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index affbb7aa0..30dd2b95d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -778,6 +778,10 @@ struct DBOptions { // Default: true bool advise_random_on_open = true; + // If true, allows for memtable purge instead of flush to storage. + // (experimental). + bool experimental_allow_mempurge = false; + // Amount of data to build up in memtables across all column // families before writing to disk. 
// diff --git a/options/db_options.cc b/options/db_options.cc index 1e6113d37..17f96a553 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -192,6 +192,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, error_if_exists), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"experimental_allow_mempurge", + {offsetof(struct ImmutableDBOptions, experimental_allow_mempurge), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"is_fd_close_on_exec", {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -541,6 +545,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), advise_random_on_open(options.advise_random_on_open), + experimental_allow_mempurge(options.experimental_allow_mempurge), db_write_buffer_size(options.db_write_buffer_size), write_buffer_manager(options.write_buffer_manager), access_hint_on_compaction_start(options.access_hint_on_compaction_start), @@ -674,6 +679,9 @@ void ImmutableDBOptions::Dump(Logger* log) const { is_fd_close_on_exec); ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", advise_random_on_open); + ROCKS_LOG_HEADER(log, + " Options.experimental_allow_mempurge: %d", + experimental_allow_mempurge); ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index edbdbe6a2..7ff90318c 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -54,6 +54,7 @@ struct ImmutableDBOptions { bool allow_fallocate; bool is_fd_close_on_exec; bool advise_random_on_open; + bool experimental_allow_mempurge; size_t db_write_buffer_size; std::shared_ptr write_buffer_manager; DBOptions::AccessHint access_hint_on_compaction_start; diff --git a/options/options_test.cc b/options/options_test.cc index 71921de7c..d3787c475 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -140,6 +140,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, + {"experimental_allow_mempurge", "false"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, @@ -298,6 +299,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); + ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); @@ -1981,6 +1983,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, + {"experimental_allow_mempurge", "false"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, @@ -2133,6 +2136,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); + ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false); 
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
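
Usage note (not part of the patch): the sketch below shows how a reviewer might exercise the new experimental_allow_mempurge flag end-to-end, based only on the option and tests added above. The database path, keys, and values are placeholders, and the snippet simply assumes a build of this branch.

// Minimal usage sketch (illustrative only, not part of this patch).
// Opens a DB with the experimental MemPurge flag enabled and requests a
// flush, which the new code path may replace with an in-memory purge.
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // New DBOptions flag introduced by this patch; it defaults to false.
  options.experimental_allow_mempurge = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/mempurge_demo", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "foo", "bar");
  assert(s.ok());
  // With the flag set, this flush request goes through DBImpl::MemPurge
  // (see db_impl_write.cc above) instead of writing an L0 file, as long as
  // the purge is effective; otherwise a regular flush is scheduled.
  s = db->Flush(rocksdb::FlushOptions());
  assert(s.ok());

  delete db;
  return 0;
}

The same flag can be toggled from the C API through the rocksdb_options_set_experimental_allow_mempurge() binding added in db/c.cc.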