From d11c09d9e21afd40097040089a9d20a919cb4879 Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Thu, 11 Aug 2016 19:10:16 -0700 Subject: [PATCH] Eliminate memcpy from ForwardIterator Summary: This diff updates ForwardIterator to support pinning keys and values, which will allow DBIter to take advantage of that and eliminate memcpy when executing merge operators. This diff is stacked on D61305. Test Plan: existing tests (updated them to test tailing iterator) new test Reviewers: andrewkr, yhchiang, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D60009 --- db/db_test2.cc | 95 ++++++++++++++++--------- db/db_test_util.cc | 76 ++++++++++++++++---- db/db_test_util.h | 3 +- db/forward_iterator.cc | 156 +++++++++++++++++++++++++++++++++++------ db/forward_iterator.h | 13 ++++ 5 files changed, 273 insertions(+), 70 deletions(-) diff --git a/db/db_test2.cc b/db/db_test2.cc index 147715a7d..e89364651 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1628,37 +1628,7 @@ TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) { // 3 L4 Files ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3"); - // Verify Get() - for (auto kv : true_data) { - ASSERT_EQ(Get(kv.first), kv.second); - } - - Iterator* iter = db_->NewIterator(ReadOptions()); - - // Verify Iterator::Next() - auto data_iter = true_data.begin(); - for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { - ASSERT_EQ(iter->key().ToString(), data_iter->first); - ASSERT_EQ(iter->value().ToString(), data_iter->second); - } - ASSERT_EQ(data_iter, true_data.end()); - - // Verify Iterator::Prev() - auto data_rev = true_data.rbegin(); - for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { - ASSERT_EQ(iter->key().ToString(), data_rev->first); - ASSERT_EQ(iter->value().ToString(), data_rev->second); - } - ASSERT_EQ(data_rev, true_data.rend()); - - // Verify Iterator::Seek() - for (auto kv : true_data) { - iter->Seek(kv.first); - ASSERT_EQ(kv.first, 
iter->key().ToString()); - ASSERT_EQ(kv.second, iter->value().ToString()); - } - - delete iter; + VerifyDBFromMap(true_data); } TEST_P(MergeOperatorPinningTest, Randomized) { @@ -1807,12 +1777,69 @@ TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) { } }; - VerifyDBFromMap(true_data); - ASSERT_EQ(merge_cnt, kNumKeys * 4 /* get + next + prev + seek */); + size_t total_reads; + VerifyDBFromMap(true_data, &total_reads); + ASSERT_EQ(merge_cnt, total_reads); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - VerifyDBFromMap(true_data); + VerifyDBFromMap(true_data, &total_reads); +} + +TEST_P(MergeOperatorPinningTest, TailingIterator) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreateMaxOperator(); + BlockBasedTableOptions bbto; + bbto.no_block_cache = disable_block_cache_; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); + + const int kNumOperands = 100; + const int kNumWrites = 100000; + + std::function writer_func = [&]() { + int k = 0; + for (int i = 0; i < kNumWrites; i++) { + db_->Merge(WriteOptions(), Key(k), Key(k)); + + if (i && i % kNumOperands == 0) { + k++; + } + if (i && i % 127 == 0) { + ASSERT_OK(Flush()); + } + if (i && i % 317 == 0) { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + } + } + }; + + std::function reader_func = [&]() { + ReadOptions ro; + ro.tailing = true; + Iterator* iter = db_->NewIterator(ro); + + iter->SeekToFirst(); + for (int i = 0; i < (kNumWrites / kNumOperands); i++) { + while (!iter->Valid()) { + // wait for the key to be written + env_->SleepForMicroseconds(100); + iter->Seek(Key(i)); + } + ASSERT_EQ(iter->key(), Key(i)); + ASSERT_EQ(iter->value(), Key(i)); + + iter->Next(); + } + + delete iter; + }; + + std::thread writer_thread(writer_func); + std::thread reader_thread(reader_func); + + writer_thread.join(); + reader_thread.join(); } #endif // ROCKSDB_LITE diff --git a/db/db_test_util.cc 
b/db/db_test_util.cc index c79b05cba..6a1a91492 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1076,38 +1076,90 @@ std::vector DBTestBase::ListTableFiles(Env* env, return file_numbers; } -void DBTestBase::VerifyDBFromMap(std::map true_data) { +void DBTestBase::VerifyDBFromMap(std::map true_data, + size_t* total_reads_res) { + size_t total_reads = 0; + for (auto& kv : true_data) { ASSERT_EQ(Get(kv.first), kv.second); + total_reads++; } + // Normal Iterator + { + int iter_cnt = 0; + ReadOptions ro; + ro.total_order_seek = true; + Iterator* iter = db_->NewIterator(ro); + // Verify Iterator::Next() + iter_cnt = 0; + auto data_iter = true_data.begin(); + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { + ASSERT_EQ(iter->key().ToString(), data_iter->first); + ASSERT_EQ(iter->value().ToString(), data_iter->second); + iter_cnt++; + total_reads++; + } + ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / " + << true_data.size(); + + // Verify Iterator::Prev() + iter_cnt = 0; + auto data_rev = true_data.rbegin(); + for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { + ASSERT_EQ(iter->key().ToString(), data_rev->first); + ASSERT_EQ(iter->value().ToString(), data_rev->second); + iter_cnt++; + total_reads++; + } + ASSERT_EQ(data_rev, true_data.rend()) << iter_cnt << " / " + << true_data.size(); + + // Verify Iterator::Seek() + for (auto kv : true_data) { + iter->Seek(kv.first); + ASSERT_EQ(kv.first, iter->key().ToString()); + ASSERT_EQ(kv.second, iter->value().ToString()); + total_reads++; + } + + delete iter; + } + +#ifndef ROCKSDB_LITE + // Tailing iterator + int iter_cnt = 0; ReadOptions ro; + ro.tailing = true; ro.total_order_seek = true; Iterator* iter = db_->NewIterator(ro); - // Verify Iterator::Next() + + // Verify ForwardIterator::Next() + iter_cnt = 0; auto data_iter = true_data.begin(); for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { ASSERT_EQ(iter->key().ToString(), data_iter->first); 
ASSERT_EQ(iter->value().ToString(), data_iter->second); + iter_cnt++; + total_reads++; } - ASSERT_EQ(data_iter, true_data.end()); + ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / " + << true_data.size(); - // Verify Iterator::Prev() - auto data_rev = true_data.rbegin(); - for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { - ASSERT_EQ(iter->key().ToString(), data_rev->first); - ASSERT_EQ(iter->value().ToString(), data_rev->second); - } - ASSERT_EQ(data_rev, true_data.rend()); - - // Verify Iterator::Seek() + // Verify ForwardIterator::Seek() for (auto kv : true_data) { iter->Seek(kv.first); ASSERT_EQ(kv.first, iter->key().ToString()); ASSERT_EQ(kv.second, iter->value().ToString()); + total_reads++; } delete iter; +#endif // ROCKSDB_LITE + + if (total_reads_res) { + *total_reads_res = total_reads; + } } #ifndef ROCKSDB_LITE diff --git a/db/db_test_util.h b/db/db_test_util.h index 237dc51e4..645619602 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -810,7 +810,8 @@ class DBTestBase : public testing::Test { std::vector ListTableFiles(Env* env, const std::string& path); - void VerifyDBFromMap(std::map true_data); + void VerifyDBFromMap(std::map true_data, + size_t* total_reads_res = nullptr); #ifndef ROCKSDB_LITE Status GenerateAndAddExternalFile(const Options options, diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index f7eb8ca24..f66e1f0af 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -32,10 +32,15 @@ namespace rocksdb { class LevelIterator : public InternalIterator { public: LevelIterator(const ColumnFamilyData* const cfd, - const ReadOptions& read_options, - const std::vector& files) - : cfd_(cfd), read_options_(read_options), files_(files), valid_(false), - file_index_(std::numeric_limits::max()) {} + const ReadOptions& read_options, + const std::vector& files) + : cfd_(cfd), + read_options_(read_options), + files_(files), + valid_(false), + file_index_(std::numeric_limits::max()), + 
file_iter_(nullptr), + pinned_iters_mgr_(nullptr) {} void SetFileIndex(uint32_t file_index) { assert(file_index < files_.size()); @@ -47,10 +52,20 @@ class LevelIterator : public InternalIterator { } void Reset() { assert(file_index_ < files_.size()); - file_iter_.reset(cfd_->table_cache()->NewIterator( + + // Reset current pointer + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { + pinned_iters_mgr_->PinIterator(file_iter_); + } else { + delete file_iter_; + } + + file_iter_ = cfd_->table_cache()->NewIterator( read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), files_[file_index_]->fd, nullptr /* table_reader_ptr */, nullptr, - false)); + false); + + file_iter_->SetPinnedItersMgr(pinned_iters_mgr_); } void SeekToLast() override { status_ = Status::NotSupported("LevelIterator::SeekToLast()"); @@ -105,6 +120,20 @@ class LevelIterator : public InternalIterator { } return Status::OK(); } + bool IsKeyPinned() const override { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + file_iter_->IsKeyPinned(); + } + bool IsValuePinned() const override { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + file_iter_->IsValuePinned(); + } + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + if (file_iter_) { + file_iter_->SetPinnedItersMgr(pinned_iters_mgr_); + } + } private: const ColumnFamilyData* const cfd_; @@ -114,7 +143,8 @@ class LevelIterator : public InternalIterator { bool valid_; uint32_t file_index_; Status status_; - std::unique_ptr file_iter_; + InternalIterator* file_iter_; + PinnedIteratorsManager* pinned_iters_mgr_; }; ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, @@ -135,7 +165,8 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, has_iter_trimmed_for_upper_bound_(false), current_over_upper_bound_(false), is_prev_set_(false), - is_prev_inclusive_(false) { + 
is_prev_inclusive_(false), + pinned_iters_mgr_(nullptr) { if (sv_) { RebuildIterators(false); } @@ -145,6 +176,13 @@ ForwardIterator::~ForwardIterator() { Cleanup(true); } +namespace { +// Used in PinnedIteratorsManager to release pinned SuperVersion +static void ReleaseSuperVersionFunc(void* sv) { + delete reinterpret_cast(sv); +} +} // namespace + void ForwardIterator::SVCleanup() { if (sv_ != nullptr && sv_->Unref()) { // Job id == 0 means that this is not our background process, but rather @@ -157,7 +195,11 @@ void ForwardIterator::SVCleanup() { db_->ScheduleBgLogWriterClose(&job_context); } db_->mutex_.Unlock(); - delete sv_; + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { + pinned_iters_mgr_->PinPtr(sv_, &ReleaseSuperVersionFunc); + } else { + delete sv_; + } if (job_context.HaveSomethingToDelete()) { db_->PurgeObsoleteFiles( job_context, read_options_.background_purge_on_iterator_cleanup); @@ -168,18 +210,21 @@ void ForwardIterator::SVCleanup() { void ForwardIterator::Cleanup(bool release_sv) { if (mutable_iter_ != nullptr) { - mutable_iter_->~InternalIterator(); + DeleteIterator(mutable_iter_, true /* is_arena */); } + for (auto* m : imm_iters_) { - m->~InternalIterator(); + DeleteIterator(m, true /* is_arena */); } imm_iters_.clear(); + for (auto* f : l0_iters_) { - delete f; + DeleteIterator(f); } l0_iters_.clear(); + for (auto* l : level_iters_) { - delete l; + DeleteIterator(l); } level_iters_.clear(); @@ -280,7 +325,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, l0[i]->largest.user_key()) > 0) { if (read_options_.iterate_upper_bound != nullptr) { has_iter_trimmed_for_upper_bound_ = true; - delete l0_iters_[i]; + DeleteIterator(l0_iters_[i]); l0_iters_[i] = nullptr; } continue; @@ -295,7 +340,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, immutable_min_heap_.push(l0_iters_[i]); } else { has_iter_trimmed_for_upper_bound_ = true; - delete l0_iters_[i]; + DeleteIterator(l0_iters_[i]); l0_iters_[i] = 
nullptr; } } @@ -372,7 +417,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, } else { // Nothing in this level is interesting. Remove. has_iter_trimmed_for_upper_bound_ = true; - delete level_iters_[level - 1]; + DeleteIterator(level_iters_[level - 1]); level_iters_[level - 1] = nullptr; } } @@ -485,6 +530,50 @@ Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) { return Status::InvalidArgument(); } +void ForwardIterator::SetPinnedItersMgr( + PinnedIteratorsManager* pinned_iters_mgr) { + pinned_iters_mgr_ = pinned_iters_mgr; + UpdateChildrenPinnedItersMgr(); +} + +void ForwardIterator::UpdateChildrenPinnedItersMgr() { + // Set PinnedIteratorsManager for mutable memtable iterator. + if (mutable_iter_) { + mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_); + } + + // Set PinnedIteratorsManager for immutable memtable iterators. + for (InternalIterator* child_iter : imm_iters_) { + if (child_iter) { + child_iter->SetPinnedItersMgr(pinned_iters_mgr_); + } + } + + // Set PinnedIteratorsManager for L0 files iterators. + for (InternalIterator* child_iter : l0_iters_) { + if (child_iter) { + child_iter->SetPinnedItersMgr(pinned_iters_mgr_); + } + } + + // Set PinnedIteratorsManager for L1+ levels iterators. 
+ for (LevelIterator* child_iter : level_iters_) { + if (child_iter) { + child_iter->SetPinnedItersMgr(pinned_iters_mgr_); + } + } +} + +bool ForwardIterator::IsKeyPinned() const { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + current_->IsKeyPinned(); +} + +bool ForwardIterator::IsValuePinned() const { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + current_->IsValuePinned(); +} + void ForwardIterator::RebuildIterators(bool refresh_sv) { // Clean up Cleanup(refresh_sv); @@ -513,6 +602,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { BuildLevelIterators(vstorage); current_ = nullptr; is_prev_set_ = false; + + UpdateChildrenPinnedItersMgr(); } void ForwardIterator::RenewIterators() { @@ -521,10 +612,10 @@ void ForwardIterator::RenewIterators() { svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); if (mutable_iter_ != nullptr) { - mutable_iter_->~InternalIterator(); + DeleteIterator(mutable_iter_, true /* is_arena */); } for (auto* m : imm_iters_) { - m->~InternalIterator(); + DeleteIterator(m, true /* is_arena */); } imm_iters_.clear(); @@ -565,13 +656,13 @@ void ForwardIterator::RenewIterators() { } for (auto* f : l0_iters_) { - delete f; + DeleteIterator(f); } l0_iters_.clear(); l0_iters_ = l0_iters_new; for (auto* l : level_iters_) { - delete l; + DeleteIterator(l); } level_iters_.clear(); BuildLevelIterators(vstorage_new); @@ -579,6 +670,8 @@ void ForwardIterator::RenewIterators() { is_prev_set_ = false; SVCleanup(); sv_ = svnew; + + UpdateChildrenPinnedItersMgr(); } void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) { @@ -608,10 +701,11 @@ void ForwardIterator::ResetIncompleteIterators() { if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) { continue; } - delete l0_iters_[i]; + DeleteIterator(l0_iters_[i]); l0_iters_[i] = cfd_->table_cache()->NewIterator( read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0_files[i]->fd); + 
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_); } for (auto* level_iter : level_iters_) { @@ -700,7 +794,7 @@ void ForwardIterator::DeleteCurrentIter() { } if (l0_iters_[i] == current_) { has_iter_trimmed_for_upper_bound_ = true; - delete l0_iters_[i]; + DeleteIterator(l0_iters_[i]); l0_iters_[i] = nullptr; return; } @@ -712,7 +806,7 @@ void ForwardIterator::DeleteCurrentIter() { } if (level_iters_[level - 1] == current_) { has_iter_trimmed_for_upper_bound_ = true; - delete level_iters_[level - 1]; + DeleteIterator(level_iters_[level - 1]); level_iters_[level - 1] = nullptr; } } @@ -776,6 +870,22 @@ uint32_t ForwardIterator::FindFileInRange( return right; } +void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) { + if (iter == nullptr) { + return; + } + + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { + pinned_iters_mgr_->PinIterator(iter, is_arena); + } else { + if (is_arena) { + iter->~InternalIterator(); + } else { + delete iter; + } + } +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/forward_iterator.h b/db/forward_iterator.h index b5beeceef..32ebf262c 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -72,6 +72,10 @@ class ForwardIterator : public InternalIterator { virtual Slice value() const override; virtual Status status() const override; virtual Status GetProperty(std::string prop_name, std::string* prop) override; + virtual void SetPinnedItersMgr( + PinnedIteratorsManager* pinned_iters_mgr) override; + virtual bool IsKeyPinned() const override; + virtual bool IsValuePinned() const override; bool TEST_CheckDeletedIters(int* deleted_iters, int* num_iters); @@ -92,6 +96,14 @@ class ForwardIterator : public InternalIterator { bool IsOverUpperBound(const Slice& internal_key) const; + // Set PinnedIteratorsManager for all children Iterators, this function should + // be called whenever we update children Iterators or pinned_iters_mgr_. 
+ void UpdateChildrenPinnedItersMgr(); + + // A helper function that will release iter in the proper manner, or pass it + // to pinned_iters_mgr_ to release it later if pinning is enabled. + void DeleteIterator(InternalIterator* iter, bool is_arena = false); + DBImpl* const db_; const ReadOptions read_options_; ColumnFamilyData* const cfd_; @@ -129,6 +141,7 @@ class ForwardIterator : public InternalIterator { bool is_prev_set_; bool is_prev_inclusive_; + PinnedIteratorsManager* pinned_iters_mgr_; Arena arena_; };