From 8f5bf04468a2ff3e1c8018c162fc8bfc2484d50f Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Tue, 21 Mar 2017 10:59:57 -0700 Subject: [PATCH] Flush triggered by DB write buffer size picks the oldest unflushed CF Summary: Previously, when the DB write buffer size limit was hit, we always picked the CF with the most data in its memtable to flush. That approach minimizes the total number of flushes. Change the behavior to always pick the oldest unflushed CF, which matches the behavior when max_total_wal_size is hit. This approach minimizes the WAL size counted against max_total_wal_size. Closes https://github.com/facebook/rocksdb/pull/1987 Differential Revision: D4703214 Pulled By: siying fbshipit-source-id: 9ff8b09 --- db/db_impl.cc | 38 +++++----- db/db_test2.cc | 191 ++++++++++++++++++++++++------------------------- db/memtable.cc | 5 +- db/memtable.h | 8 +++ 4 files changed, 124 insertions(+), 118 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 48983bf03..96c3db8c9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4758,8 +4758,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, write_buffer_manager_->buffer_size()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread - ColumnFamilyData* largest_cfd = nullptr; - size_t largest_cfd_size = 0; + ColumnFamilyData* cfd_picked = nullptr; + SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { @@ -4768,18 +4768,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (!cfd->mem()->IsEmpty()) { // We only consider active mem table, hoping immutable memtable is // already in the process of flushing. 
- size_t cfd_size = cfd->mem()->ApproximateMemoryUsage(); - if (largest_cfd == nullptr || cfd_size > largest_cfd_size) { - largest_cfd = cfd; - largest_cfd_size = cfd_size; + uint64_t seq = cfd->mem()->GetCreationSeq(); + if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { + cfd_picked = cfd; + seq_num_for_cf_picked = seq; } } } - if (largest_cfd != nullptr) { - status = SwitchMemtable(largest_cfd, &context); + if (cfd_picked != nullptr) { + status = SwitchMemtable(cfd_picked, &context); if (status.ok()) { - largest_cfd->imm()->FlushRequested(); - SchedulePendingFlush(largest_cfd); + cfd_picked->imm()->FlushRequested(); + SchedulePendingFlush(cfd_picked); MaybeScheduleFlushOrCompaction(); } } @@ -5317,17 +5317,21 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log_dir_synced_ = false; logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); - for (auto loop_cfd : *versions_->GetColumnFamilySet()) { - // all this is just optimization to delete logs that - // are no longer needed -- if CF is empty, that means it - // doesn't need that particular log to stay alive, so we just - // advance the log number. no need to persist this in the manifest - if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 && - loop_cfd->imm()->NumNotFlushed() == 0) { + } + for (auto loop_cfd : *versions_->GetColumnFamilySet()) { + // all this is just optimization to delete logs that + // are no longer needed -- if CF is empty, that means it + // doesn't need that particular log to stay alive, so we just + // advance the log number. 
no need to persist this in the manifest + if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 && + loop_cfd->imm()->NumNotFlushed() == 0) { + if (creating_new_log) { loop_cfd->SetLogNumber(logfile_number_); } + loop_cfd->mem()->SetCreationSeq(versions_->LastSequence()); } } + cfd->mem()->SetNextLogNumber(logfile_number_); cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); new_mem->Ref(); diff --git a/db/db_test2.cc b/db/db_test2.cc index e9110772e..109a550ac 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -183,19 +183,32 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { options.write_buffer_size = 500000; // this is never hit CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); - // Trigger a flush on CF "nikitich" - ASSERT_OK(Put(0, Key(1), DummyString(1))); - ASSERT_OK(Put(1, Key(1), DummyString(1))); - ASSERT_OK(Put(3, Key(1), DummyString(90000))); - ASSERT_OK(Put(2, Key(2), DummyString(20000))); - ASSERT_OK(Put(2, Key(1), DummyString(1))); + WriteOptions wo; + wo.disableWAL = true; + + // Create some data and flush "default" and "nikitich" so that they + // are newer CFs created. 
+ ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + Flush(3); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + Flush(0); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(1)); + + ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(60000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + // No flush should trigger dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), @@ -204,98 +217,84 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { static_cast(1)); } - // "dobrynia": 20KB - // Flush 'dobrynia' - ASSERT_OK(Put(3, Key(2), DummyString(40000))); - ASSERT_OK(Put(2, Key(2), DummyString(70000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(1)); - } - - // "nikitich" still has data of 80KB - // Inserting Data in "dobrynia" triggers "nikitich" flushing. 
- ASSERT_OK(Put(3, Key(2), DummyString(40000))); - ASSERT_OK(Put(2, Key(2), DummyString(40000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(2)); - } - - // "dobrynia" still has 40KB - ASSERT_OK(Put(1, Key(2), DummyString(20000))); - ASSERT_OK(Put(0, Key(1), DummyString(10000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); + // Trigger a flush. Flushing "nikitich". + ASSERT_OK(Put(3, Key(2), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - // This should triggers no flush { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } - // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB - ASSERT_OK(Put(1, Key(2), DummyString(40000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); + // Without hitting the threshold, no flush should trigger. 
+ ASSERT_OK(Put(2, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - // This should triggers flush of "pikachu" { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } - // "default": 10KB, "dobrynia": 40KB - // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on - // closure. - ASSERT_OK(Put(3, Key(1), DummyString(1))); - ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, - options); + // Hit the write buffer limit again. "default" + // will have been flushed. 
+ ASSERT_OK(Put(2, Key(2), DummyString(10000), wo)); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(3)); + static_cast(2)); + } + + // Trigger another flush. This time "dobrynia". "pikachu" should not + // be flushed, although it was never flushed. 
+ ASSERT_OK(Put(1, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(80000), wo)); + ASSERT_OK(Put(1, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(2)); } } @@ -314,23 +313,38 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { ASSERT_OK(DB::Open(options, dbname2, &db2)); WriteOptions wo; + wo.disableWAL = true; // Trigger a flush on cf2 - ASSERT_OK(Put(0, Key(1), DummyString(1))); - ASSERT_OK(Put(1, Key(1), DummyString(1))); - ASSERT_OK(Put(2, Key(1), DummyString(90000))); + ASSERT_OK(Put(2, Key(1), DummyString(70000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(20000), wo)); // Insert to DB2 ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000))); - ASSERT_OK(Put(2, Key(1), DummyString(1))); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); static_cast(db2)->TEST_WaitForFlushMemTable(); { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") + + GetNumberOfSstFilesForColumnFamily(db_, "cf1") + + GetNumberOfSstFilesForColumnFamily(db_, "cf2"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), static_cast(0)); + } + + // Triggering to flush another CF in DB1 + ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000))); + ASSERT_OK(Put(2, 
Key(1), DummyString(1), wo)); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), @@ -339,9 +353,8 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { static_cast(0)); } - // db2: 20KB - ASSERT_OK(db2->Put(wo, Key(2), DummyString(40000))); - ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000))); + // Triggering flush in DB2. + ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000))); ASSERT_OK(db2->Put(wo, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); @@ -349,7 +362,7 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { static_cast(db2)->TEST_WaitForFlushMemTable(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), @@ -358,26 +371,6 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { static_cast(1)); } - // - // Inserting Data in db2 and db_ triggers flushing in db_. 
- ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000))); - ASSERT_OK(Put(2, Key(2), DummyString(45000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - static_cast(db2)->TEST_WaitForFlushMemTable(); - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), - static_cast(2)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), - static_cast(1)); - } - delete db2; ASSERT_OK(DestroyDB(dbname2, options)); } diff --git a/db/memtable.cc b/db/memtable.cc index a8ece01ce..0ecd36f7e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -61,7 +61,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, WriteBufferManager* write_buffer_manager, - SequenceNumber earliest_seq) + SequenceNumber latest_seq) : comparator_(cmp), moptions_(ioptions, mutable_cf_options), refs_(0), @@ -83,7 +83,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, flush_completed_(false), file_number_(0), first_seqno_(0), - earliest_seqno_(earliest_seq), + earliest_seqno_(latest_seq), + creation_seq_(latest_seq), mem_next_logfile_number_(0), min_prep_log_referenced_(0), locks_(moptions_.inplace_update_support diff --git a/db/memtable.h b/db/memtable.h index 12d7be61d..2417a1130 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -286,6 +286,12 @@ class MemTable { return earliest_seqno_.load(std::memory_order_relaxed); } + // DB's latest sequence ID when the memtable is created. This number + // may be updated to a more recent one before any key is inserted. 
+ SequenceNumber GetCreationSeq() const { return creation_seq_; } + + void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; } + // Returns the next active logfile number when this memtable is about to // be flushed to storage // REQUIRES: external synchronization to prevent simultaneous @@ -381,6 +387,8 @@ class MemTable { // if not set. std::atomic earliest_seqno_; + SequenceNumber creation_seq_; + // The log files earlier than this number can be deleted. uint64_t mem_next_logfile_number_;