From 6ce5580882bda5791bec61b033e03a452a7a8483 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 10 Jun 2019 12:53:56 -0700 Subject: [PATCH] Improve memtable earliest seqno assignment for secondary instance (#5413) Summary: In regular RocksDB instance, `MemTable::earliest_seqno_` is "db sequence number at the time of creation". However, we cannot use the db sequence number to set the value of `MemTable::earliest_seqno_` for secondary instance, i.e. `DBImplSecondary` due to the logic of MANIFEST and WAL replay. When replaying the log files of the primary, the secondary instance first replays MANIFEST and updates the db sequence number if necessary. Next, the secondary replays WAL files, creates new memtables if necessary and inserts key-value pairs into memtables. The following can occur when the db has two or more column families. Assume the db has column family "default" and "cf1". At a certain in time, both "default" and "cf1" have data in memtables. 1. Primary triggers a flush and flushes "cf1". "default" is **not** flushed. 2. Secondary replays the MANIFEST updates its db sequence number to the latest value learned from the MANIFEST. 3. Secondary starts to replay WAL that contains the writes to "default". It is possible that the write batches' sequence numbers are smaller than the db sequence number. In this case, these write batches will be skipped, and these updates will not be visible to reader until "default" is later flushed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5413 Differential Revision: D15637407 Pulled By: riversand963 fbshipit-source-id: 3de3fe35cfc6f1b9f844f3f926f0df29717b6580 --- HISTORY.md | 1 + db/db_impl/db_impl_secondary.cc | 36 ++++++++++++++++++++++++--------- db/db_impl/db_secondary_test.cc | 7 +++++++ 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index c88b436e4..ad6c370b5 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -27,6 +27,7 @@ ### Bug Fixes * Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number. * Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level. +* Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family. ## 6.2.0 (4/30/2019) ### New Features diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 827d99929..eb8c4c987 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -199,16 +199,8 @@ Status DBImplSecondary::RecoverLogFiles( record.size(), Status::Corruption("log record too small")); continue; } - SequenceNumber seq = versions_->LastSequence(); WriteBatchInternal::SetContents(&batch, record); SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch); - // If the write batch's sequence number is smaller than the last sequence - // number of the db, then we should skip this write batch because its - // data must reside in an SST that has already been added in the prior - // MANIFEST replay. - if (seq_of_batch < seq) { - continue; - } std::vector column_family_ids; status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids); if (status.ok()) { @@ -221,6 +213,17 @@ Status DBImplSecondary::RecoverLogFiles( if (cfds_changed->count(cfd) == 0) { cfds_changed->insert(cfd); } + const std::vector& l0_files = + cfd->current()->storage_info()->LevelFiles(0); + SequenceNumber seq = + l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno; + // If the write batch's sequence number is smaller than the last + // sequence number of the largest sequence persisted for this column + // family, then its data must reside in an SST that has already been + // added in the prior MANIFEST replay. + if (seq_of_batch <= seq) { + continue; + } auto curr_log_num = port::kMaxUint64; if (cfd_to_current_log_.count(cfd) > 0) { curr_log_num = cfd_to_current_log_[cfd]; @@ -233,7 +236,7 @@ Status DBImplSecondary::RecoverLogFiles( const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); MemTable* new_mem = - cfd->ConstructNewMemtable(mutable_cf_options, seq); + cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch); cfd->mem()->SetNextLogNumber(log_number); cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free); new_mem->Ref(); @@ -452,6 +455,21 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { InstrumentedMutexLock lock_guard(&mutex_); s = static_cast(versions_.get()) ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, + static_cast(versions_->LastSequence())); + for (ColumnFamilyData* cfd : cfds_changed) { + if (cfd->IsDropped()) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", + cfd->GetName().c_str()); + continue; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Level summary: %s\n", + cfd->GetName().c_str(), + cfd->current()->storage_info()->LevelSummary(&tmp)); + } + // list wal_dir to discover new WALs and apply new changes to the secondary // instance if (s.ok()) { diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc index 5b375422f..c9aaa3611 100644 --- a/db/db_impl/db_secondary_test.cc +++ b/db/db_impl/db_secondary_test.cc @@ -576,6 +576,11 @@ TEST_F(DBSecondaryTest, SwitchWAL) { TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { const int kNumKeysPerMemtable = 1; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::BackgroundCallFlush:ContextCleanedUp", + "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}}); + SyncPoint::GetInstance()->EnableProcessing(); const std::string kCFName1 = "pikachu"; Options options; options.env = env_; @@ -629,8 +634,10 @@ TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); ASSERT_OK( Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); + TEST_SYNC_POINT("DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"); ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); verify_db(dbfull(), handles_, db_secondary_, handles_secondary_); + SyncPoint::GetInstance()->ClearTrace(); } }