From 48e8baebc03716adcf98a0c4cc05f17cd1820f04 Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Tue, 15 Nov 2016 17:18:32 -0800
Subject: [PATCH] Decouple data iterator and range deletion iterator in
 TableCache

Summary:
Previously we used TableCache::NewIterator() for multiple purposes (creating
the data block iterator and the range deletion iterator), and returned any
non-ok status in the data block iterator. In one case where the caller only
used the range deletion block iterator
(https://github.com/facebook/rocksdb/blob/9e7cf3469bc626b092ec48366d12873ecab22b4e/db/version_set.cc#L965-L973),
we didn't check/free the data block iterator containing the non-ok status,
which caused a valgrind error.

So, this diff decouples creation of the data block and range deletion block
iterators, and updates the callers accordingly. Both functions can return
non-ok status in an InternalIterator. Since the non-ok status is returned in
an iterator that the callers will definitely use, it should be more
usable/less error-prone. A usage sketch of the new API follows the diff.

Closes https://github.com/facebook/rocksdb/pull/1513

Differential Revision: D4181423

Pulled By: ajkr

fbshipit-source-id: 835b8f5
---
 db/builder.cc            |   3 +-
 db/compaction_job.cc     |   5 +-
 db/db_compaction_test.cc |  11 ++--
 db/forward_iterator.cc   |  11 ++--
 db/repair.cc             |   3 +-
 db/table_cache.cc        | 122 ++++++++++++++++++++++++---------------
 db/table_cache.h         |  26 ++++++---
 db/version_set.cc        |  32 ++++------
 table/get_context.cc     |   4 +-
 table/get_context.h      |   2 +
 10 files changed, 127 insertions(+), 92 deletions(-)

diff --git a/db/builder.cc b/db/builder.cc
index 4c5e27b11..90f1b2150 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -182,7 +182,8 @@ Status BuildTable(
   if (s.ok() && !empty) {
     // Verify that the table is usable
     std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
-        ReadOptions(), env_options, internal_comparator, meta->fd, nullptr,
+        ReadOptions(), env_options, internal_comparator, meta->fd,
+        nullptr /* range_del_agg */, nullptr,
         (internal_stats == nullptr) ? nullptr
                                     : internal_stats->GetFileReadHist(0),
         false /* for_compaction */, nullptr /* arena */,
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 091c91856..c4fb012b5 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -1028,8 +1028,9 @@ Status CompactionJob::FinishCompactionOutputFile(
     // Verify that the table is usable
     InternalIterator* iter = cfd->table_cache()->NewIterator(
         ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
-        nullptr, cfd->internal_stats()->GetFileReadHist(
-                     compact_->compaction->output_level()),
+        nullptr /* range_del_agg */, nullptr,
+        cfd->internal_stats()->GetFileReadHist(
+            compact_->compaction->output_level()),
         false);
     s = iter->status();
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 80fc12ff4..0c2492216 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -306,9 +306,9 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   dbfull()->TEST_WaitForCompact();
   // Preloading iterator issues one table cache lookup and creates
   // a new table reader. One file is created for flush and one for compaction.
-  // Compaction inputs make no table cache look-up for data iterators or
-  // range tombstone iterators since they're already cached.
-  ASSERT_EQ(num_table_cache_lookup, 2);
+  // Compaction inputs make no table cache look-up for data iterators, but
+  // make one look-up per compaction input file (three) for range tombstones.
+  ASSERT_EQ(num_table_cache_lookup, 5);
   // Create new iterator for:
   // (1) 1 for verifying flush results
   // (2) 3 for compaction input files
@@ -329,8 +329,9 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   cro.target_level = 2;
   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
   db_->CompactRange(cro, nullptr, nullptr);
-  // Only verifying compaction outputs issues one table cache lookup.
-  ASSERT_EQ(num_table_cache_lookup, 1);
+  // Only verifying compaction outputs issues two table cache lookups
+  // (one for the data block, one for the range deletion block).
+  ASSERT_EQ(num_table_cache_lookup, 2);

   // One for compaction input, one for verifying compaction results.
   ASSERT_EQ(num_new_table_reader, 2);
diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc
index f725d74b8..271086125 100644
--- a/db/forward_iterator.cc
+++ b/db/forward_iterator.cc
@@ -71,8 +71,8 @@ class LevelIterator : public InternalIterator {
     file_iter_ = cfd_->table_cache()->NewIterator(
         read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
-        files_[file_index_]->fd, nullptr /* table_reader_ptr */, nullptr,
-        false);
+        files_[file_index_]->fd, nullptr /* range_del_agg */,
+        nullptr /* table_reader_ptr */, nullptr, false);
     file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
   }
@@ -574,7 +574,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
       continue;
     }
     l0_iters_.push_back(cfd_->table_cache()->NewIterator(
-        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd));
+        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd,
+        nullptr /* range_del_agg */));
   }
   BuildLevelIterators(vstorage);
   current_ = nullptr;
@@ -629,7 +630,7 @@ void ForwardIterator::RenewIterators() {
     }
     l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
         read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
-        l0_files_new[inew]->fd));
+        l0_files_new[inew]->fd, nullptr /* range_del_agg */));
   }

   for (auto* f : l0_iters_) {
@@ -681,7 +682,7 @@ void ForwardIterator::ResetIncompleteIterators() {
       DeleteIterator(l0_iters_[i]);
       l0_iters_[i] = cfd_->table_cache()->NewIterator(
           read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
-          l0_files[i]->fd);
+          l0_files[i]->fd, nullptr /* range_del_agg */);
       l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
   }
diff --git a/db/repair.cc b/db/repair.cc
index c211f852e..89a1c74db 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -467,7 +467,8 @@ class Repairer {
     }
     if (status.ok()) {
       InternalIterator* iter = table_cache_->NewIterator(
-          ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd);
+          ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd,
+          nullptr /* range_del_agg */);
       bool empty = true;
       ParsedInternalKey parsed;
       t->min_sequence = 0;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 728e20bc0..7de8681ff 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -169,13 +169,23 @@ Status TableCache::FindTable(const EnvOptions& env_options,
 InternalIterator* TableCache::NewIterator(
     const ReadOptions& options, const EnvOptions& env_options,
     const InternalKeyComparator& icomparator, const FileDescriptor& fd,
-    TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
-    bool for_compaction, Arena* arena, bool skip_filters, int level,
-    RangeDelAggregator* range_del_agg /* = nullptr */,
-    bool is_range_del_only /* = false */) {
-  assert(!is_range_del_only || range_del_agg != nullptr);
+    RangeDelAggregator* range_del_agg, TableReader** table_reader_ptr,
+    HistogramImpl* file_read_hist, bool for_compaction, Arena* arena,
+    bool skip_filters, int level) {
   PERF_TIMER_GUARD(new_table_iterator_nanos);

+  if (range_del_agg != nullptr && !options.ignore_range_deletions) {
+    std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
+        options, icomparator, fd, file_read_hist, skip_filters, level));
+    Status s = range_del_iter->status();
+    if (s.ok()) {
+      s = range_del_agg->AddTombstones(std::move(range_del_iter));
+    }
+    if (!s.ok()) {
+      return NewErrorInternalIterator(s, arena);
+    }
+  }
+
   if (table_reader_ptr != nullptr) {
     *table_reader_ptr = nullptr;
   }
@@ -185,18 +195,14 @@ InternalIterator* TableCache::NewIterator(
   size_t readahead = 0;
   bool create_new_table_reader = false;
-  // pointless to create a new table reader for range tombstones only since the
-  // reader isn't reused
-  if (!is_range_del_only) {
-    if (for_compaction) {
-      if (ioptions_.new_table_reader_for_compaction_inputs) {
-        readahead = ioptions_.compaction_readahead_size;
-        create_new_table_reader = true;
-      }
-    } else {
-      readahead = options.readahead_size;
-      create_new_table_reader = readahead > 0;
+  if (for_compaction) {
+    if (ioptions_.new_table_reader_for_compaction_inputs) {
+      readahead = ioptions_.compaction_readahead_size;
+      create_new_table_reader = true;
     }
+  } else {
+    readahead = options.readahead_size;
+    create_new_table_reader = readahead > 0;
   }

   if (create_new_table_reader) {
@@ -223,47 +229,71 @@ InternalIterator* TableCache::NewIterator(
     }
   }

-  if (range_del_agg != nullptr && !options.ignore_range_deletions) {
-    std::unique_ptr<InternalIterator> iter(
-        table_reader->NewRangeTombstoneIterator(options));
-    Status s = range_del_agg->AddTombstones(std::move(iter));
-    if (!s.ok()) {
-      return NewErrorInternalIterator(s, arena);
-    }
+  InternalIterator* result =
+      table_reader->NewIterator(options, arena, skip_filters);
+  if (create_new_table_reader) {
+    assert(handle == nullptr);
+    result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
+  } else if (handle != nullptr) {
+    result->RegisterCleanup(&UnrefEntry, cache_, handle);
   }

-  InternalIterator* result = nullptr;
-  if (!is_range_del_only) {
-    result = table_reader->NewIterator(options, arena, skip_filters);
-    if (create_new_table_reader) {
-      assert(handle == nullptr);
-      result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
-    } else if (handle != nullptr) {
-      result->RegisterCleanup(&UnrefEntry, cache_, handle);
-    }
-
-    if (for_compaction) {
-      table_reader->SetupForCompaction();
-    }
-    if (table_reader_ptr != nullptr) {
-      *table_reader_ptr = table_reader;
-    }
-  } else {
-    assert(!create_new_table_reader);
-    // don't need the table reader at all since the iterator over the
-    // meta-block doesn't require it
-    if (handle != nullptr) {
-      UnrefEntry(cache_, handle);
-    }
+  if (for_compaction) {
+    table_reader->SetupForCompaction();
+  }
+  if (table_reader_ptr != nullptr) {
+    *table_reader_ptr = table_reader;
   }

   return result;
 }

+InternalIterator* TableCache::NewRangeDeletionIterator(
+    const ReadOptions& options, const InternalKeyComparator& icmp,
+    const FileDescriptor& fd, HistogramImpl* file_read_hist, bool skip_filters,
+    int level) {
+  if (options.ignore_range_deletions) {
+    return NewEmptyInternalIterator();
+  }
+  Status s;
+  TableReader* table_reader = fd.table_reader;
+  Cache::Handle* cache_handle = nullptr;
+  if (table_reader == nullptr) {
+    s = FindTable(env_options_, icmp, fd, &cache_handle,
+                  options.read_tier == kBlockCacheTier /* no_io */,
+                  true /* record_read_stats */, file_read_hist, skip_filters,
+                  level);
+    if (s.ok()) {
+      table_reader = GetTableReaderFromHandle(cache_handle);
+    }
+  }
+  if (s.ok()) {
+    auto* result = table_reader->NewRangeTombstoneIterator(options);
+    if (cache_handle != nullptr) {
+      result->RegisterCleanup(&UnrefEntry, cache_, cache_handle);
+    }
+    return result;
+  }
+  return NewErrorInternalIterator(s);
+}
+
 Status TableCache::Get(const ReadOptions& options,
                        const InternalKeyComparator& internal_comparator,
                        const FileDescriptor& fd, const Slice& k,
                        GetContext* get_context, HistogramImpl* file_read_hist,
                        bool skip_filters, int level) {
+  if (get_context->range_del_agg() != nullptr &&
+      !options.ignore_range_deletions) {
+    std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
+        options, internal_comparator, fd, file_read_hist, skip_filters,
+        level));
+    Status s = range_del_iter->status();
+    if (s.ok()) {
+      s = get_context->range_del_agg()->AddTombstones(
+          std::move(range_del_iter));
+    }
+    if (!s.ok()) {
+      return s;
+    }
+  }
   TableReader* t = fd.table_reader;
   Status s;
   Cache::Handle* handle = nullptr;
diff --git a/db/table_cache.h b/db/table_cache.h
index 2bf7f3c77..e5f31770b 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -46,24 +46,34 @@ class TableCache {
   // the returned iterator. The returned "*tableptr" object is owned by
   // the cache and should not be deleted, and is valid for as long as the
   // returned iterator is live.
+  // @param range_del_agg If non-nullptr, adds range deletions to the
+  //    aggregator. If an error occurs, returns it in a NewErrorInternalIterator
   // @param skip_filters Disables loading/accessing the filter block
   // @param level The level this table is at, -1 for "not set / don't know"
-  // @param range_del_agg When non-nullptr, creates a range tombstone iterator
-  //    over this file's meta-block and gives it to this object
-  // @param is_range_del_only When set, this function only gives a range
-  //    tombstone iterator to range_del_agg and then returns nullptr
   InternalIterator* NewIterator(
       const ReadOptions& options, const EnvOptions& toptions,
       const InternalKeyComparator& internal_comparator,
-      const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr,
+      const FileDescriptor& file_fd, RangeDelAggregator* range_del_agg,
+      TableReader** table_reader_ptr = nullptr,
       HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
-      Arena* arena = nullptr, bool skip_filters = false, int level = -1,
-      RangeDelAggregator* range_del_agg = nullptr,
-      bool is_range_del_only = false);
+      Arena* arena = nullptr, bool skip_filters = false, int level = -1);
+
+  // Return an iterator over the range deletion meta-block for the specified
+  // file number.
+  // @param skip_filters Disables loading/accessing the filter block
+  // @param level The level this table is at, -1 for "not set / don't know"
+  InternalIterator* NewRangeDeletionIterator(const ReadOptions& options,
+                                             const InternalKeyComparator& icmp,
+                                             const FileDescriptor& fd,
+                                             HistogramImpl* file_read_hist,
+                                             bool skip_filters, int level);

   // If a seek to internal key "k" in specified file finds an entry,
   // call (*handle_result)(arg, found_key, found_value) repeatedly until
   // it returns false.
+  // @param get_context State for get operation. If its range_del_agg() returns
+  //    non-nullptr, adds range deletions to the aggregator. If an error occurs,
+  //    returns non-ok status.
   // @param skip_filters Disables loading/accessing the filter block
   // @param level The level this table is at, -1 for "not set / don't know"
   Status Get(const ReadOptions& options,
diff --git a/db/version_set.cc b/db/version_set.cc
index dbd65dca8..ad57974b3 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -518,10 +518,9 @@
     const FileDescriptor* fd =
        reinterpret_cast<const FileDescriptor*>(meta_handle.data());
     return table_cache_->NewIterator(
-        read_options_, env_options_, icomparator_, *fd,
+        read_options_, env_options_, icomparator_, *fd, range_del_agg_,
         nullptr /* don't need reference to table */, file_read_hist_,
-        for_compaction_, nullptr /* arena */, skip_filters_, level_,
-        range_del_agg_, false /* is_range_del_only */);
+        for_compaction_, nullptr /* arena */, skip_filters_, level_);
   }

   bool PrefixMayMatch(const Slice& internal_key) override {
@@ -834,9 +833,9 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
       const auto& file = storage_info_.LevelFilesBrief(0).files[i];
       merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
-          read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr,
-          cfd_->internal_stats()->GetFileReadHist(0), false, arena,
-          false /* skip_filters */, 0 /* level */, range_del_agg));
+          read_options, soptions, cfd_->internal_comparator(), file.fd,
+          range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0),
+          false, arena, false /* skip_filters */, 0 /* level */));
     }
   } else {
     // For levels > 0, we can use a concatenating iterator that sequentially
@@ -961,18 +960,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                               user_comparator(), internal_comparator());
   FdWithKeyRange* f = fp.GetNextFile();
   while (f != nullptr) {
-    if (!read_options.ignore_range_deletions) {
-      table_cache_->NewIterator(
-          read_options, vset_->env_options(), *internal_comparator(), f->fd,
-          nullptr /* table_reader_ptr */,
-          cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
-          false /* for_compaction */, nullptr /* arena */,
-          IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
-                          fp.IsHitFileLastInLevel()),
-          fp.GetCurrentLevel() /* level */, range_del_agg,
-          true /* is_range_del_only */);
-    }
-
     *status = table_cache_->Get(
         read_options, *internal_comparator(), f->fd, ikey, &get_context,
         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
@@ -3328,7 +3315,7 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
     TableReader* table_reader_ptr;
     InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
         ReadOptions(), env_options_, v->cfd_->internal_comparator(), f.fd,
-        &table_reader_ptr);
+        nullptr /* range_del_agg */, &table_reader_ptr);
     if (table_reader_ptr != nullptr) {
       result = table_reader_ptr->ApproximateOffsetOf(key);
     }
@@ -3399,10 +3386,11 @@ InternalIterator* VersionSet::MakeInputIterator(
         for (size_t i = 0; i < flevel->num_files; i++) {
           list[num++] = cfd->table_cache()->NewIterator(
               read_options, env_options_compactions_,
-              cfd->internal_comparator(), flevel->files[i].fd, nullptr,
-              nullptr, /* no per level latency histogram*/
+              cfd->internal_comparator(), flevel->files[i].fd, range_del_agg,
+              nullptr /* table_reader_ptr */,
+              nullptr /* no per level latency histogram */,
               true /* for_compaction */, nullptr /* arena */,
-              false /* skip_filters */, (int)which /* level */, range_del_agg);
+              false /* skip_filters */, (int)which /* level */);
         }
       } else {
// Create concatenating iterator for the files from this level diff --git a/table/get_context.cc b/table/get_context.cc index 662833f4c..280206c54 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -37,7 +37,7 @@ GetContext::GetContext(const Comparator* ucmp, Statistics* statistics, GetState init_state, const Slice& user_key, std::string* ret_value, bool* value_found, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, Env* env, + RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr) : ucmp_(ucmp), @@ -49,7 +49,7 @@ GetContext::GetContext(const Comparator* ucmp, value_(ret_value), value_found_(value_found), merge_context_(merge_context), - range_del_agg_(range_del_agg), + range_del_agg_(_range_del_agg), env_(env), seq_(seq), replay_log_(nullptr), diff --git a/table/get_context.h b/table/get_context.h index f76c9b48f..e57c7352c 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -47,6 +47,8 @@ class GetContext { GetState State() const { return state_; } + RangeDelAggregator* range_del_agg() { return range_del_agg_; } + PinnedIteratorsManager* pinned_iters_mgr() { return pinned_iters_mgr_; } // If a non-null string is passed, all the SaveValue calls will be
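
Usage sketch (illustrative, not part of the patch): the snippet below shows how
a caller consumes the decoupled API after this change, mirroring the pattern
TableCache::Get() follows above. The names table_cache, read_options, icmp, fd,
file_read_hist, skip_filters, level, and range_del_agg are placeholders for
whatever an existing call site already has in scope.

    // Hedged sketch of the post-patch calling convention; all identifiers are
    // stand-ins for objects an existing call site would already have.
    std::unique_ptr<InternalIterator> range_del_iter(
        table_cache->NewRangeDeletionIterator(read_options, icmp, fd,
                                              file_read_hist, skip_filters,
                                              level));
    // Errors now surface inside the returned iterator, so check status()
    // before handing the tombstones to the aggregator.
    Status s = range_del_iter->status();
    if (s.ok()) {
      // AddTombstones takes ownership of the iterator.
      s = range_del_agg->AddTombstones(std::move(range_del_iter));
    }
    if (!s.ok()) {
      // Propagate the failure, e.g. return s, or wrap it the way
      // TableCache::NewIterator() does with NewErrorInternalIterator(s, arena).
    }

Because both NewIterator() and NewRangeDeletionIterator() report failure
through the iterator they return, a caller that forgets the explicit status()
check still observes the error the first time it uses the iterator, which is
the usability improvement described in the summary.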