Vijay Nadimpalli 2019-07-16 14:13:50 -07:00
parent 3a6e83b56b
commit 2c75f185dd
24 changed files with 567 additions and 60 deletions

View File

@ -16,6 +16,9 @@
#if !defined(ROCKSDB_LITE)
#include "test_util/sync_point.h"
#endif
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
namespace rocksdb {
@ -1336,6 +1339,106 @@ TEST_F(DBBasicTest, GetAllKeyVersions) {
}
#endif // !ROCKSDB_LITE
TEST_F(DBBasicTest, GetMergeOperands) {
class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
public:
LimitedStringAppendMergeOp(int limit, char delim)
: StringAppendTESTOperator(delim), limit_(limit) {}
const char* Name() const override {
return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
}
bool ShouldMerge(const std::vector<Slice>& operands) const override {
if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
return true;
}
return false;
}
private:
size_t limit_ = 0;
};
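// Note: the ShouldMerge() limit above only caps how many operands feed the
// merged value returned by Get(); GetMergeOperands() runs with do_merge ==
// false and therefore still surfaces the raw operands it encounters.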
Options options;
options.create_if_missing = true;
// Use only the latest two merge operands.
options.merge_operator =
std::make_shared<LimitedStringAppendMergeOp>(2, ',');
options.env = env_;
Reopen(options);
// All K1 values are in memtable.
ASSERT_OK(Merge("k1", "a"));
ASSERT_OK(Put("k1", "asd"));
ASSERT_OK(Merge("k1", "b"));
ASSERT_OK(Merge("k1", "c"));
ASSERT_OK(Merge("k1", "d"));
std::vector<PinnableSlice> values(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", &values);
for (PinnableSlice& value : values) {
std::cout << *value.GetSelf() << "\n";
}
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
// Make sure that only the latest two merge operands are used. If this was
// not the case the value would be "asd,b,c,d".
ASSERT_EQ(value, "c,d");
// All K2 values are flushed to L0 into a single file.
ASSERT_OK(Merge("k2", "a"));
ASSERT_OK(Merge("k2", "b"));
ASSERT_OK(Merge("k2", "c"));
ASSERT_OK(Merge("k2", "d"));
ASSERT_OK(Flush());
std::vector<PinnableSlice> values2(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", &values2);
for (PinnableSlice& psl : values2) {
std::cout << *psl.GetSelf() << "\n";
}
ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
ASSERT_EQ(value, "c,d");
// All K3 values are flushed and are in different files.
ASSERT_OK(Merge("k3", "ab"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "bc"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "cd"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "de"));
std::vector<PinnableSlice> values3(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", &values3);
for (PinnableSlice& psl : values3) {
std::cout << *psl.GetSelf() << "\n";
}
ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
ASSERT_EQ(value, "cd,de");
// All K4 values are in different levels
ASSERT_OK(Merge("k4", "ab"));
ASSERT_OK(Flush());
MoveFilesToLevel(4);
ASSERT_OK(Merge("k4", "bc"));
ASSERT_OK(Flush());
MoveFilesToLevel(3);
ASSERT_OK(Merge("k4", "cd"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_OK(Merge("k4", "de"));
ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
ASSERT_EQ(value, "cd,de");
std::vector<PinnableSlice> values4(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", &values4);
for (PinnableSlice& psl : values4) {
std::cout << *psl.GetSelf() << "\n";
}
}
class DBBasicTestWithParallelIO
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool,bool,bool,bool>> {
@ -1624,6 +1727,7 @@ INSTANTIATE_TEST_CASE_P(
std::make_tuple(true, true, true, false),
std::make_tuple(false, true, false, false)));
class DBBasicTestWithTimestampWithParam
: public DBTestBase,
public testing::WithParamInterface<bool> {

View File

@ -4454,6 +4454,8 @@ TEST_F(DBCompactionTest, PartialManualCompaction) {
uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
ASSERT_OK(dbfull()->SetOptions(
{{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
ASSERT_OK(dbfull()->SetOptions(
{{"compaction_readahead_size", std::to_string(2097152)}}));
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;

View File

@ -1564,6 +1564,99 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
return s;
}
Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* pinnable_val) {
assert(pinnable_val != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Get(column_family, key);
}
}
// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
} else {
// Note that the snapshot is assigned AFTER referencing the super
// version because otherwise a flush happening in between may compact away
// data for the snapshot, so the reader would see neither data that was
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
}
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0;
Status s;
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
PERF_TIMER_STOP(get_snapshot_time);
bool skip_memtable = (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (sv->mem->GetMergeOperands(lkey, pinnable_val, &s, &merge_context,
&max_covering_tombstone_seq, read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->GetMergeOperands(lkey, pinnable_val, &s, &merge_context,
&max_covering_tombstone_seq, read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->GetMergeOperands(read_options, lkey, pinnable_val, &s, &merge_context,
&max_covering_tombstone_seq, nullptr, nullptr, nullptr,
nullptr, nullptr);
RecordTick(stats_, MEMTABLE_MISS);
}
{
PERF_TIMER_GUARD(get_post_process_time);
ReturnAndCleanupSuperVersion(cfd, sv);
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (s.ok()) {
// Record the total bytes of the returned operands rather than the number
// of slices in the output vector.
for (const PinnableSlice& operand : *pinnable_val) {
size += operand.size();
}
RecordTick(stats_, BYTES_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
RecordInHistogram(stats_, BYTES_PER_READ, size);
}
return s;
}
std::vector<Status> DBImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,

View File

@ -159,6 +159,11 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using DB::GetMergeOperands;
virtual Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* value) override;
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,

View File

@ -42,8 +42,8 @@ class MockMemTableRep : public MemTableRep {
bool Contains(const char* key) const override { return rep_->Contains(key); }
void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) override {
rep_->Get(k, callback_args, callback_func);
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override {
rep_->Get(k, callback_args, callback_func, do_merge);
}
size_t ApproximateMemoryUsage() override {

View File

@ -86,9 +86,15 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
Reopen(options);
// All K1 values are in memtable.
ASSERT_OK(Merge("k1", "a"));
ASSERT_OK(Put("k1", "asd"));
ASSERT_OK(Merge("k1", "b"));
ASSERT_OK(Merge("k1", "c"));
ASSERT_OK(Merge("k1", "d"));
std::vector<PinnableSlice> values(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", &values);
for (PinnableSlice& value : values) {
std::cout << *value.GetSelf() << "\n";
}
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
// Make sure that only the latest two merge operands are used. If this was
@ -101,6 +107,11 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
ASSERT_OK(Merge("k2", "c"));
ASSERT_OK(Merge("k2", "d"));
ASSERT_OK(Flush());
std::vector<PinnableSlice> values2(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", &values2);
for (PinnableSlice& psl : values2) {
std::cout << *psl.GetSelf() << "\n";
}
ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
ASSERT_EQ(value, "c,d");
@ -112,6 +123,12 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
ASSERT_OK(Merge("k3", "cd"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "de"));
std::vector<PinnableSlice> values3(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", &values3);
for (PinnableSlice& psl : values3) {
std::cout << *psl.GetSelf() << "\n";
}
ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
ASSERT_EQ(value, "cd,de");
@ -128,6 +145,13 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
ASSERT_OK(Merge("k4", "de"));
ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
ASSERT_EQ(value, "cd,de");
std::vector<PinnableSlice> values4(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", &values4);
for (PinnableSlice& psl : values4) {
std::cout << *psl.GetSelf() << "\n";
}
}
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {

View File

@ -2466,6 +2466,18 @@ class ModelDB : public DB {
return Status::NotSupported(key);
}
using DB::GetMergeOperands;
virtual Status GetMergeOperands(const ReadOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& key,
std::vector<PinnableSlice>* /*value*/) override {
return Status::NotSupported(key);
}
using DB::MultiGet;
std::vector<Status> MultiGet(
const ReadOptions& /*options*/,

View File

@ -155,8 +155,8 @@ class SpecialMemTableRep : public MemTableRep {
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override {
memtable_->Get(k, callback_args, callback_func);
const char* entry, bool do_merge), bool do_merge) override {
memtable_->Get(k, callback_args, callback_func, do_merge);
}
uint64_t ApproximateNumEntries(const Slice& start_ikey,

View File

@ -614,7 +614,7 @@ struct Saver {
};
} // namespace
static bool SaveValue(void* arg, const char* entry) {
static bool SaveValue(void* arg, const char* entry, bool do_merge) {
Saver* s = reinterpret_cast<Saver*>(arg);
assert(s != nullptr);
MergeContext* merge_context = s->merge_context;
@ -627,7 +627,7 @@ static bool SaveValue(void* arg, const char* entry) {
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
@ -677,12 +677,18 @@ static bool SaveValue(void* arg, const char* entry) {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
if (!do_merge) {
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
} else if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else if (s->value != nullptr) {
s->value->assign(v.data(), v.size());
}
@ -726,7 +732,7 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->merge_in_progress) = true;
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
if (merge_operator->ShouldMerge(merge_context->GetOperandsDirectionBackward())) {
if (do_merge && merge_operator->ShouldMerge(merge_context->GetOperandsDirectionBackward())) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger, s->statistics,
@ -810,7 +816,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
table_->Get(key, &saver, SaveValue);
table_->Get(key, &saver, SaveValue, true);
*seq = saver.seq;
}
@ -823,6 +829,87 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
return found_final_value;
}
bool MemTable::GetMergeOperands(const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts,
ReadCallback* callback,
bool* is_blob_index) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
return false;
}
PERF_TIMER_GUARD(get_from_memtable_time);
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
NewRangeTombstoneIterator(read_opts,
GetInternalKeySeqno(key.internal_key())));
if (range_del_iter != nullptr) {
*max_covering_tombstone_seq =
std::max(*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
}
Slice user_key = key.user_key();
bool found_final_value = false;
bool merge_in_progress = s->IsMergeInProgress();
bool may_contain = true;
size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
if (bloom_filter_) {
// when both memtable_whole_key_filtering and prefix_extractor_ are set,
// only do whole key filtering for Get() to save CPU
if (moptions_.memtable_whole_key_filtering) {
may_contain =
bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
} else {
assert(prefix_extractor_);
may_contain =
!prefix_extractor_->InDomain(user_key) ||
bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
}
}
if (bloom_filter_ && !may_contain) {
// iter is null if prefix bloom says the key does not exist
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
} else {
if (bloom_filter_) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.max_covering_tombstone_seq = *max_covering_tombstone_seq;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = moptions_.statistics;
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
table_->Get(key, &saver, SaveValue, false);
// Copy the collected merge operands into the caller-provided vector. The
// caller is expected to have pre-sized *pinnable_val to hold all operands.
PinnableSlice* psliceptr = pinnable_val->data();
for (const Slice& slice : saver.merge_context->GetOperands()) {
psliceptr->PinSelf(slice);
psliceptr++;
}
}
// No change to value, since we have not yet found a Put/Delete
if (!found_final_value && merge_in_progress) {
*s = Status::MergeInProgress();
}
PERF_COUNTER_ADD(get_from_memtable_count, 1);
return found_final_value;
}
void MemTable::Update(SequenceNumber seq,
const Slice& key,
const Slice& value) {
@ -996,10 +1083,10 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
}
void MemTableRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) {
auto iter = GetDynamicPrefixIterator();
for (iter->Seek(k.internal_key(), k.memtable_key().data());
iter->Valid() && callback_func(callback_args, iter->key());
iter->Valid() && callback_func(callback_args, iter->key(), do_merge);
iter->Next()) {
}
}

View File

@ -204,6 +204,14 @@ class MemTable {
read_opts, callback, is_blob_index);
}
bool GetMergeOperands(const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts,
ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
// Attempts to update the new_value inplace, else does normal Add
// Pseudocode
// if key exists in current memtable && prev_value is of type kTypeValue

View File

@ -109,6 +109,23 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
is_blob_index);
}
bool MemTableListVersion::GetMergeOperands(const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts,
ReadCallback* callback,
bool* is_blob_index) {
for (MemTable* memtable : memlist_) {
bool done = memtable->GetMergeOperands(key, pinnable_val, s, merge_context,
max_covering_tombstone_seq, read_opts, callback, is_blob_index);
if (done) {
return true;
}
}
return false;
}
bool MemTableListVersion::GetFromHistory(
const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,

View File

@ -71,6 +71,14 @@ class MemTableListVersion {
read_opts, callback, is_blob_index);
}
bool GetMergeOperands(const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts,
ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
// Similar to Get(), but searches the Memtable history of memtables that
// have already been flushed. Should only be used from in-memory only
// queries (such as Transaction validation) as the history may contain

View File

@ -1783,6 +1783,120 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
}
}
void Version::GetMergeOperands(const ReadOptions& read_options, const LookupKey& k,
std::vector<PinnableSlice>* pinnable_val, Status* status,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
bool* is_blob) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
assert(status->ok() || status->IsMergeInProgress());
if (key_exists != nullptr) {
// will falsify below if not found
*key_exists = true;
}
PinnedIteratorsManager pinned_iters_mgr;
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
nullptr, value_found, merge_context, max_covering_tombstone_seq, this->env_,
seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
0 /* tracing_get_id */, false /* do_merge */);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
pinned_iters_mgr.StartPinning();
}
FilePicker fp(
storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
if (*max_covering_tombstone_seq > 0) {
// The remaining files we look at will only contain covered keys, so we
// stop here.
break;
}
if (get_context.sample()) {
sample_file_read_inc(f->file_metadata);
}
bool timer_enabled =
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(env_, timer_enabled /* auto_start */);
*status = table_cache_->Get(
read_options, *internal_comparator(), *f->file_metadata, ikey,
&get_context, mutable_cf_options_.prefix_extractor.get(),
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
fp.GetCurrentLevel());
// TODO: examine the behavior for corrupted key
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
fp.GetCurrentLevel());
}
if (!status->ok()) {
return;
}
// report the counters before returning
if (get_context.State() != GetContext::kNotFound &&
get_context.State() != GetContext::kMerge &&
db_statistics_ != nullptr) {
get_context.ReportCounters();
}
switch (get_context.State()) {
case GetContext::kNotFound:
// Keep searching in other files
break;
case GetContext::kMerge:
// TODO: update per-level perfcontext user_key_return_count for kMerge
break;
case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
RecordTick(db_statistics_, GET_HIT_L1);
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel());
break;
case GetContext::kDeleted:
// Use empty error message for speed
*status = Status::NotFound();
break;
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
case GetContext::kBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
return;
}
f = fp.GetNextFile();
}
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
// Copy the collected merge operands into the caller-provided vector. The
// caller is expected to have pre-sized *pinnable_val to hold all operands.
PinnableSlice* pin_slice = pinnable_val->data();
for (const Slice& slice : merge_context->GetOperands()) {
pin_slice->PinSelf(slice);
pin_slice++;
}
}
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob) {
PinnedIteratorsManager pinned_iters_mgr;

View File

@ -63,7 +63,6 @@ class VersionSet;
class WriteBufferManager;
class MergeContext;
class ColumnFamilySet;
class TableCache;
class MergeIteratorBuilder;
// Return the smallest index i such that file_level.files[i]->largest >= key.
@ -584,6 +583,14 @@ class Version {
SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr,
bool* is_blob = nullptr);
void GetMergeOperands(const ReadOptions&, const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
Status* status, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
bool* value_found = nullptr, bool* key_exists = nullptr,
SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr,
bool* is_blob = nullptr);
void MultiGet(const ReadOptions&, MultiGetRange* range,
ReadCallback* callback = nullptr, bool* is_blob = nullptr);

View File

@ -403,6 +403,10 @@ class DB {
return Get(options, DefaultColumnFamily(), key, value);
}
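// Populates the caller-supplied vector with the merge operands recorded for
// `key` (plus the base value, if one is encountered), without applying the
// merge operator. With this version of the API the vector should be pre-sized
// by the caller, since the implementation copies operands through
// value->data() without resizing.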
virtual Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* value) = 0;
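A minimal caller-side sketch (not part of this commit) of how the new API could be exercised. The helper name, the fixed operand capacity of 10, and the printf output are illustrative assumptions; only the GetMergeOperands signature itself comes from this diff.

#include <cstdio>
#include <vector>
#include "rocksdb/db.h"

// Hypothetical helper: prints every operand recorded for `key`. The vector is
// pre-sized because this version of GetMergeOperands writes through
// value->data() without resizing it.
void DumpMergeOperands(rocksdb::DB* db, const rocksdb::Slice& key) {
  std::vector<rocksdb::PinnableSlice> operands(10);  // assumed upper bound
  rocksdb::Status s = db->GetMergeOperands(
      rocksdb::ReadOptions(), db->DefaultColumnFamily(), key, &operands);
  if (!s.ok()) {
    return;
  }
  for (const rocksdb::PinnableSlice& operand : operands) {
    if (operand.empty()) {
      continue;  // slots beyond the number of collected operands stay empty
    }
    printf("%s\n", operand.ToString().c_str());
  }
}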
// If keys[i] does not exist in the database, then the i'th returned
// status will be one for which Status::IsNotFound() is true, and
// (*values)[i] will be set to some arbitrary value (often ""). Otherwise,

View File

@ -165,7 +165,7 @@ class MemTableRep {
// Get() function with a default implementation that dynamically constructs
// an iterator, seeks to the lookup key, and invokes the callback function
// for each entry until the callback returns false.
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry));
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge);
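A minimal sketch (not part of this diff) of a callback compatible with the updated Get() signature above; the name and counting behavior are illustrative assumptions, shown only to make the threaded-through do_merge flag concrete.

// Hypothetical callback: counts the entries visited for a lookup. `entry` is
// an encoded memtable entry; `do_merge` tells the callback whether it should
// fold merge operands into a final value (the Get path) or merely collect
// them (the GetMergeOperands path).
static bool CountEntry(void* arg, const char* /*entry*/, bool /*do_merge*/) {
  ++*static_cast<int*>(arg);
  return true;  // keep iterating; return false to stop the scan early
}
// Usage against any MemTableRep `rep`:
//   int count = 0;
//   rep->Get(lookup_key, &count, CountEntry, /*do_merge=*/true);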
virtual uint64_t ApproximateNumEntries(const Slice& /*start_ikey*/,
const Slice& /*end_key*/) {

View File

@ -88,6 +88,13 @@ class StackableDB : public DB {
return db_->Get(options, column_family, key, value);
}
using DB::GetMergeOperands;
virtual Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* value) override {
return db_->GetMergeOperands(options, column_family, key, value);
}
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,

View File

@ -177,7 +177,7 @@ class HashLinkListRep : public MemTableRep {
size_t ApproximateMemoryUsage() override;
void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) override;
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override;
~HashLinkListRep() override;
@ -714,7 +714,7 @@ size_t HashLinkListRep::ApproximateMemoryUsage() {
}
void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) {
auto transformed = transform_->Transform(k.user_key());
auto bucket = GetBucket(transformed);
@ -723,7 +723,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
// Is a skip list
MemtableSkipList::Iterator iter(&skip_list_header->skip_list);
for (iter.Seek(k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key());
iter.Valid() && callback_func(callback_args, iter.key(), do_merge);
iter.Next()) {
}
} else {
@ -731,7 +731,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
if (link_list_head != nullptr) {
LinkListIterator iter(this, link_list_head);
for (iter.Seek(k.internal_key(), nullptr);
iter.Valid() && callback_func(callback_args, iter.key());
iter.Valid() && callback_func(callback_args, iter.key(), do_merge);
iter.Next()) {
}
}

View File

@ -35,7 +35,7 @@ class HashSkipListRep : public MemTableRep {
size_t ApproximateMemoryUsage() override;
void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) override;
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override;
~HashSkipListRep() override;
@ -287,13 +287,13 @@ size_t HashSkipListRep::ApproximateMemoryUsage() {
}
void HashSkipListRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) {
auto transformed = transform_->Transform(k.user_key());
auto bucket = GetBucket(transformed);
if (bucket != nullptr) {
Bucket::Iterator iter(bucket);
for (iter.Seek(k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key());
iter.Valid() && callback_func(callback_args, iter.key(), do_merge);
iter.Next()) {
}
}

View File

@ -293,7 +293,7 @@ class ReadBenchmarkThread : public BenchmarkThread {
: BenchmarkThread(table, key_gen, bytes_written, bytes_read, sequence,
num_ops, read_hits) {}
static bool callback(void* arg, const char* entry) {
static bool callback(void* arg, const char* entry, bool /*do_merge*/) {
CallbackVerifyArgs* callback_args = static_cast<CallbackVerifyArgs*>(arg);
assert(callback_args != nullptr);
uint32_t key_length;
@ -318,7 +318,7 @@ class ReadBenchmarkThread : public BenchmarkThread {
verify_args.key = &lookup_key;
verify_args.table = table_;
verify_args.comparator = &internal_key_comp;
table_->Get(lookup_key, &verify_args, callback);
table_->Get(lookup_key, &verify_args, callback, true);
if (verify_args.found) {
*bytes_read_ += VarintLength(16) + 16 + FLAGS_item_size;
++*read_hits_;

View File

@ -69,11 +69,11 @@ public:
}
void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) override {
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override {
SkipListRep::Iterator iter(&skip_list_);
Slice dummy_slice;
for (iter.Seek(dummy_slice, k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) {
iter.Valid() && callback_func(callback_args, iter.key(), do_merge); iter.Next()) {
}
}

View File

@ -41,7 +41,7 @@ class VectorRep : public MemTableRep {
size_t ApproximateMemoryUsage() override;
void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) override;
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override;
~VectorRep() override {}
@ -248,7 +248,7 @@ void VectorRep::Iterator::SeekToLast() {
}
void VectorRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) {
rwlock_.ReadLock();
VectorRep* vector_rep;
std::shared_ptr<Bucket> bucket;
@ -262,7 +262,7 @@ void VectorRep::Get(const LookupKey& k, void* callback_args,
rwlock_.ReadUnlock();
for (iter.Seek(k.user_key(), k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) {
iter.Valid() && callback_func(callback_args, iter.key(), do_merge); iter.Next()) {
}
}

View File

@ -44,7 +44,7 @@ GetContext::GetContext(
PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
SequenceNumber* _max_covering_tombstone_seq, Env* env, SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
bool* is_blob_index, uint64_t tracing_get_id)
bool* is_blob_index, uint64_t tracing_get_id, bool do_merge)
: ucmp_(ucmp),
merge_operator_(merge_operator),
logger_(logger),
@ -61,7 +61,8 @@ GetContext::GetContext(
pinned_iters_mgr_(_pinned_iters_mgr),
callback_(callback),
is_blob_index_(is_blob_index),
tracing_get_id_(tracing_get_id) {
tracing_get_id_(tracing_get_id),
do_merge_(do_merge) {
if (seq_) {
*seq_ = kMaxSequenceNumber;
}
@ -230,14 +231,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
assert(merge_operator_ != nullptr);
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, &value,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, env_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
} else {
assert(do_merge_ == false);
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
value_pinner != nullptr) {
value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
merge_context_->PushOperand(value, true /*value_pinned*/);
} else {
merge_context_->PushOperand(value, false);
}
}
}
if (is_blob_index_ != nullptr) {
@ -256,14 +266,16 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
} else if (kMerge == state_) {
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, env_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
if (do_merge_) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, env_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
}
}
}
return false;
@ -279,18 +291,20 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
} else {
merge_context_->PushOperand(value, false);
}
if (merge_operator_ != nullptr &&
if (do_merge_ && merge_operator_ != nullptr &&
merge_operator_->ShouldMerge(merge_context_->GetOperandsDirectionBackward())) {
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, env_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
if (do_merge_) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, env_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
}
}
return false;
}

View File

@ -86,7 +86,7 @@ class GetContext {
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
uint64_t tracing_get_id = 0);
uint64_t tracing_get_id = 0, bool do_merge = true);
GetContext() = default;
@ -164,6 +164,7 @@ class GetContext {
// Used for block cache tracing only. A tracing get id uniquely identifies a
// Get or a MultiGet.
const uint64_t tracing_get_id_;
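// True for the regular Get path: merge operands are folded into a single
// value with the merge operator. False for GetMergeOperands: operands are
// only collected into merge_context_.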
bool do_merge_;
};
// Call this to replay a log and bring the get_context up to date. The replay