Decouple data iterator and range deletion iterator in TableCache
Summary:
Previously we used TableCache::NewIterator() for multiple purposes (data
block iterator and range deletion iterator), and returned non-ok status in
the data block iterator. In one case where the caller only used the range
deletion block iterator (9e7cf3469b/db/version_set.cc, L965-L973),
we didn't check/free the data block iterator containing non-ok status, which
caused a valgrind error.
So, this diff decouples creation of data block and range deletion block iterators,
and updates the callers accordingly. Both functions can return non-ok status
in an InternalIterator. Since the non-ok status is returned in an iterator that the
callers will definitely use, it should be more usable/less error-prone.
Closes https://github.com/facebook/rocksdb/pull/1513
Differential Revision: D4181423
Pulled By: ajkr
fbshipit-source-id: 835b8f5
This commit is contained in:
parent
4b0aa3c4c8
commit
48e8baebc0
@ -182,7 +182,8 @@ Status BuildTable(
|
||||
if (s.ok() && !empty) {
|
||||
// Verify that the table is usable
|
||||
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
|
||||
ReadOptions(), env_options, internal_comparator, meta->fd, nullptr,
|
||||
ReadOptions(), env_options, internal_comparator, meta->fd,
|
||||
nullptr /* range_del_agg */, nullptr,
|
||||
(internal_stats == nullptr) ? nullptr
|
||||
: internal_stats->GetFileReadHist(0),
|
||||
false /* for_compaction */, nullptr /* arena */,
|
||||
|
@ -1028,7 +1028,8 @@ Status CompactionJob::FinishCompactionOutputFile(
|
||||
// Verify that the table is usable
|
||||
InternalIterator* iter = cfd->table_cache()->NewIterator(
|
||||
ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
|
||||
nullptr, cfd->internal_stats()->GetFileReadHist(
|
||||
nullptr /* range_del_agg */, nullptr,
|
||||
cfd->internal_stats()->GetFileReadHist(
|
||||
compact_->compaction->output_level()),
|
||||
false);
|
||||
s = iter->status();
|
||||
|
@ -306,9 +306,9 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
// Preloading iterator issues one table cache lookup and creates
|
||||
// a new table reader. One file is created for flush and one for compaction.
|
||||
// Compaction inputs make no table cache look-up for data iterators or
|
||||
// range tombstone iterators since they're already cached.
|
||||
ASSERT_EQ(num_table_cache_lookup, 2);
|
||||
// Compaction inputs make no table cache look-up for data iterators and one
|
||||
// look-up per compaction input file (three).
|
||||
ASSERT_EQ(num_table_cache_lookup, 5);
|
||||
// Create new iterator for:
|
||||
// (1) 1 for verifying flush results
|
||||
// (2) 3 for compaction input files
|
||||
@ -329,8 +329,9 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
|
||||
cro.target_level = 2;
|
||||
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
||||
db_->CompactRange(cro, nullptr, nullptr);
|
||||
// Only verifying compaction outputs issues one table cache lookup.
|
||||
ASSERT_EQ(num_table_cache_lookup, 1);
|
||||
// Only verifying compaction outputs issues two table cache lookup
|
||||
// (one for data block, one for range deletion block).
|
||||
ASSERT_EQ(num_table_cache_lookup, 2);
|
||||
// One for compaction input, one for verifying compaction results.
|
||||
ASSERT_EQ(num_new_table_reader, 2);
|
||||
|
||||
|
@ -71,8 +71,8 @@ class LevelIterator : public InternalIterator {
|
||||
|
||||
file_iter_ = cfd_->table_cache()->NewIterator(
|
||||
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
|
||||
files_[file_index_]->fd, nullptr /* table_reader_ptr */, nullptr,
|
||||
false);
|
||||
files_[file_index_]->fd, nullptr /* range_del_agg */,
|
||||
nullptr /* table_reader_ptr */, nullptr, false);
|
||||
|
||||
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
|
||||
}
|
||||
@ -574,7 +574,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
|
||||
continue;
|
||||
}
|
||||
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
|
||||
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd));
|
||||
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd,
|
||||
nullptr /* range_del_agg */));
|
||||
}
|
||||
BuildLevelIterators(vstorage);
|
||||
current_ = nullptr;
|
||||
@ -629,7 +630,7 @@ void ForwardIterator::RenewIterators() {
|
||||
}
|
||||
l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
|
||||
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
|
||||
l0_files_new[inew]->fd));
|
||||
l0_files_new[inew]->fd, nullptr /* range_del_agg */));
|
||||
}
|
||||
|
||||
for (auto* f : l0_iters_) {
|
||||
@ -681,7 +682,7 @@ void ForwardIterator::ResetIncompleteIterators() {
|
||||
DeleteIterator(l0_iters_[i]);
|
||||
l0_iters_[i] = cfd_->table_cache()->NewIterator(
|
||||
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
|
||||
l0_files[i]->fd);
|
||||
l0_files[i]->fd, nullptr /* range_del_agg */);
|
||||
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
|
||||
}
|
||||
|
||||
|
@ -467,7 +467,8 @@ class Repairer {
|
||||
}
|
||||
if (status.ok()) {
|
||||
InternalIterator* iter = table_cache_->NewIterator(
|
||||
ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd);
|
||||
ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd,
|
||||
nullptr /* range_del_agg */);
|
||||
bool empty = true;
|
||||
ParsedInternalKey parsed;
|
||||
t->min_sequence = 0;
|
||||
|
@ -169,13 +169,23 @@ Status TableCache::FindTable(const EnvOptions& env_options,
|
||||
InternalIterator* TableCache::NewIterator(
|
||||
const ReadOptions& options, const EnvOptions& env_options,
|
||||
const InternalKeyComparator& icomparator, const FileDescriptor& fd,
|
||||
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
|
||||
bool for_compaction, Arena* arena, bool skip_filters, int level,
|
||||
RangeDelAggregator* range_del_agg /* = nullptr */,
|
||||
bool is_range_del_only /* = false */) {
|
||||
assert(!is_range_del_only || range_del_agg != nullptr);
|
||||
RangeDelAggregator* range_del_agg, TableReader** table_reader_ptr,
|
||||
HistogramImpl* file_read_hist, bool for_compaction, Arena* arena,
|
||||
bool skip_filters, int level) {
|
||||
PERF_TIMER_GUARD(new_table_iterator_nanos);
|
||||
|
||||
if (range_del_agg != nullptr && !options.ignore_range_deletions) {
|
||||
std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
|
||||
options, icomparator, fd, file_read_hist, skip_filters, level));
|
||||
Status s = range_del_iter->status();
|
||||
if (s.ok()) {
|
||||
s = range_del_agg->AddTombstones(std::move(range_del_iter));
|
||||
}
|
||||
if (!s.ok()) {
|
||||
return NewErrorInternalIterator(s, arena);
|
||||
}
|
||||
}
|
||||
|
||||
if (table_reader_ptr != nullptr) {
|
||||
*table_reader_ptr = nullptr;
|
||||
}
|
||||
@ -185,9 +195,6 @@ InternalIterator* TableCache::NewIterator(
|
||||
|
||||
size_t readahead = 0;
|
||||
bool create_new_table_reader = false;
|
||||
// pointless to create a new table reader for range tombstones only since the
|
||||
// reader isn't reused
|
||||
if (!is_range_del_only) {
|
||||
if (for_compaction) {
|
||||
if (ioptions_.new_table_reader_for_compaction_inputs) {
|
||||
readahead = ioptions_.compaction_readahead_size;
|
||||
@ -197,7 +204,6 @@ InternalIterator* TableCache::NewIterator(
|
||||
readahead = options.readahead_size;
|
||||
create_new_table_reader = readahead > 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (create_new_table_reader) {
|
||||
unique_ptr<TableReader> table_reader_unique_ptr;
|
||||
@ -223,18 +229,8 @@ InternalIterator* TableCache::NewIterator(
|
||||
}
|
||||
}
|
||||
|
||||
if (range_del_agg != nullptr && !options.ignore_range_deletions) {
|
||||
std::unique_ptr<InternalIterator> iter(
|
||||
table_reader->NewRangeTombstoneIterator(options));
|
||||
Status s = range_del_agg->AddTombstones(std::move(iter));
|
||||
if (!s.ok()) {
|
||||
return NewErrorInternalIterator(s, arena);
|
||||
}
|
||||
}
|
||||
|
||||
InternalIterator* result = nullptr;
|
||||
if (!is_range_del_only) {
|
||||
result = table_reader->NewIterator(options, arena, skip_filters);
|
||||
InternalIterator* result =
|
||||
table_reader->NewIterator(options, arena, skip_filters);
|
||||
if (create_new_table_reader) {
|
||||
assert(handle == nullptr);
|
||||
result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
|
||||
@ -248,15 +244,36 @@ InternalIterator* TableCache::NewIterator(
|
||||
if (table_reader_ptr != nullptr) {
|
||||
*table_reader_ptr = table_reader;
|
||||
}
|
||||
} else {
|
||||
assert(!create_new_table_reader);
|
||||
// don't need the table reader at all since the iterator over the meta-block
|
||||
// doesn't require it
|
||||
if (handle != nullptr) {
|
||||
UnrefEntry(cache_, handle);
|
||||
return result;
|
||||
}
|
||||
|
||||
InternalIterator* TableCache::NewRangeDeletionIterator(
|
||||
const ReadOptions& options, const InternalKeyComparator& icmp,
|
||||
const FileDescriptor& fd, HistogramImpl* file_read_hist, bool skip_filters,
|
||||
int level) {
|
||||
if (options.ignore_range_deletions) {
|
||||
return NewEmptyInternalIterator();
|
||||
}
|
||||
Status s;
|
||||
TableReader* table_reader = fd.table_reader;
|
||||
Cache::Handle* cache_handle = nullptr;
|
||||
if (table_reader == nullptr) {
|
||||
s = FindTable(env_options_, icmp, fd, &cache_handle,
|
||||
options.read_tier == kBlockCacheTier /* no_io */,
|
||||
true /* record_read_stats */, file_read_hist, skip_filters,
|
||||
level);
|
||||
if (s.ok()) {
|
||||
table_reader = GetTableReaderFromHandle(cache_handle);
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
auto* result = table_reader->NewRangeTombstoneIterator(options);
|
||||
if (cache_handle != nullptr) {
|
||||
result->RegisterCleanup(&UnrefEntry, cache_, cache_handle);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return NewErrorInternalIterator(s);
|
||||
}
|
||||
|
||||
Status TableCache::Get(const ReadOptions& options,
|
||||
@ -264,6 +281,19 @@ Status TableCache::Get(const ReadOptions& options,
|
||||
const FileDescriptor& fd, const Slice& k,
|
||||
GetContext* get_context, HistogramImpl* file_read_hist,
|
||||
bool skip_filters, int level) {
|
||||
if (get_context->range_del_agg() != nullptr &&
|
||||
!options.ignore_range_deletions) {
|
||||
std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
|
||||
options, internal_comparator, fd, file_read_hist, skip_filters, level));
|
||||
Status s = range_del_iter->status();
|
||||
if (s.ok()) {
|
||||
s = get_context->range_del_agg()->AddTombstones(
|
||||
std::move(range_del_iter));
|
||||
}
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
TableReader* t = fd.table_reader;
|
||||
Status s;
|
||||
Cache::Handle* handle = nullptr;
|
||||
|
@ -46,24 +46,34 @@ class TableCache {
|
||||
// the returned iterator. The returned "*tableptr" object is owned by
|
||||
// the cache and should not be deleted, and is valid for as long as the
|
||||
// returned iterator is live.
|
||||
// @param range_del_agg If non-nullptr, adds range deletions to the
|
||||
// aggregator. If an error occurs, returns it in a NewErrorInternalIterator
|
||||
// @param skip_filters Disables loading/accessing the filter block
|
||||
// @param level The level this table is at, -1 for "not set / don't know"
|
||||
// @param range_del_agg When non-nullptr, creates a range tombstone iterator
|
||||
// over this file's meta-block and gives it to this object
|
||||
// @param is_range_del_only When set, this function only gives a range
|
||||
// tombstone iterator to range_del_agg and then returns nullptr
|
||||
InternalIterator* NewIterator(
|
||||
const ReadOptions& options, const EnvOptions& toptions,
|
||||
const InternalKeyComparator& internal_comparator,
|
||||
const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr,
|
||||
const FileDescriptor& file_fd, RangeDelAggregator* range_del_agg,
|
||||
TableReader** table_reader_ptr = nullptr,
|
||||
HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
|
||||
Arena* arena = nullptr, bool skip_filters = false, int level = -1,
|
||||
RangeDelAggregator* range_del_agg = nullptr,
|
||||
bool is_range_del_only = false);
|
||||
Arena* arena = nullptr, bool skip_filters = false, int level = -1);
|
||||
|
||||
// Return an iterator over the range deletion meta-block for the specified
|
||||
// file number.
|
||||
// @param skip_filters Disables loading/accessing the filter block
|
||||
// @param level The level this table is at, -1 for "not set / don't know"
|
||||
InternalIterator* NewRangeDeletionIterator(const ReadOptions& options,
|
||||
const InternalKeyComparator& icmp,
|
||||
const FileDescriptor& fd,
|
||||
HistogramImpl* file_read_hist,
|
||||
bool skip_filters, int level);
|
||||
|
||||
// If a seek to internal key "k" in specified file finds an entry,
|
||||
// call (*handle_result)(arg, found_key, found_value) repeatedly until
|
||||
// it returns false.
|
||||
// @param get_context State for get operation. If its range_del_agg() returns
|
||||
// non-nullptr, adds range deletions to the aggregator. If an error occurs,
|
||||
// returns non-ok status.
|
||||
// @param skip_filters Disables loading/accessing the filter block
|
||||
// @param level The level this table is at, -1 for "not set / don't know"
|
||||
Status Get(const ReadOptions& options,
|
||||
|
@ -518,10 +518,9 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
|
||||
const FileDescriptor* fd =
|
||||
reinterpret_cast<const FileDescriptor*>(meta_handle.data());
|
||||
return table_cache_->NewIterator(
|
||||
read_options_, env_options_, icomparator_, *fd,
|
||||
read_options_, env_options_, icomparator_, *fd, range_del_agg_,
|
||||
nullptr /* don't need reference to table */, file_read_hist_,
|
||||
for_compaction_, nullptr /* arena */, skip_filters_, level_,
|
||||
range_del_agg_, false /* is_range_del_only */);
|
||||
for_compaction_, nullptr /* arena */, skip_filters_, level_);
|
||||
}
|
||||
|
||||
bool PrefixMayMatch(const Slice& internal_key) override {
|
||||
@ -834,9 +833,9 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
|
||||
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
|
||||
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
|
||||
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
|
||||
read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr,
|
||||
cfd_->internal_stats()->GetFileReadHist(0), false, arena,
|
||||
false /* skip_filters */, 0 /* level */, range_del_agg));
|
||||
read_options, soptions, cfd_->internal_comparator(), file.fd,
|
||||
range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0),
|
||||
false, arena, false /* skip_filters */, 0 /* level */));
|
||||
}
|
||||
} else {
|
||||
// For levels > 0, we can use a concatenating iterator that sequentially
|
||||
@ -961,18 +960,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
user_comparator(), internal_comparator());
|
||||
FdWithKeyRange* f = fp.GetNextFile();
|
||||
while (f != nullptr) {
|
||||
if (!read_options.ignore_range_deletions) {
|
||||
table_cache_->NewIterator(
|
||||
read_options, vset_->env_options(), *internal_comparator(), f->fd,
|
||||
nullptr /* table_reader_ptr */,
|
||||
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
|
||||
false /* for_compaction */, nullptr /* arena */,
|
||||
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
|
||||
fp.IsHitFileLastInLevel()),
|
||||
fp.GetCurrentLevel() /* level */, range_del_agg,
|
||||
true /* is_range_del_only */);
|
||||
}
|
||||
|
||||
*status = table_cache_->Get(
|
||||
read_options, *internal_comparator(), f->fd, ikey, &get_context,
|
||||
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
|
||||
@ -3328,7 +3315,7 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
|
||||
TableReader* table_reader_ptr;
|
||||
InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
|
||||
ReadOptions(), env_options_, v->cfd_->internal_comparator(), f.fd,
|
||||
&table_reader_ptr);
|
||||
nullptr /* range_del_agg */, &table_reader_ptr);
|
||||
if (table_reader_ptr != nullptr) {
|
||||
result = table_reader_ptr->ApproximateOffsetOf(key);
|
||||
}
|
||||
@ -3399,10 +3386,11 @@ InternalIterator* VersionSet::MakeInputIterator(
|
||||
for (size_t i = 0; i < flevel->num_files; i++) {
|
||||
list[num++] = cfd->table_cache()->NewIterator(
|
||||
read_options, env_options_compactions_,
|
||||
cfd->internal_comparator(), flevel->files[i].fd, nullptr,
|
||||
nullptr, /* no per level latency histogram*/
|
||||
cfd->internal_comparator(), flevel->files[i].fd, range_del_agg,
|
||||
nullptr /* table_reader_ptr */,
|
||||
nullptr /* no per level latency histogram */,
|
||||
true /* for_compaction */, nullptr /* arena */,
|
||||
false /* skip_filters */, (int)which /* level */, range_del_agg);
|
||||
false /* skip_filters */, (int)which /* level */);
|
||||
}
|
||||
} else {
|
||||
// Create concatenating iterator for the files from this level
|
||||
|
@ -37,7 +37,7 @@ GetContext::GetContext(const Comparator* ucmp,
|
||||
Statistics* statistics, GetState init_state,
|
||||
const Slice& user_key, std::string* ret_value,
|
||||
bool* value_found, MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg, Env* env,
|
||||
RangeDelAggregator* _range_del_agg, Env* env,
|
||||
SequenceNumber* seq,
|
||||
PinnedIteratorsManager* _pinned_iters_mgr)
|
||||
: ucmp_(ucmp),
|
||||
@ -49,7 +49,7 @@ GetContext::GetContext(const Comparator* ucmp,
|
||||
value_(ret_value),
|
||||
value_found_(value_found),
|
||||
merge_context_(merge_context),
|
||||
range_del_agg_(range_del_agg),
|
||||
range_del_agg_(_range_del_agg),
|
||||
env_(env),
|
||||
seq_(seq),
|
||||
replay_log_(nullptr),
|
||||
|
@ -47,6 +47,8 @@ class GetContext {
|
||||
|
||||
GetState State() const { return state_; }
|
||||
|
||||
RangeDelAggregator* range_del_agg() { return range_del_agg_; }
|
||||
|
||||
PinnedIteratorsManager* pinned_iters_mgr() { return pinned_iters_mgr_; }
|
||||
|
||||
// If a non-null string is passed, all the SaveValue calls will be
|
||||
|
Loading…
Reference in New Issue
Block a user