diff --git a/db/db_test2.cc b/db/db_test2.cc index 147715a7d..e89364651 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1628,37 +1628,7 @@ TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) { // 3 L4 Files ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3"); - // Verify Get() - for (auto kv : true_data) { - ASSERT_EQ(Get(kv.first), kv.second); - } - - Iterator* iter = db_->NewIterator(ReadOptions()); - - // Verify Iterator::Next() - auto data_iter = true_data.begin(); - for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { - ASSERT_EQ(iter->key().ToString(), data_iter->first); - ASSERT_EQ(iter->value().ToString(), data_iter->second); - } - ASSERT_EQ(data_iter, true_data.end()); - - // Verify Iterator::Prev() - auto data_rev = true_data.rbegin(); - for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { - ASSERT_EQ(iter->key().ToString(), data_rev->first); - ASSERT_EQ(iter->value().ToString(), data_rev->second); - } - ASSERT_EQ(data_rev, true_data.rend()); - - // Verify Iterator::Seek() - for (auto kv : true_data) { - iter->Seek(kv.first); - ASSERT_EQ(kv.first, iter->key().ToString()); - ASSERT_EQ(kv.second, iter->value().ToString()); - } - - delete iter; + VerifyDBFromMap(true_data); } TEST_P(MergeOperatorPinningTest, Randomized) { @@ -1807,12 +1777,69 @@ TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) { } }; - VerifyDBFromMap(true_data); - ASSERT_EQ(merge_cnt, kNumKeys * 4 /* get + next + prev + seek */); + size_t total_reads; + VerifyDBFromMap(true_data, &total_reads); + ASSERT_EQ(merge_cnt, total_reads); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - VerifyDBFromMap(true_data); + VerifyDBFromMap(true_data, &total_reads); +} + +TEST_P(MergeOperatorPinningTest, TailingIterator) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreateMaxOperator(); + BlockBasedTableOptions bbto; + bbto.no_block_cache = disable_block_cache_; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); + + const int kNumOperands = 100; + const int kNumWrites = 100000; + + std::function writer_func = [&]() { + int k = 0; + for (int i = 0; i < kNumWrites; i++) { + db_->Merge(WriteOptions(), Key(k), Key(k)); + + if (i && i % kNumOperands == 0) { + k++; + } + if (i && i % 127 == 0) { + ASSERT_OK(Flush()); + } + if (i && i % 317 == 0) { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + } + } + }; + + std::function reader_func = [&]() { + ReadOptions ro; + ro.tailing = true; + Iterator* iter = db_->NewIterator(ro); + + iter->SeekToFirst(); + for (int i = 0; i < (kNumWrites / kNumOperands); i++) { + while (!iter->Valid()) { + // wait for the key to be written + env_->SleepForMicroseconds(100); + iter->Seek(Key(i)); + } + ASSERT_EQ(iter->key(), Key(i)); + ASSERT_EQ(iter->value(), Key(i)); + + iter->Next(); + } + + delete iter; + }; + + std::thread writer_thread(writer_func); + std::thread reader_thread(reader_func); + + writer_thread.join(); + reader_thread.join(); } #endif // ROCKSDB_LITE diff --git a/db/db_test_util.cc b/db/db_test_util.cc index c79b05cba..6a1a91492 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1076,38 +1076,90 @@ std::vector DBTestBase::ListTableFiles(Env* env, return file_numbers; } -void DBTestBase::VerifyDBFromMap(std::map true_data) { +void DBTestBase::VerifyDBFromMap(std::map true_data, + size_t* total_reads_res) { + size_t total_reads = 0; + for (auto& kv : true_data) { ASSERT_EQ(Get(kv.first), kv.second); + total_reads++; } + // Normal Iterator + { + int iter_cnt = 0; + ReadOptions ro; + ro.total_order_seek = true; + Iterator* iter = db_->NewIterator(ro); + // Verify Iterator::Next() + iter_cnt = 0; + auto data_iter = true_data.begin(); + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { + ASSERT_EQ(iter->key().ToString(), data_iter->first); + ASSERT_EQ(iter->value().ToString(), data_iter->second); + iter_cnt++; + total_reads++; + } + ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / " + << true_data.size(); + + // Verify Iterator::Prev() + iter_cnt = 0; + auto data_rev = true_data.rbegin(); + for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { + ASSERT_EQ(iter->key().ToString(), data_rev->first); + ASSERT_EQ(iter->value().ToString(), data_rev->second); + iter_cnt++; + total_reads++; + } + ASSERT_EQ(data_rev, true_data.rend()) << iter_cnt << " / " + << true_data.size(); + + // Verify Iterator::Seek() + for (auto kv : true_data) { + iter->Seek(kv.first); + ASSERT_EQ(kv.first, iter->key().ToString()); + ASSERT_EQ(kv.second, iter->value().ToString()); + total_reads++; + } + + delete iter; + } + +#ifndef ROCKSDB_LITE + // Tailing iterator + int iter_cnt = 0; ReadOptions ro; + ro.tailing = true; ro.total_order_seek = true; Iterator* iter = db_->NewIterator(ro); - // Verify Iterator::Next() + + // Verify ForwardIterator::Next() + iter_cnt = 0; auto data_iter = true_data.begin(); for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { ASSERT_EQ(iter->key().ToString(), data_iter->first); ASSERT_EQ(iter->value().ToString(), data_iter->second); + iter_cnt++; + total_reads++; } - ASSERT_EQ(data_iter, true_data.end()); + ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / " + << true_data.size(); - // Verify Iterator::Prev() - auto data_rev = true_data.rbegin(); - for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { - ASSERT_EQ(iter->key().ToString(), data_rev->first); - ASSERT_EQ(iter->value().ToString(), data_rev->second); - } - ASSERT_EQ(data_rev, true_data.rend()); - - // Verify Iterator::Seek() + // Verify ForwardIterator::Seek() for (auto kv : true_data) { iter->Seek(kv.first); ASSERT_EQ(kv.first, iter->key().ToString()); ASSERT_EQ(kv.second, iter->value().ToString()); + total_reads++; } delete iter; +#endif // ROCKSDB_LITE + + if (total_reads_res) { + *total_reads_res = total_reads; + } } #ifndef ROCKSDB_LITE diff --git a/db/db_test_util.h b/db/db_test_util.h index 237dc51e4..645619602 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -810,7 +810,8 @@ class DBTestBase : public testing::Test { std::vector ListTableFiles(Env* env, const std::string& path); - void VerifyDBFromMap(std::map true_data); + void VerifyDBFromMap(std::map true_data, + size_t* total_reads_res = nullptr); #ifndef ROCKSDB_LITE Status GenerateAndAddExternalFile(const Options options, diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index f7eb8ca24..f66e1f0af 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -32,10 +32,15 @@ namespace rocksdb { class LevelIterator : public InternalIterator { public: LevelIterator(const ColumnFamilyData* const cfd, - const ReadOptions& read_options, - const std::vector& files) - : cfd_(cfd), read_options_(read_options), files_(files), valid_(false), - file_index_(std::numeric_limits::max()) {} + const ReadOptions& read_options, + const std::vector& files) + : cfd_(cfd), + read_options_(read_options), + files_(files), + valid_(false), + file_index_(std::numeric_limits::max()), + file_iter_(nullptr), + pinned_iters_mgr_(nullptr) {} void SetFileIndex(uint32_t file_index) { assert(file_index < files_.size()); @@ -47,10 +52,20 @@ class LevelIterator : public InternalIterator { } void Reset() { assert(file_index_ < files_.size()); - file_iter_.reset(cfd_->table_cache()->NewIterator( + + // Reset current pointer + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { + pinned_iters_mgr_->PinIterator(file_iter_); + } else { + delete file_iter_; + } + + file_iter_ = cfd_->table_cache()->NewIterator( read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), files_[file_index_]->fd, nullptr /* table_reader_ptr */, nullptr, - false)); + false); + + file_iter_->SetPinnedItersMgr(pinned_iters_mgr_); } void SeekToLast() override { status_ = Status::NotSupported("LevelIterator::SeekToLast()"); @@ -105,6 +120,20 @@ class LevelIterator : public InternalIterator { } return Status::OK(); } + bool IsKeyPinned() const override { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + file_iter_->IsKeyPinned(); + } + bool IsValuePinned() const override { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + file_iter_->IsValuePinned(); + } + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + if (file_iter_) { + file_iter_->SetPinnedItersMgr(pinned_iters_mgr_); + } + } private: const ColumnFamilyData* const cfd_; @@ -114,7 +143,8 @@ class LevelIterator : public InternalIterator { bool valid_; uint32_t file_index_; Status status_; - std::unique_ptr file_iter_; + InternalIterator* file_iter_; + PinnedIteratorsManager* pinned_iters_mgr_; }; ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, @@ -135,7 +165,8 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, has_iter_trimmed_for_upper_bound_(false), current_over_upper_bound_(false), is_prev_set_(false), - is_prev_inclusive_(false) { + is_prev_inclusive_(false), + pinned_iters_mgr_(nullptr) { if (sv_) { RebuildIterators(false); } @@ -145,6 +176,13 @@ ForwardIterator::~ForwardIterator() { Cleanup(true); } +namespace { +// Used in PinnedIteratorsManager to release pinned SuperVersion +static void ReleaseSuperVersionFunc(void* sv) { + delete reinterpret_cast(sv); +} +} // namespace + void ForwardIterator::SVCleanup() { if (sv_ != nullptr && sv_->Unref()) { // Job id == 0 means that this is not our background process, but rather @@ -157,7 +195,11 @@ void ForwardIterator::SVCleanup() { db_->ScheduleBgLogWriterClose(&job_context); } db_->mutex_.Unlock(); - delete sv_; + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { + pinned_iters_mgr_->PinPtr(sv_, &ReleaseSuperVersionFunc); + } else { + delete sv_; + } if (job_context.HaveSomethingToDelete()) { db_->PurgeObsoleteFiles( job_context, read_options_.background_purge_on_iterator_cleanup); @@ -168,18 +210,21 @@ void ForwardIterator::SVCleanup() { void ForwardIterator::Cleanup(bool release_sv) { if (mutable_iter_ != nullptr) { - mutable_iter_->~InternalIterator(); + DeleteIterator(mutable_iter_, true /* is_arena */); } + for (auto* m : imm_iters_) { - m->~InternalIterator(); + DeleteIterator(m, true /* is_arena */); } imm_iters_.clear(); + for (auto* f : l0_iters_) { - delete f; + DeleteIterator(f); } l0_iters_.clear(); + for (auto* l : level_iters_) { - delete l; + DeleteIterator(l); } level_iters_.clear(); @@ -280,7 +325,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, l0[i]->largest.user_key()) > 0) { if (read_options_.iterate_upper_bound != nullptr) { has_iter_trimmed_for_upper_bound_ = true; - delete l0_iters_[i]; + DeleteIterator(l0_iters_[i]); l0_iters_[i] = nullptr; } continue; @@ -295,7 +340,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, immutable_min_heap_.push(l0_iters_[i]); } else { has_iter_trimmed_for_upper_bound_ = true; - delete l0_iters_[i]; + DeleteIterator(l0_iters_[i]); l0_iters_[i] = nullptr; } } @@ -372,7 +417,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, } else { // Nothing in this level is interesting. Remove. has_iter_trimmed_for_upper_bound_ = true; - delete level_iters_[level - 1]; + DeleteIterator(level_iters_[level - 1]); level_iters_[level - 1] = nullptr; } } @@ -485,6 +530,50 @@ Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) { return Status::InvalidArgument(); } +void ForwardIterator::SetPinnedItersMgr( + PinnedIteratorsManager* pinned_iters_mgr) { + pinned_iters_mgr_ = pinned_iters_mgr; + UpdateChildrenPinnedItersMgr(); +} + +void ForwardIterator::UpdateChildrenPinnedItersMgr() { + // Set PinnedIteratorsManager for mutable memtable iterator. + if (mutable_iter_) { + mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_); + } + + // Set PinnedIteratorsManager for immutable memtable iterators. + for (InternalIterator* child_iter : imm_iters_) { + if (child_iter) { + child_iter->SetPinnedItersMgr(pinned_iters_mgr_); + } + } + + // Set PinnedIteratorsManager for L0 files iterators. + for (InternalIterator* child_iter : l0_iters_) { + if (child_iter) { + child_iter->SetPinnedItersMgr(pinned_iters_mgr_); + } + } + + // Set PinnedIteratorsManager for L1+ levels iterators. + for (LevelIterator* child_iter : level_iters_) { + if (child_iter) { + child_iter->SetPinnedItersMgr(pinned_iters_mgr_); + } + } +} + +bool ForwardIterator::IsKeyPinned() const { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + current_->IsKeyPinned(); +} + +bool ForwardIterator::IsValuePinned() const { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + current_->IsValuePinned(); +} + void ForwardIterator::RebuildIterators(bool refresh_sv) { // Clean up Cleanup(refresh_sv); @@ -513,6 +602,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { BuildLevelIterators(vstorage); current_ = nullptr; is_prev_set_ = false; + + UpdateChildrenPinnedItersMgr(); } void ForwardIterator::RenewIterators() { @@ -521,10 +612,10 @@ void ForwardIterator::RenewIterators() { svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); if (mutable_iter_ != nullptr) { - mutable_iter_->~InternalIterator(); + DeleteIterator(mutable_iter_, true /* is_arena */); } for (auto* m : imm_iters_) { - m->~InternalIterator(); + DeleteIterator(m, true /* is_arena */); } imm_iters_.clear(); @@ -565,13 +656,13 @@ void ForwardIterator::RenewIterators() { } for (auto* f : l0_iters_) { - delete f; + DeleteIterator(f); } l0_iters_.clear(); l0_iters_ = l0_iters_new; for (auto* l : level_iters_) { - delete l; + DeleteIterator(l); } level_iters_.clear(); BuildLevelIterators(vstorage_new); @@ -579,6 +670,8 @@ void ForwardIterator::RenewIterators() { is_prev_set_ = false; SVCleanup(); sv_ = svnew; + + UpdateChildrenPinnedItersMgr(); } void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) { @@ -608,10 +701,11 @@ void ForwardIterator::ResetIncompleteIterators() { if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) { continue; } - delete l0_iters_[i]; + DeleteIterator(l0_iters_[i]); l0_iters_[i] = cfd_->table_cache()->NewIterator( read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0_files[i]->fd); + l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_); } for (auto* level_iter : level_iters_) { @@ -700,7 +794,7 @@ void ForwardIterator::DeleteCurrentIter() { } if (l0_iters_[i] == current_) { has_iter_trimmed_for_upper_bound_ = true; - delete l0_iters_[i]; + DeleteIterator(l0_iters_[i]); l0_iters_[i] = nullptr; return; } @@ -712,7 +806,7 @@ void ForwardIterator::DeleteCurrentIter() { } if (level_iters_[level - 1] == current_) { has_iter_trimmed_for_upper_bound_ = true; - delete level_iters_[level - 1]; + DeleteIterator(level_iters_[level - 1]); level_iters_[level - 1] = nullptr; } } @@ -776,6 +870,22 @@ uint32_t ForwardIterator::FindFileInRange( return right; } +void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) { + if (iter == nullptr) { + return; + } + + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { + pinned_iters_mgr_->PinIterator(iter, is_arena); + } else { + if (is_arena) { + iter->~InternalIterator(); + } else { + delete iter; + } + } +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/forward_iterator.h b/db/forward_iterator.h index b5beeceef..32ebf262c 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -72,6 +72,10 @@ class ForwardIterator : public InternalIterator { virtual Slice value() const override; virtual Status status() const override; virtual Status GetProperty(std::string prop_name, std::string* prop) override; + virtual void SetPinnedItersMgr( + PinnedIteratorsManager* pinned_iters_mgr) override; + virtual bool IsKeyPinned() const override; + virtual bool IsValuePinned() const override; bool TEST_CheckDeletedIters(int* deleted_iters, int* num_iters); @@ -92,6 +96,14 @@ class ForwardIterator : public InternalIterator { bool IsOverUpperBound(const Slice& internal_key) const; + // Set PinnedIteratorsManager for all children Iterators, this function should + // be called whenever we update children Iterators or pinned_iters_mgr_. + void UpdateChildrenPinnedItersMgr(); + + // A helper function that will release iter in the proper manner, or pass it + // to pinned_iters_mgr_ to release it later if pinning is enabled. + void DeleteIterator(InternalIterator* iter, bool is_arena = false); + DBImpl* const db_; const ReadOptions read_options_; ColumnFamilyData* const cfd_; @@ -129,6 +141,7 @@ class ForwardIterator : public InternalIterator { bool is_prev_set_; bool is_prev_inclusive_; + PinnedIteratorsManager* pinned_iters_mgr_; Arena arena_; };