Using PinnableSlice*

This commit is contained in:
Vijay Nadimpalli 2019-07-18 11:18:46 -07:00
parent 2c75f185dd
commit 15a8476291
13 changed files with 267 additions and 195 deletions

View File

@ -1359,10 +1359,7 @@ TEST_F(DBBasicTest, GetMergeOperands) {
private:
size_t limit_ = 0;
};
std::vector<int> rest;
int* a = new int();
*a = 5;
rest.push_back(*a);
Options options;
options.create_if_missing = true;
// Use only the latest two merge operands.
@ -1370,22 +1367,26 @@ TEST_F(DBBasicTest, GetMergeOperands) {
std::make_shared<LimitedStringAppendMergeOp>(2, ',');
options.env = env_;
Reopen(options);
int size = 4;
// All K1 values are in memtable.
ASSERT_OK(Merge("k1", "a"));
Put("k1", "asd");
Put("k1", "x");
ASSERT_OK(Merge("k1", "b"));
ASSERT_OK(Merge("k1", "c"));
ASSERT_OK(Merge("k1", "d"));
std::vector<PinnableSlice> values;
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", &values);
std::vector<PinnableSlice> values(size);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), size);
ASSERT_EQ(values[0], "x");
ASSERT_EQ(values[1], "b");
ASSERT_EQ(values[2], "c");
ASSERT_EQ(values[3], "d");
for(PinnableSlice& value: values) {
std::cout << *value.GetSelf() << "\n";
}
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
// Make sure that only the latest two merge operands are used. If this was
// not the case the value would be "a,b,c,d".
ASSERT_EQ(value, "c,d");
// Size is less than number of merge operands so status should be Aborted.
Status status = db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), size-1);
ASSERT_EQ(status.IsAborted(), true);
// All K2 values are flushed to L0 into a single file.
ASSERT_OK(Merge("k2", "a"));
@ -1393,13 +1394,14 @@ TEST_F(DBBasicTest, GetMergeOperands) {
ASSERT_OK(Merge("k2", "c"));
ASSERT_OK(Merge("k2", "d"));
ASSERT_OK(Flush());
std::vector<PinnableSlice> values2(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", &values2);
for(PinnableSlice& psl: values2) {
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", values.data(), size);
ASSERT_EQ(values[0], "a");
ASSERT_EQ(values[1], "b");
ASSERT_EQ(values[2], "c");
ASSERT_EQ(values[3], "d");
for(PinnableSlice& psl: values) {
std::cout << *psl.GetSelf() << "\n";
}
ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
ASSERT_EQ(value, "c,d");
// All K3 values are flushed and are in different files.
ASSERT_OK(Merge("k3", "ab"));
@ -1409,15 +1411,15 @@ TEST_F(DBBasicTest, GetMergeOperands) {
ASSERT_OK(Merge("k3", "cd"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "de"));
std::vector<PinnableSlice> values3(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", &values3);
for(PinnableSlice& psl: values3) {
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", values.data(), size);
ASSERT_EQ(values[0], "ab");
ASSERT_EQ(values[1], "bc");
ASSERT_EQ(values[2], "cd");
ASSERT_EQ(values[3], "de");
for(PinnableSlice& psl: values) {
std::cout << *psl.GetSelf() << "\n";
}
ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
ASSERT_EQ(value, "cd,de");
// All K4 values are in different levels
ASSERT_OK(Merge("k4", "ab"));
ASSERT_OK(Flush());
@ -1429,14 +1431,32 @@ TEST_F(DBBasicTest, GetMergeOperands) {
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_OK(Merge("k4", "de"));
ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
ASSERT_EQ(value, "cd,de");
std::vector<PinnableSlice> values4(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", &values4);
for(PinnableSlice& psl: values4) {
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", values.data(), size);
ASSERT_EQ(values[0], "ab");
ASSERT_EQ(values[1], "bc");
ASSERT_EQ(values[2], "cd");
ASSERT_EQ(values[3], "de");
for(PinnableSlice& psl: values) {
std::cout << *psl.GetSelf() << "\n";
}
// ASSERT_OK(Merge("k5", "a"));
// ASSERT_OK(Merge("k5", "b"));
// ASSERT_OK(Merge("k5", "c"));
// ASSERT_OK(Merge("k5", "d"));
// rocksdb::SyncPoint::GetInstance()->LoadDependency(
// {{"DBBasicTest.GetMergeOperands", "FlushJob::Start"}}
// );
// rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// ASSERT_OK(Flush());
// std::vector<PinnableSlice> values5(4);
// db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k5", values5.data(), 4);
// for(PinnableSlice& psl: values5) {
// std::cout << *psl.GetSelf() << "\n";
// }
// TEST_SYNC_POINT("DBBasicTest.GetMergeOperands");
// rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
class DBBasicTestWithParallelIO

View File

@ -1457,6 +1457,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool* value_found,
ReadCallback* callback, bool* is_blob_index) {
// return GetValOrGetMergeOperands(read_options, column_family, key, pinnable_val,
// 0, value_found, callback, is_blob_index, true);
assert(pinnable_val != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_GET);
@ -1564,9 +1566,11 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
return s;
}
Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
Status DBImpl::GetValOrGetMergeOperands(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* pinnable_val) {
PinnableSlice* pinnable_val, int num_records, bool* value_found,
ReadCallback* callback, bool* is_blob_index, bool get_val) {
assert(pinnable_val != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_GET);
@ -1587,10 +1591,18 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
TEST_SYNC_POINT("DBImpl::GetImpl:1");
TEST_SYNC_POINT("DBImpl::GetImpl:2");
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
if (callback) {
// Already calculated based on read_options.snapshot
snapshot = callback->max_visible_seq();
} else {
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
}
} else {
// Note that the snapshot is assigned AFTER referencing the super
// version because otherwise a flush happening in between may compact away
@ -1600,7 +1612,12 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
if (callback) {
callback->Refresh(snapshot);
}
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4");
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
@ -1617,12 +1634,32 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (sv->mem->GetMergeOperands(lkey, pinnable_val, &s, &merge_context,
if (get_val) {
if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options, callback,
is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options, callback,
is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
} else {
if (sv->mem->GetMergeOperands(lkey, pinnable_val, num_records, &s, &merge_context,
&max_covering_tombstone_seq, read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->GetMergeOperands(lkey, pinnable_val, &s, &merge_context,
sv->imm->GetMergeOperands(lkey, pinnable_val, num_records, &s, &merge_context,
&max_covering_tombstone_seq, read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
@ -1632,11 +1669,19 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
return s;
}
}
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->GetMergeOperands(read_options, lkey, pinnable_val, &s, &merge_context,
if (get_val) {
sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
&max_covering_tombstone_seq, value_found, nullptr, nullptr,
callback, is_blob_index);
} else {
sv->current->GetMergeOperands(read_options, lkey, pinnable_val, num_records, &s, &merge_context,
&max_covering_tombstone_seq, nullptr, nullptr, nullptr,
nullptr, nullptr);
}
RecordTick(stats_, MEMTABLE_MISS);
}
@ -1648,13 +1693,31 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (s.ok()) {
if (get_val) {
size = pinnable_val->size();
} else {
int itr = 0;
while (itr < num_records) {
size += pinnable_val->size();
itr++;
pinnable_val++;
}
}
RecordTick(stats_, BYTES_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
RecordInHistogram(stats_, BYTES_PER_READ, size);
}
return s;
}
Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* slice, int size) {
return GetValOrGetMergeOperands(read_options, column_family, key, slice, size,
nullptr, nullptr, nullptr, false);
}
std::vector<Status> DBImpl::MultiGet(

View File

@ -162,7 +162,7 @@ class DBImpl : public DB {
using DB::GetMergeOperands;
virtual Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* value) override;
PinnableSlice* slice, int size) override;
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
@ -406,6 +406,11 @@ class DBImpl : public DB {
bool* value_found = nullptr, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
Status GetValOrGetMergeOperands(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, int num_records = 0, bool* value_found = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr, bool get_val = false);
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,

View File

@ -86,15 +86,9 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
Reopen(options);
// All K1 values are in memtable.
ASSERT_OK(Merge("k1", "a"));
Put("k1", "asd");
ASSERT_OK(Merge("k1", "b"));
ASSERT_OK(Merge("k1", "c"));
ASSERT_OK(Merge("k1", "d"));
std::vector<PinnableSlice> values;
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", &values);
for(PinnableSlice& value: values) {
std::cout << *value.GetSelf() << "\n";
}
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
// Make sure that only the latest two merge operands are used. If this was
@ -107,11 +101,6 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
ASSERT_OK(Merge("k2", "c"));
ASSERT_OK(Merge("k2", "d"));
ASSERT_OK(Flush());
std::vector<PinnableSlice> values2(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", &values2);
for(PinnableSlice& psl: values2) {
std::cout << *psl.GetSelf() << "\n";
}
ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
ASSERT_EQ(value, "c,d");
@ -123,12 +112,6 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
ASSERT_OK(Merge("k3", "cd"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "de"));
std::vector<PinnableSlice> values3(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", &values3);
for(PinnableSlice& psl: values3) {
std::cout << *psl.GetSelf() << "\n";
}
ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
ASSERT_EQ(value, "cd,de");
@ -146,12 +129,6 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
ASSERT_EQ(value, "cd,de");
std::vector<PinnableSlice> values4(4);
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", &values4);
for(PinnableSlice& psl: values4) {
std::cout << *psl.GetSelf() << "\n";
}
}
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {

View File

@ -2469,12 +2469,13 @@ class ModelDB : public DB {
using DB::GetMergeOperands;
virtual Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* value) override {
PinnableSlice* slice, int size) override {
(void)options;
(void) column_family;
(void) key;
(void) value;
(void) slice;
(void) size;
return Status::NotSupported(key);
}

View File

@ -830,7 +830,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
}
bool MemTable::GetMergeOperands(const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
PinnableSlice* slice, int size,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts,
@ -894,10 +894,13 @@ bool MemTable::GetMergeOperands(const LookupKey& key,
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
table_->Get(key, &saver, SaveValue, false);
PinnableSlice* psliceptr = pinnable_val->data();
for (Slice slice : saver.merge_context->GetOperands()) {
psliceptr->PinSelf(slice);
psliceptr++;
if (saver.merge_context->GetNumOperands() > (unsigned)size) {
*s = Status::Aborted("Number of merge operands: "
+std::to_string(saver.merge_context->GetNumOperands())+" more than size of vector");
}
for (Slice sl : saver.merge_context->GetOperands()) {
slice->PinSelf(sl);
slice++;
}
}

View File

@ -205,7 +205,7 @@ class MemTable {
}
bool GetMergeOperands(const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
PinnableSlice* slice, int size,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts,

View File

@ -110,14 +110,14 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
}
bool MemTableListVersion::GetMergeOperands(const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
PinnableSlice* slice, int size,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts,
ReadCallback* callback,
bool* is_blob_index) {
for (MemTable* memtable : memlist_) {
bool done = memtable->GetMergeOperands(key, pinnable_val, s, merge_context,
bool done = memtable->GetMergeOperands(key, slice, size, s, merge_context,
max_covering_tombstone_seq, read_opts, callback, is_blob_index);
if (done) {
return true;

View File

@ -72,7 +72,7 @@ class MemTableListVersion {
}
bool GetMergeOperands(const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
PinnableSlice* slice, int size,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts,

View File

@ -1784,7 +1784,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
}
void Version::GetMergeOperands(const ReadOptions& read_options, const LookupKey& k,
std::vector<PinnableSlice>* pinnable_val, Status* status,
PinnableSlice* slice, int size, Status* status,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
@ -1804,7 +1804,7 @@ void Version::GetMergeOperands(const ReadOptions& read_options, const LookupKey&
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
nullptr, value_found, merge_context, max_covering_tombstone_seq, this->env_,
seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, false);
seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, 0, false);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
@ -1890,10 +1890,13 @@ void Version::GetMergeOperands(const ReadOptions& read_options, const LookupKey&
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
PinnableSlice* pin_slice = pinnable_val->data();
for (Slice slice : merge_context->GetOperands()){
pin_slice->PinSelf(slice);
pin_slice++;
if (merge_context->GetNumOperands() > (unsigned)size) {
*status = Status::Aborted("NUmber of merge operands: "
+std::to_string(merge_context->GetNumOperands())+" more than size of vector");
}
for (Slice sl : merge_context->GetOperands()){
slice->PinSelf(sl);
slice++;
}
}

View File

@ -584,7 +584,7 @@ class Version {
bool* is_blob = nullptr);
void GetMergeOperands(const ReadOptions&, const LookupKey& key,
std::vector<PinnableSlice>* pinnable_val,
PinnableSlice* slice, int size,
Status* status, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
bool* value_found = nullptr, bool* key_exists = nullptr,

View File

@ -405,7 +405,7 @@ class DB {
virtual Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* value) = 0;
PinnableSlice* slice, int size) = 0;
// If keys[i] does not exist in the database, then the i'th returned
// status will be one for which Status::IsNotFound() is true, and

View File

@ -91,8 +91,8 @@ class StackableDB : public DB {
using DB::GetMergeOperands;
virtual Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::vector<PinnableSlice>* value) override {
return db_->GetMergeOperands(options, column_family, key, value);
PinnableSlice* slice, int size) override {
return db_->GetMergeOperands(options, column_family, key, slice, size);
}
using DB::MultiGet;