Range deletion microoptimizations
Summary: - Made RangeDelAggregator's InternalKeyComparator member a reference-to-const so we don't need to copy-construct it. Also added InternalKeyComparator to ImmutableCFOptions so we don't need to construct one for each DBIter. - Made MemTable::NewRangeTombstoneIterator and the table readers' NewRangeTombstoneIterator() functions return nullptr instead of NewEmptyInternalIterator to avoid the allocation. Updated callers accordingly. Closes https://github.com/facebook/rocksdb/pull/1548 Differential Revision: D4208169 Pulled By: ajkr fbshipit-source-id: 2fd65cf
This commit is contained in:
parent
23a18ca5ad
commit
fd43ee09da
@ -82,7 +82,6 @@ Status BuildTable(
|
||||
Status s;
|
||||
meta->fd.file_size = 0;
|
||||
iter->SeekToFirst();
|
||||
range_del_iter->SeekToFirst();
|
||||
std::unique_ptr<RangeDelAggregator> range_del_agg(
|
||||
new RangeDelAggregator(internal_comparator, snapshots));
|
||||
s = range_del_agg->AddTombstones(std::move(range_del_iter));
|
||||
|
@ -261,8 +261,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
|
||||
int total = 0;
|
||||
Arena arena;
|
||||
{
|
||||
RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
|
||||
{} /* snapshots */);
|
||||
InternalKeyComparator icmp(options.comparator);
|
||||
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
|
||||
ScopedArenaIterator iter(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
|
||||
iter->SeekToFirst();
|
||||
@ -351,8 +351,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
|
||||
// level Lmax because this record is at the tip
|
||||
count = 0;
|
||||
{
|
||||
RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
|
||||
{} /* snapshots */);
|
||||
InternalKeyComparator icmp(options.comparator);
|
||||
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
|
||||
ScopedArenaIterator iter(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
|
||||
iter->SeekToFirst();
|
||||
@ -570,8 +570,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
|
||||
int count = 0;
|
||||
int total = 0;
|
||||
Arena arena;
|
||||
RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
|
||||
{} /* snapshots */);
|
||||
InternalKeyComparator icmp(options.comparator);
|
||||
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
|
||||
ScopedArenaIterator iter(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg));
|
||||
iter->SeekToFirst();
|
||||
|
@ -123,7 +123,7 @@ class DBIter: public Iterator {
|
||||
prefix_same_as_start_(prefix_same_as_start),
|
||||
pin_thru_lifetime_(pin_data),
|
||||
total_order_seek_(total_order_seek),
|
||||
range_del_agg_(InternalKeyComparator(cmp), s) {
|
||||
range_del_agg_(ioptions.internal_comparator, s) {
|
||||
RecordTick(statistics_, NO_ITERATORS);
|
||||
prefix_extractor_ = ioptions.prefix_extractor;
|
||||
max_skip_ = max_sequential_skip_in_iterations;
|
||||
|
@ -590,8 +590,8 @@ std::string DBTestBase::Contents(int cf) {
|
||||
std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
|
||||
Arena arena;
|
||||
auto options = CurrentOptions();
|
||||
RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
|
||||
{} /* snapshots */);
|
||||
InternalKeyComparator icmp(options.comparator);
|
||||
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
|
||||
ScopedArenaIterator iter;
|
||||
if (cf == 0) {
|
||||
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg));
|
||||
@ -999,8 +999,8 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
|
||||
ScopedArenaIterator iter;
|
||||
Arena arena;
|
||||
auto options = CurrentOptions();
|
||||
RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
|
||||
{} /* snapshots */);
|
||||
InternalKeyComparator icmp(options.comparator);
|
||||
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
|
||||
if (cf != 0) {
|
||||
iter.set(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf]));
|
||||
|
@ -256,12 +256,14 @@ Status FlushJob::WriteLevel0Table() {
|
||||
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
|
||||
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
||||
memtables.push_back(m->NewIterator(ro, &arena));
|
||||
range_del_iters.push_back(m->NewRangeTombstoneIterator(ro));
|
||||
auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
|
||||
if (range_del_iter != nullptr) {
|
||||
range_del_iters.push_back(range_del_iter);
|
||||
}
|
||||
total_num_entries += m->num_entries();
|
||||
total_num_deletes += m->num_deletes();
|
||||
total_memory_usage += m->ApproximateMemoryUsage();
|
||||
}
|
||||
assert(memtables.size() == range_del_iters.size());
|
||||
|
||||
event_logger_->Log() << "job" << job_context_->job_id << "event"
|
||||
<< "flush_started"
|
||||
|
@ -75,6 +75,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
|
||||
range_del_table_(SkipListFactory().CreateMemTableRep(
|
||||
comparator_, &allocator_, nullptr /* transform */,
|
||||
ioptions.info_log)),
|
||||
is_range_del_table_empty_(true),
|
||||
data_size_(0),
|
||||
num_entries_(0),
|
||||
num_deletes_(0),
|
||||
@ -375,8 +376,8 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
|
||||
|
||||
InternalIterator* MemTable::NewRangeTombstoneIterator(
|
||||
const ReadOptions& read_options) {
|
||||
if (read_options.ignore_range_deletions) {
|
||||
return NewEmptyInternalIterator();
|
||||
if (read_options.ignore_range_deletions || is_range_del_table_empty_) {
|
||||
return nullptr;
|
||||
}
|
||||
return new MemTableIterator(*this, read_options, nullptr /* arena */,
|
||||
true /* use_range_del_table */);
|
||||
@ -508,6 +509,9 @@ void MemTable::Add(SequenceNumber s, ValueType type,
|
||||
!first_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
|
||||
}
|
||||
}
|
||||
if (is_range_del_table_empty_ && type == kTypeRangeDeletion) {
|
||||
is_range_del_table_empty_ = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Callback from MemTable::Get()
|
||||
|
@ -352,6 +352,7 @@ class MemTable {
|
||||
MemTableAllocator allocator_;
|
||||
unique_ptr<MemTableRep> table_;
|
||||
unique_ptr<MemTableRep> range_del_table_;
|
||||
bool is_range_del_table_empty_;
|
||||
|
||||
// Total data size of all data inserted
|
||||
std::atomic<uint64_t> data_size_;
|
||||
|
@ -105,7 +105,7 @@ class RangeDelAggregator {
|
||||
|
||||
SequenceNumber upper_bound_;
|
||||
std::unique_ptr<Rep> rep_;
|
||||
const InternalKeyComparator icmp_;
|
||||
const InternalKeyComparator& icmp_;
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -178,7 +178,9 @@ InternalIterator* TableCache::NewIterator(
|
||||
if (range_del_agg != nullptr && !options.ignore_range_deletions) {
|
||||
std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
|
||||
options, icomparator, fd, file_read_hist, skip_filters, level));
|
||||
if (range_del_iter != nullptr) {
|
||||
s = range_del_iter->status();
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = range_del_agg->AddTombstones(std::move(range_del_iter));
|
||||
}
|
||||
@ -253,7 +255,7 @@ InternalIterator* TableCache::NewRangeDeletionIterator(
|
||||
const FileDescriptor& fd, HistogramImpl* file_read_hist, bool skip_filters,
|
||||
int level) {
|
||||
if (options.ignore_range_deletions) {
|
||||
return NewEmptyInternalIterator();
|
||||
return nullptr;
|
||||
}
|
||||
Status s;
|
||||
TableReader* table_reader = fd.table_reader;
|
||||
@ -270,8 +272,12 @@ InternalIterator* TableCache::NewRangeDeletionIterator(
|
||||
if (s.ok()) {
|
||||
auto* result = table_reader->NewRangeTombstoneIterator(options);
|
||||
if (cache_handle != nullptr) {
|
||||
if (result == nullptr) {
|
||||
ReleaseHandle(cache_handle);
|
||||
} else {
|
||||
result->RegisterCleanup(&UnrefEntry, cache_, cache_handle);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return NewErrorInternalIterator(s);
|
||||
@ -287,7 +293,9 @@ Status TableCache::Get(const ReadOptions& options,
|
||||
!options.ignore_range_deletions) {
|
||||
std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
|
||||
options, internal_comparator, fd, file_read_hist, skip_filters, level));
|
||||
if (range_del_iter != nullptr) {
|
||||
s = range_del_iter->status();
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = get_context->range_del_agg()->AddTombstones(
|
||||
std::move(range_del_iter));
|
||||
|
@ -55,6 +55,9 @@ static std::string PrintContents(WriteBatch* b) {
|
||||
iter = mem->NewRangeTombstoneIterator(ReadOptions());
|
||||
iter_guard.reset(iter);
|
||||
}
|
||||
if (iter == nullptr) {
|
||||
continue;
|
||||
}
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
ParsedInternalKey ikey;
|
||||
memset((void*)&ikey, 0, sizeof(ikey));
|
||||
|
@ -1489,7 +1489,7 @@ InternalIterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,
|
||||
InternalIterator* BlockBasedTable::NewRangeTombstoneIterator(
|
||||
const ReadOptions& read_options) {
|
||||
if (rep_->range_del_handle.IsNull()) {
|
||||
return NewEmptyInternalIterator();
|
||||
return nullptr;
|
||||
}
|
||||
std::string str;
|
||||
rep_->range_del_handle.EncodeTo(&str);
|
||||
@ -1967,6 +1967,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
||||
}
|
||||
// Output range deletions block
|
||||
auto* range_del_iter = NewRangeTombstoneIterator(ReadOptions());
|
||||
if (range_del_iter != nullptr) {
|
||||
range_del_iter->SeekToFirst();
|
||||
if (range_del_iter->Valid()) {
|
||||
out_file->Append(
|
||||
@ -1979,6 +1980,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
||||
out_file->Append("\n");
|
||||
}
|
||||
delete range_del_iter;
|
||||
}
|
||||
// Output Data blocks
|
||||
s = DumpDataBlocks(out_file);
|
||||
|
||||
|
@ -44,7 +44,7 @@ class TableReader {
|
||||
|
||||
virtual InternalIterator* NewRangeTombstoneIterator(
|
||||
const ReadOptions& read_options) {
|
||||
return NewEmptyInternalIterator();
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Given a key, return an approximate byte offset in the file where
|
||||
|
@ -2459,6 +2459,9 @@ TEST_F(MemTableTest, Simple) {
|
||||
iter = memtable->NewRangeTombstoneIterator(ReadOptions());
|
||||
iter_guard.reset(iter);
|
||||
}
|
||||
if (iter == nullptr) {
|
||||
continue;
|
||||
}
|
||||
iter->SeekToFirst();
|
||||
while (iter->Valid()) {
|
||||
fprintf(stderr, "key: '%s' -> '%s'\n", iter->key().ToString().c_str(),
|
||||
|
@ -1187,8 +1187,8 @@ void InternalDumpCommand::DoCommand() {
|
||||
uint64_t s1=0,s2=0;
|
||||
// Setup internal key iterator
|
||||
Arena arena;
|
||||
RangeDelAggregator range_del_agg(InternalKeyComparator(options_.comparator),
|
||||
{} /* snapshots */);
|
||||
auto icmp = InternalKeyComparator(options_.comparator);
|
||||
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
|
||||
ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg));
|
||||
Status st = iter->status();
|
||||
if (!st.ok()) {
|
||||
|
@ -31,6 +31,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
|
||||
compaction_options_fifo(cf_options.compaction_options_fifo),
|
||||
prefix_extractor(cf_options.prefix_extractor.get()),
|
||||
user_comparator(cf_options.comparator),
|
||||
internal_comparator(InternalKeyComparator(cf_options.comparator)),
|
||||
merge_operator(cf_options.merge_operator.get()),
|
||||
compaction_filter(cf_options.compaction_filter),
|
||||
compaction_filter_factory(cf_options.compaction_filter_factory.get()),
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "db/dbformat.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "util/compression.h"
|
||||
#include "util/db_options.h"
|
||||
@ -35,6 +36,7 @@ struct ImmutableCFOptions {
|
||||
const SliceTransform* prefix_extractor;
|
||||
|
||||
const Comparator* user_comparator;
|
||||
InternalKeyComparator internal_comparator;
|
||||
|
||||
MergeOperator* merge_operator;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user