Revert "PinnableSlice"
Summary: This reverts commit 54d94e9c2cc0bf6eeb2a165ada33fa9c174f0b16. The pull request was landed by mistake. Closes https://github.com/facebook/rocksdb/pull/1755 Differential Revision: D4391678 Pulled By: maysamyabandeh fbshipit-source-id: 36d5149
This commit is contained in:
parent
54d94e9c2c
commit
d0ba8ec8f9
@ -42,10 +42,10 @@ size_t CompactedDBImpl::FindFile(const Slice& key) {
|
||||
return right;
|
||||
}
|
||||
|
||||
Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
|
||||
const Slice& key, PinnableSlice* pSlice) {
|
||||
Status CompactedDBImpl::Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle*, const Slice& key, std::string* value) {
|
||||
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, key, pSlice, nullptr, nullptr,
|
||||
GetContext::kNotFound, key, value, nullptr, nullptr,
|
||||
nullptr, nullptr);
|
||||
LookupKey lkey(key, kMaxSequenceNumber);
|
||||
files_.files[FindFile(key)].fd.table_reader->Get(
|
||||
@ -75,14 +75,11 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
|
||||
int idx = 0;
|
||||
for (auto* r : reader_list) {
|
||||
if (r != nullptr) {
|
||||
PinnableSlice pSlice;
|
||||
std::string& value = (*values)[idx];
|
||||
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, keys[idx], &pSlice, nullptr,
|
||||
nullptr, nullptr, nullptr);
|
||||
GetContext::kNotFound, keys[idx], &(*values)[idx],
|
||||
nullptr, nullptr, nullptr, nullptr);
|
||||
LookupKey lkey(keys[idx], kMaxSequenceNumber);
|
||||
r->Get(options, lkey.internal_key(), &get_context);
|
||||
value.assign(pSlice.data(), pSlice.size());
|
||||
if (get_context.State() == GetContext::kFound) {
|
||||
statuses[idx] = Status::OK();
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ class CompactedDBImpl : public DBImpl {
|
||||
using DB::Get;
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* value) override;
|
||||
std::string* value) override;
|
||||
using DB::MultiGet;
|
||||
virtual std::vector<Status> MultiGet(
|
||||
const ReadOptions& options,
|
||||
|
@ -3907,8 +3907,8 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
|
||||
|
||||
Status DBImpl::Get(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pSlice) {
|
||||
return GetImpl(read_options, column_family, key, pSlice);
|
||||
std::string* value) {
|
||||
return GetImpl(read_options, column_family, key, value);
|
||||
}
|
||||
|
||||
// JobContext gets created and destructed outside of the lock --
|
||||
@ -3965,7 +3965,7 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
|
||||
|
||||
Status DBImpl::GetImpl(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pSlice, bool* value_found) {
|
||||
std::string* value, bool* value_found) {
|
||||
StopWatch sw(env_, stats_, DB_GET);
|
||||
PERF_TIMER_GUARD(get_snapshot_time);
|
||||
|
||||
@ -3996,12 +3996,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
|
||||
(read_options.read_tier == kPersistedTier && has_unpersisted_data_);
|
||||
bool done = false;
|
||||
if (!skip_memtable) {
|
||||
if (sv->mem->Get(lkey, pSlice, &s, &merge_context, &range_del_agg,
|
||||
if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg,
|
||||
read_options)) {
|
||||
done = true;
|
||||
RecordTick(stats_, MEMTABLE_HIT);
|
||||
} else if ((s.ok() || s.IsMergeInProgress()) &&
|
||||
sv->imm->Get(lkey, pSlice, &s, &merge_context, &range_del_agg,
|
||||
sv->imm->Get(lkey, value, &s, &merge_context, &range_del_agg,
|
||||
read_options)) {
|
||||
done = true;
|
||||
RecordTick(stats_, MEMTABLE_HIT);
|
||||
@ -4012,7 +4012,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
|
||||
}
|
||||
if (!done) {
|
||||
PERF_TIMER_GUARD(get_from_output_files_time);
|
||||
sv->current->Get(read_options, lkey, pSlice, &s, &merge_context,
|
||||
sv->current->Get(read_options, lkey, value, &s, &merge_context,
|
||||
&range_del_agg, value_found);
|
||||
RecordTick(stats_, MEMTABLE_MISS);
|
||||
}
|
||||
@ -4023,9 +4023,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
|
||||
ReturnAndCleanupSuperVersion(cfd, sv);
|
||||
|
||||
RecordTick(stats_, NUMBER_KEYS_READ);
|
||||
size_t size = pSlice->size();
|
||||
RecordTick(stats_, BYTES_READ, size);
|
||||
MeasureTime(stats_, BYTES_PER_READ, size);
|
||||
RecordTick(stats_, BYTES_READ, value->size());
|
||||
MeasureTime(stats_, BYTES_PER_READ, value->size());
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -4101,30 +4100,26 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
bool skip_memtable =
|
||||
(read_options.read_tier == kPersistedTier && has_unpersisted_data_);
|
||||
bool done = false;
|
||||
PinnableSlice pSlice;
|
||||
if (!skip_memtable) {
|
||||
if (super_version->mem->Get(lkey, &pSlice, &s, &merge_context,
|
||||
if (super_version->mem->Get(lkey, value, &s, &merge_context,
|
||||
&range_del_agg, read_options)) {
|
||||
value->assign(pSlice.data(), pSlice.size());
|
||||
done = true;
|
||||
// TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
|
||||
} else if (super_version->imm->Get(lkey, &pSlice, &s, &merge_context,
|
||||
} else if (super_version->imm->Get(lkey, value, &s, &merge_context,
|
||||
&range_del_agg, read_options)) {
|
||||
value->assign(pSlice.data(), pSlice.size());
|
||||
done = true;
|
||||
// TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
|
||||
}
|
||||
}
|
||||
if (!done) {
|
||||
PERF_TIMER_GUARD(get_from_output_files_time);
|
||||
super_version->current->Get(read_options, lkey, &pSlice, &s,
|
||||
&merge_context, &range_del_agg);
|
||||
value->assign(pSlice.data(), pSlice.size());
|
||||
super_version->current->Get(read_options, lkey, value, &s, &merge_context,
|
||||
&range_del_agg);
|
||||
// TODO(?): RecordTick(stats_, MEMTABLE_MISS)?
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
bytes_read += pSlice.size();
|
||||
bytes_read += value->size();
|
||||
}
|
||||
}
|
||||
|
||||
@ -4337,12 +4332,7 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
|
||||
}
|
||||
ReadOptions roptions = read_options;
|
||||
roptions.read_tier = kBlockCacheTier; // read from block cache only
|
||||
PinnableSlice pSlice;
|
||||
PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
|
||||
auto s = GetImpl(roptions, column_family, key, pSlicePtr, value_found);
|
||||
if (value != nullptr) {
|
||||
value->assign(pSlice.data(), pSlice.size());
|
||||
}
|
||||
auto s = GetImpl(roptions, column_family, key, value, value_found);
|
||||
|
||||
// If block_cache is enabled and the index block of the table didn't
|
||||
// not present in block_cache, the return value will be Status::Incomplete.
|
||||
@ -6405,8 +6395,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
|
||||
*found_record_for_key = false;
|
||||
|
||||
// Check if there is a record for this key in the latest memtable
|
||||
PinnableSlice* pSliceNullPtr = nullptr;
|
||||
sv->mem->Get(lkey, pSliceNullPtr, &s, &merge_context, &range_del_agg, seq,
|
||||
sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
|
||||
read_options);
|
||||
|
||||
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
|
||||
@ -6425,7 +6414,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
|
||||
}
|
||||
|
||||
// Check if there is a record for this key in the immutable memtables
|
||||
sv->imm->Get(lkey, pSliceNullPtr, &s, &merge_context, &range_del_agg, seq,
|
||||
sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
|
||||
read_options);
|
||||
|
||||
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
|
||||
|
@ -91,7 +91,7 @@ class DBImpl : public DB {
|
||||
using DB::Get;
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* value) override;
|
||||
std::string* value) override;
|
||||
using DB::MultiGet;
|
||||
virtual std::vector<Status> MultiGet(
|
||||
const ReadOptions& options,
|
||||
@ -1088,7 +1088,7 @@ class DBImpl : public DB {
|
||||
// Function that Get and KeyMayExist call with no_io true or false
|
||||
// Note: 'value_found' from KeyMayExist propagates here
|
||||
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, PinnableSlice* pSlice,
|
||||
const Slice& key, std::string* value,
|
||||
bool* value_found = nullptr);
|
||||
|
||||
bool GetIntPropertyInternal(ColumnFamilyData* cfd,
|
||||
|
@ -31,7 +31,7 @@ DBImplReadOnly::~DBImplReadOnly() {
|
||||
// Implementations of the DB interface
|
||||
Status DBImplReadOnly::Get(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pSlice) {
|
||||
std::string* value) {
|
||||
Status s;
|
||||
SequenceNumber snapshot = versions_->LastSequence();
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
|
||||
@ -40,11 +40,11 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
|
||||
MergeContext merge_context;
|
||||
RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);
|
||||
LookupKey lkey(key, snapshot);
|
||||
if (super_version->mem->Get(lkey, pSlice, &s, &merge_context, &range_del_agg,
|
||||
if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg,
|
||||
read_options)) {
|
||||
} else {
|
||||
PERF_TIMER_GUARD(get_from_output_files_time);
|
||||
super_version->current->Get(read_options, lkey, pSlice, &s, &merge_context,
|
||||
super_version->current->Get(read_options, lkey, value, &s, &merge_context,
|
||||
&range_del_agg);
|
||||
}
|
||||
return s;
|
||||
|
@ -22,7 +22,7 @@ class DBImplReadOnly : public DBImpl {
|
||||
using DB::Get;
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* value) override;
|
||||
std::string* value) override;
|
||||
|
||||
// TODO: Implement ReadOnly MultiGet?
|
||||
|
||||
|
@ -2691,7 +2691,7 @@ class ModelDB : public DB {
|
||||
}
|
||||
using DB::Get;
|
||||
virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf,
|
||||
const Slice& key, PinnableSlice* pSlice) override {
|
||||
const Slice& key, std::string* value) override {
|
||||
return Status::NotSupported(key);
|
||||
}
|
||||
|
||||
|
@ -520,7 +520,7 @@ struct Saver {
|
||||
const LookupKey* key;
|
||||
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
|
||||
bool* merge_in_progress;
|
||||
PinnableSlice* pSlice;
|
||||
std::string* value;
|
||||
SequenceNumber seq;
|
||||
const MergeOperator* merge_operator;
|
||||
// the merge operations encountered;
|
||||
@ -534,10 +534,6 @@ struct Saver {
|
||||
};
|
||||
} // namespace
|
||||
|
||||
//static void UnrefMemTable(void* s, void*) {
|
||||
// reinterpret_cast<MemTable*>(s)->Unref();
|
||||
//}
|
||||
|
||||
static bool SaveValue(void* arg, const char* entry) {
|
||||
Saver* s = reinterpret_cast<Saver*>(arg);
|
||||
MergeContext* merge_context = s->merge_context;
|
||||
@ -575,18 +571,13 @@ static bool SaveValue(void* arg, const char* entry) {
|
||||
}
|
||||
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
|
||||
*(s->status) = Status::OK();
|
||||
if (LIKELY(s->pSlice != nullptr)) {
|
||||
if (*(s->merge_in_progress)) {
|
||||
*(s->status) = MergeHelper::TimedFullMerge(
|
||||
merge_operator, s->key->user_key(), &v,
|
||||
merge_context->GetOperands(), s->pSlice->GetSelf(), s->logger,
|
||||
s->statistics, s->env_);
|
||||
s->pSlice->PinSelf();
|
||||
} else {
|
||||
//s->mem->Ref();
|
||||
//s->pSlice->PinSlice(v, UnrefMemTable, s->mem, nullptr);
|
||||
s->pSlice->PinSelf(v);
|
||||
}
|
||||
if (*(s->merge_in_progress)) {
|
||||
*(s->status) = MergeHelper::TimedFullMerge(
|
||||
merge_operator, s->key->user_key(), &v,
|
||||
merge_context->GetOperands(), s->value, s->logger, s->statistics,
|
||||
s->env_);
|
||||
} else if (s->value != nullptr) {
|
||||
s->value->assign(v.data(), v.size());
|
||||
}
|
||||
if (s->inplace_update_support) {
|
||||
s->mem->GetLock(s->key->user_key())->ReadUnlock();
|
||||
@ -598,14 +589,10 @@ static bool SaveValue(void* arg, const char* entry) {
|
||||
case kTypeSingleDeletion:
|
||||
case kTypeRangeDeletion: {
|
||||
if (*(s->merge_in_progress)) {
|
||||
*(s->status) = Status::OK();
|
||||
if (s->pSlice != nullptr) {
|
||||
*(s->status) = MergeHelper::TimedFullMerge(
|
||||
merge_operator, s->key->user_key(), nullptr,
|
||||
merge_context->GetOperands(), s->pSlice->GetSelf(), s->logger,
|
||||
s->statistics, s->env_);
|
||||
s->pSlice->PinSelf();
|
||||
}
|
||||
*(s->status) = MergeHelper::TimedFullMerge(
|
||||
merge_operator, s->key->user_key(), nullptr,
|
||||
merge_context->GetOperands(), s->value, s->logger, s->statistics,
|
||||
s->env_);
|
||||
} else {
|
||||
*(s->status) = Status::NotFound();
|
||||
}
|
||||
@ -639,7 +626,7 @@ static bool SaveValue(void* arg, const char* entry) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool MemTable::Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
|
||||
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
|
||||
const ReadOptions& read_opts) {
|
||||
@ -677,7 +664,7 @@ bool MemTable::Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
|
||||
saver.found_final_value = &found_final_value;
|
||||
saver.merge_in_progress = &merge_in_progress;
|
||||
saver.key = &key;
|
||||
saver.pSlice = pSlice;
|
||||
saver.value = value;
|
||||
saver.seq = kMaxSequenceNumber;
|
||||
saver.mem = this;
|
||||
saver.merge_context = merge_context;
|
||||
|
@ -186,27 +186,17 @@ class MemTable {
|
||||
// returned). Otherwise, *seq will be set to kMaxSequenceNumber.
|
||||
// On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
|
||||
// status returned indicates a corruption or other unexpected error.
|
||||
bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq, const ReadOptions& read_opts);
|
||||
|
||||
inline bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
|
||||
MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts) {
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts) {
|
||||
SequenceNumber seq;
|
||||
return Get(key, pSlice, s, merge_context, range_del_agg, &seq, read_opts);
|
||||
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
|
||||
}
|
||||
|
||||
// deprecated. Use Get with PinnableSlice
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq, const ReadOptions& read_opts);
|
||||
// deprecated. Use Get with PinnableSlice
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts);
|
||||
|
||||
// Attempts to update the new_value inplace, else does normal Add
|
||||
// Pseudocode
|
||||
// if key exists in current memtable && prev_value is of type kTypeValue
|
||||
|
@ -100,12 +100,12 @@ int MemTableList::NumFlushed() const {
|
||||
// Search all the memtables starting from the most recent one.
|
||||
// Return the most recent value found, if any.
|
||||
// Operands stores the list of merge operations to apply, so far.
|
||||
bool MemTableListVersion::Get(const LookupKey& key, PinnableSlice* pSlice,
|
||||
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
|
||||
Status* s, MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq,
|
||||
const ReadOptions& read_opts) {
|
||||
return GetFromList(&memlist_, key, pSlice, s, merge_context, range_del_agg,
|
||||
return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg,
|
||||
seq, read_opts);
|
||||
}
|
||||
|
||||
@ -115,26 +115,22 @@ bool MemTableListVersion::GetFromHistory(const LookupKey& key,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq,
|
||||
const ReadOptions& read_opts) {
|
||||
PinnableSlice pSlice;
|
||||
PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
|
||||
auto res = GetFromList(&memlist_history_, key, pSlicePtr, s, merge_context,
|
||||
range_del_agg, seq, read_opts);
|
||||
if (value != nullptr) {
|
||||
value->assign(pSlice.data(), pSlice.size());
|
||||
}
|
||||
return res;
|
||||
return GetFromList(&memlist_history_, key, value, s, merge_context,
|
||||
range_del_agg, seq, read_opts);
|
||||
}
|
||||
|
||||
bool MemTableListVersion::GetFromList(
|
||||
std::list<MemTable*>* list, const LookupKey& key, PinnableSlice* pSlice,
|
||||
Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq, const ReadOptions& read_opts) {
|
||||
bool MemTableListVersion::GetFromList(std::list<MemTable*>* list,
|
||||
const LookupKey& key, std::string* value,
|
||||
Status* s, MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq,
|
||||
const ReadOptions& read_opts) {
|
||||
*seq = kMaxSequenceNumber;
|
||||
|
||||
for (auto& memtable : *list) {
|
||||
SequenceNumber current_seq = kMaxSequenceNumber;
|
||||
|
||||
bool done = memtable->Get(key, pSlice, s, merge_context, range_del_agg,
|
||||
bool done = memtable->Get(key, value, s, merge_context, range_del_agg,
|
||||
¤t_seq, read_opts);
|
||||
if (*seq == kMaxSequenceNumber) {
|
||||
// Store the most recent sequence number of any operation on this key.
|
||||
|
@ -53,27 +53,17 @@ class MemTableListVersion {
|
||||
// If any operation was found for this key, its most recent sequence number
|
||||
// will be stored in *seq on success (regardless of whether true/false is
|
||||
// returned). Otherwise, *seq will be set to kMaxSequenceNumber.
|
||||
bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq, const ReadOptions& read_opts);
|
||||
|
||||
inline bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
|
||||
MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts) {
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts) {
|
||||
SequenceNumber seq;
|
||||
return Get(key, pSlice, s, merge_context, range_del_agg, &seq, read_opts);
|
||||
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
|
||||
}
|
||||
|
||||
// deprecated. Use Get with PinnableSlice
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq, const ReadOptions& read_opts);
|
||||
// deprecated. Use Get with PinnableSlice
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts);
|
||||
|
||||
// Similar to Get(), but searches the Memtable history of memtables that
|
||||
// have already been flushed. Should only be used from in-memory only
|
||||
// queries (such as Transaction validation) as the history may contain
|
||||
@ -82,10 +72,10 @@ class MemTableListVersion {
|
||||
MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
|
||||
const ReadOptions& read_opts);
|
||||
inline bool GetFromHistory(const LookupKey& key, std::string* value,
|
||||
Status* s, MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts) {
|
||||
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts) {
|
||||
SequenceNumber seq;
|
||||
return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq,
|
||||
read_opts);
|
||||
@ -122,8 +112,7 @@ class MemTableListVersion {
|
||||
void TrimHistory(autovector<MemTable*>* to_delete);
|
||||
|
||||
bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
|
||||
PinnableSlice* pSlice, Status* s,
|
||||
MergeContext* merge_context,
|
||||
std::string* value, Status* s, MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
|
||||
const ReadOptions& read_opts);
|
||||
|
||||
|
@ -928,7 +928,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
|
||||
version_number_(version_number) {}
|
||||
|
||||
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
PinnableSlice* pSlice, Status* status,
|
||||
std::string* value, Status* status,
|
||||
MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg, bool* value_found,
|
||||
bool* key_exists, SequenceNumber* seq) {
|
||||
@ -946,7 +946,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
GetContext get_context(
|
||||
user_comparator(), merge_operator_, info_log_, db_statistics_,
|
||||
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
|
||||
pSlice, value_found, merge_context, range_del_agg, this->env_, seq,
|
||||
value, value_found, merge_context, range_del_agg, this->env_, seq,
|
||||
merge_operator_ ? &pinned_iters_mgr : nullptr);
|
||||
|
||||
// Pin blocks that we read to hold merge operands
|
||||
@ -1005,13 +1005,9 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
}
|
||||
// merge_operands are in saver and we hit the beginning of the key history
|
||||
// do a final merge of nullptr and operands;
|
||||
std::string* str_value = pSlice != nullptr ? pSlice->GetSelf() : nullptr;
|
||||
*status = MergeHelper::TimedFullMerge(
|
||||
merge_operator_, user_key, nullptr, merge_context->GetOperands(),
|
||||
str_value, info_log_, db_statistics_, env_);
|
||||
if (LIKELY(pSlice != nullptr)) {
|
||||
pSlice->PinSelf();
|
||||
}
|
||||
*status = MergeHelper::TimedFullMerge(merge_operator_, user_key, nullptr,
|
||||
merge_context->GetOperands(), value,
|
||||
info_log_, db_statistics_, env_);
|
||||
} else {
|
||||
if (key_exists != nullptr) {
|
||||
*key_exists = false;
|
||||
|
@ -457,7 +457,7 @@ class Version {
|
||||
// for the key if a key was found.
|
||||
//
|
||||
// REQUIRES: lock is not held
|
||||
void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* pSlice,
|
||||
void Get(const ReadOptions&, const LookupKey& key, std::string* val,
|
||||
Status* status, MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg, bool* value_found = nullptr,
|
||||
bool* key_exists = nullptr, SequenceNumber* seq = nullptr);
|
||||
|
@ -1,73 +0,0 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
//
|
||||
// An iterator yields a sequence of key/value pairs from a source.
|
||||
// The following class defines the interface. Multiple implementations
|
||||
// are provided by this library. In particular, iterators are provided
|
||||
// to access the contents of a Table or a DB.
|
||||
//
|
||||
// Multiple threads can invoke const methods on an Iterator without
|
||||
// external synchronization, but if any of the threads may call a
|
||||
// non-const method, all threads accessing the same Iterator must use
|
||||
// external synchronization.
|
||||
|
||||
#ifndef INCLUDE_ROCKSDB_CLEANABLE_H_
|
||||
#define INCLUDE_ROCKSDB_CLEANABLE_H_
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class Cleanable {
|
||||
public:
|
||||
Cleanable();
|
||||
~Cleanable();
|
||||
// Clients are allowed to register function/arg1/arg2 triples that
|
||||
// will be invoked when this iterator is destroyed.
|
||||
//
|
||||
// Note that unlike all of the preceding methods, this method is
|
||||
// not abstract and therefore clients should not override it.
|
||||
typedef void (*CleanupFunction)(void* arg1, void* arg2);
|
||||
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
|
||||
void DelegateCleanupsTo(Cleanable* other);
|
||||
// DoCkeanup and also resets the pointers for reuse
|
||||
inline void Reset() {
|
||||
DoCleanup();
|
||||
cleanup_.function = nullptr;
|
||||
cleanup_.next = nullptr;
|
||||
}
|
||||
|
||||
protected:
|
||||
struct Cleanup {
|
||||
CleanupFunction function;
|
||||
void* arg1;
|
||||
void* arg2;
|
||||
Cleanup* next;
|
||||
};
|
||||
Cleanup cleanup_;
|
||||
// It also becomes the owner of c
|
||||
void RegisterCleanup(Cleanup* c);
|
||||
|
||||
private:
|
||||
// Performs all the cleanups. It does not reset the pointers. Making it
|
||||
// private
|
||||
// to prevent misuse
|
||||
inline void DoCleanup() {
|
||||
if (cleanup_.function != nullptr) {
|
||||
(*cleanup_.function)(cleanup_.arg1, cleanup_.arg2);
|
||||
for (Cleanup* c = cleanup_.next; c != nullptr;) {
|
||||
(*c->function)(c->arg1, c->arg2);
|
||||
Cleanup* next = c->next;
|
||||
delete c;
|
||||
c = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // INCLUDE_ROCKSDB_CLEANABLE_H_
|
@ -280,20 +280,9 @@ class DB {
|
||||
// a status for which Status::IsNotFound() returns true.
|
||||
//
|
||||
// May return some other Status on an error.
|
||||
inline Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
std::string* value) {
|
||||
PinnableSlice pSlice;
|
||||
PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
|
||||
auto s = Get(options, column_family, key, pSlicePtr);
|
||||
if (value != nullptr && s.ok()) {
|
||||
value->assign(pSlice.data(), pSlice.size());
|
||||
}
|
||||
return s;
|
||||
}
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* value) = 0;
|
||||
std::string* value) = 0;
|
||||
virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) {
|
||||
return Get(options, DefaultColumnFamily(), key, value);
|
||||
}
|
||||
|
@ -20,12 +20,43 @@
|
||||
#define STORAGE_ROCKSDB_INCLUDE_ITERATOR_H_
|
||||
|
||||
#include <string>
|
||||
#include "rocksdb/cleanable.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/status.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class Cleanable {
|
||||
public:
|
||||
Cleanable();
|
||||
~Cleanable();
|
||||
// Clients are allowed to register function/arg1/arg2 triples that
|
||||
// will be invoked when this iterator is destroyed.
|
||||
//
|
||||
// Note that unlike all of the preceding methods, this method is
|
||||
// not abstract and therefore clients should not override it.
|
||||
typedef void (*CleanupFunction)(void* arg1, void* arg2);
|
||||
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
|
||||
void DelegateCleanupsTo(Cleanable* other);
|
||||
// DoCleanup and also resets the pointers for reuse
|
||||
void Reset();
|
||||
|
||||
protected:
|
||||
struct Cleanup {
|
||||
CleanupFunction function;
|
||||
void* arg1;
|
||||
void* arg2;
|
||||
Cleanup* next;
|
||||
};
|
||||
Cleanup cleanup_;
|
||||
// It also becomes the owner of c
|
||||
void RegisterCleanup(Cleanup* c);
|
||||
|
||||
private:
|
||||
// Performs all the cleanups. It does not reset the pointers. Making it
|
||||
// private to prevent misuse
|
||||
inline void DoCleanup();
|
||||
};
|
||||
|
||||
class Iterator : public Cleanable {
|
||||
public:
|
||||
Iterator() {}
|
||||
|
@ -25,8 +25,6 @@
|
||||
#include <string.h>
|
||||
#include <string>
|
||||
|
||||
#include "rocksdb/cleanable.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class Slice {
|
||||
@ -118,61 +116,6 @@ class Slice {
|
||||
// Intentionally copyable
|
||||
};
|
||||
|
||||
class PinnableSlice : public Slice, public Cleanable {
|
||||
public:
|
||||
PinnableSlice() {}
|
||||
|
||||
inline void PinSlice(const Slice& s, CleanupFunction f, void* arg1,
|
||||
void* arg2) {
|
||||
data_ = s.data();
|
||||
size_ = s.size();
|
||||
RegisterCleanup(f, arg1, arg2);
|
||||
}
|
||||
|
||||
inline void PinSlice(const Slice& s, Cleanable* cleanable) {
|
||||
data_ = s.data();
|
||||
size_ = s.size();
|
||||
cleanable->DelegateCleanupsTo(this);
|
||||
}
|
||||
|
||||
inline void PinHeap(const char* s, const size_t n) {
|
||||
data_ = s;
|
||||
size_ = n;
|
||||
RegisterCleanup(ReleaseCharStrHeap, const_cast<char*>(data_), nullptr);
|
||||
}
|
||||
|
||||
inline void PinHeap(std::string* s) {
|
||||
data_ = s->data();
|
||||
size_ = s->size();
|
||||
RegisterCleanup(ReleaseStringHeap, s, nullptr);
|
||||
}
|
||||
|
||||
inline void PinSelf(const Slice& slice) {
|
||||
self_space.assign(slice.data(), slice.size());
|
||||
data_ = self_space.data();
|
||||
size_ = self_space.size();
|
||||
}
|
||||
|
||||
inline void PinSelf() {
|
||||
data_ = self_space.data();
|
||||
size_ = self_space.size();
|
||||
}
|
||||
|
||||
inline std::string* GetSelf() { return &self_space; }
|
||||
|
||||
inline bool IsPinned() { return cleanup_.function != nullptr; }
|
||||
|
||||
private:
|
||||
friend class PinnableSlice4Test;
|
||||
std::string self_space;
|
||||
static void ReleaseCharStrHeap(void* s, void*) {
|
||||
delete reinterpret_cast<const char*>(s);
|
||||
}
|
||||
static void ReleaseStringHeap(void* s, void*) {
|
||||
delete reinterpret_cast<const std::string*>(s);
|
||||
}
|
||||
};
|
||||
|
||||
// A set of Slices that are virtually concatenated together. 'parts' points
|
||||
// to an array of Slices. The number of elements in the array is 'num_parts'.
|
||||
struct SliceParts {
|
||||
|
@ -56,8 +56,8 @@ class StackableDB : public DB {
|
||||
using DB::Get;
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pSlice) override {
|
||||
return db_->Get(options, column_family, key, pSlice);
|
||||
std::string* value) override {
|
||||
return db_->Get(options, column_family, key, value);
|
||||
}
|
||||
|
||||
using DB::MultiGet;
|
||||
|
@ -1537,6 +1537,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
|
||||
BlockIter iiter;
|
||||
NewIndexIterator(read_options, &iiter);
|
||||
|
||||
PinnedIteratorsManager* pinned_iters_mgr = get_context->pinned_iters_mgr();
|
||||
bool pin_blocks = pinned_iters_mgr && pinned_iters_mgr->PinningEnabled();
|
||||
|
||||
bool done = false;
|
||||
for (iiter.Seek(key); iiter.Valid() && !done; iiter.Next()) {
|
||||
Slice handle_value = iiter.value();
|
||||
@ -1577,12 +1580,17 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
|
||||
s = Status::Corruption(Slice());
|
||||
}
|
||||
|
||||
if (!get_context->SaveValue(parsed_key, biter.value(), &biter)) {
|
||||
if (!get_context->SaveValue(parsed_key, biter.value(), pin_blocks)) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
s = biter.status();
|
||||
|
||||
if (pin_blocks && get_context->State() == GetContext::kMerge) {
|
||||
// Pin blocks as long as we are merging
|
||||
biter.DelegateCleanupsTo(pinned_iters_mgr);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
|
@ -47,30 +47,6 @@ TEST_F(CleanableTest, Register) {
|
||||
}
|
||||
// ~Cleanable
|
||||
ASSERT_EQ(6, res);
|
||||
|
||||
// Test the Reset does cleanup
|
||||
res = 1;
|
||||
{
|
||||
Cleanable c1;
|
||||
c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2;
|
||||
c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3;
|
||||
c1.Reset();
|
||||
ASSERT_EQ(6, res);
|
||||
}
|
||||
// ~Cleanable
|
||||
ASSERT_EQ(6, res);
|
||||
|
||||
// Test Clenable is usable after Reset
|
||||
res = 1;
|
||||
{
|
||||
Cleanable c1;
|
||||
c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2;
|
||||
c1.Reset();
|
||||
ASSERT_EQ(2, res);
|
||||
c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3;
|
||||
}
|
||||
// ~Cleanable
|
||||
ASSERT_EQ(6, res);
|
||||
}
|
||||
|
||||
// the first Cleanup is on stack and the rest on heap,
|
||||
@ -198,102 +174,6 @@ TEST_F(CleanableTest, Delegation) {
|
||||
ASSERT_EQ(5, res);
|
||||
}
|
||||
|
||||
class PinnableSlice4Test : public PinnableSlice {
|
||||
public:
|
||||
void TestCharStrIsRegistered(char* s) {
|
||||
ASSERT_EQ(cleanup_.function, ReleaseCharStrHeap);
|
||||
ASSERT_EQ(cleanup_.arg1, s);
|
||||
ASSERT_EQ(cleanup_.arg2, nullptr);
|
||||
ASSERT_EQ(cleanup_.next, nullptr);
|
||||
}
|
||||
|
||||
void TestStringIsRegistered(std::string* s) {
|
||||
ASSERT_EQ(cleanup_.function, ReleaseStringHeap);
|
||||
ASSERT_EQ(cleanup_.arg1, s);
|
||||
ASSERT_EQ(cleanup_.arg2, nullptr);
|
||||
ASSERT_EQ(cleanup_.next, nullptr);
|
||||
}
|
||||
};
|
||||
|
||||
// Putting the PinnableSlice tests here due to similarity to Cleanable tests
|
||||
TEST_F(CleanableTest, PinnableSlice) {
|
||||
int n2 = 2;
|
||||
int res = 1;
|
||||
const std::string const_str = "123";
|
||||
const char* const_s = "123";
|
||||
|
||||
{
|
||||
PinnableSlice4Test pSlice;
|
||||
char* s = strdup(const_s);
|
||||
pSlice.PinHeap(s, strlen(const_s));
|
||||
std::string str;
|
||||
str.assign(pSlice.data(), pSlice.size());
|
||||
ASSERT_EQ(const_s, str);
|
||||
pSlice.TestCharStrIsRegistered(s);
|
||||
}
|
||||
|
||||
{
|
||||
PinnableSlice4Test pSlice;
|
||||
std::string* heap_str = new std::string(const_str);
|
||||
pSlice.PinHeap(heap_str);
|
||||
std::string str;
|
||||
str.assign(pSlice.data(), pSlice.size());
|
||||
ASSERT_EQ(const_str, str);
|
||||
pSlice.TestStringIsRegistered(heap_str);
|
||||
}
|
||||
|
||||
{
|
||||
res = 1;
|
||||
PinnableSlice4Test pSlice;
|
||||
Slice slice(const_str);
|
||||
pSlice.PinSlice(slice, Multiplier, &res, &n2);
|
||||
std::string str;
|
||||
str.assign(pSlice.data(), pSlice.size());
|
||||
ASSERT_EQ(const_str, str);
|
||||
}
|
||||
// ~Cleanable
|
||||
ASSERT_EQ(2, res);
|
||||
|
||||
{
|
||||
res = 1;
|
||||
PinnableSlice4Test pSlice;
|
||||
Slice slice(const_str);
|
||||
{
|
||||
Cleanable c1;
|
||||
c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2;
|
||||
pSlice.PinSlice(slice, &c1);
|
||||
}
|
||||
// ~Cleanable
|
||||
ASSERT_EQ(1, res); // cleanups must have be delegated to pSlice
|
||||
std::string str;
|
||||
str.assign(pSlice.data(), pSlice.size());
|
||||
ASSERT_EQ(const_str, str);
|
||||
}
|
||||
// ~Cleanable
|
||||
ASSERT_EQ(2, res);
|
||||
|
||||
{
|
||||
PinnableSlice4Test pSlice;
|
||||
Slice slice(const_str);
|
||||
pSlice.PinSelf(slice);
|
||||
std::string str;
|
||||
str.assign(pSlice.data(), pSlice.size());
|
||||
ASSERT_EQ(const_str, str);
|
||||
ASSERT_EQ(false, pSlice.IsPinned()); // self pinned
|
||||
}
|
||||
|
||||
{
|
||||
PinnableSlice4Test pSlice;
|
||||
std::string* self_str_ptr = pSlice.GetSelf();
|
||||
self_str_ptr->assign(const_str);
|
||||
pSlice.PinSelf();
|
||||
std::string str;
|
||||
str.assign(pSlice.data(), pSlice.size());
|
||||
ASSERT_EQ(const_str, str);
|
||||
ASSERT_EQ(false, pSlice.IsPinned()); // self pinned
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -123,12 +123,12 @@ class CuckooReaderTest : public testing::Test {
|
||||
ASSERT_OK(reader.status());
|
||||
// Assume no merge/deletion
|
||||
for (uint32_t i = 0; i < num_items; ++i) {
|
||||
PinnableSlice value;
|
||||
std::string value;
|
||||
GetContext get_context(ucomp, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, Slice(user_keys[i]), &value,
|
||||
nullptr, nullptr, nullptr, nullptr);
|
||||
ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context));
|
||||
ASSERT_STREQ(values[i].c_str(), value.data());
|
||||
ASSERT_EQ(values[i], value);
|
||||
}
|
||||
}
|
||||
void UpdateKeys(bool with_zero_seqno) {
|
||||
@ -333,7 +333,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
|
||||
AddHashLookups(not_found_user_key, 0, kNumHashFunc);
|
||||
ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue);
|
||||
AppendInternalKey(¬_found_key, ikey);
|
||||
PinnableSlice value;
|
||||
std::string value;
|
||||
GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound,
|
||||
Slice(not_found_key), &value, nullptr, nullptr,
|
||||
nullptr, nullptr);
|
||||
@ -346,7 +346,6 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
|
||||
ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue);
|
||||
std::string not_found_key2;
|
||||
AppendInternalKey(¬_found_key2, ikey2);
|
||||
value.Reset();
|
||||
GetContext get_context2(ucmp, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, Slice(not_found_key2), &value,
|
||||
nullptr, nullptr, nullptr, nullptr);
|
||||
@ -361,7 +360,6 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
|
||||
// Add hash values that map to empty buckets.
|
||||
AddHashLookups(ExtractUserKey(unused_key).ToString(),
|
||||
kNumHashFunc, kNumHashFunc);
|
||||
value.Reset();
|
||||
GetContext get_context3(ucmp, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, Slice(unused_key), &value,
|
||||
nullptr, nullptr, nullptr, nullptr);
|
||||
@ -435,13 +433,12 @@ void WriteFile(const std::vector<std::string>& keys,
|
||||
test::Uint64Comparator(), nullptr);
|
||||
ASSERT_OK(reader.status());
|
||||
ReadOptions r_options;
|
||||
PinnableSlice value;
|
||||
std::string value;
|
||||
// Assume only the fast path is triggered
|
||||
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, Slice(), &value, nullptr,
|
||||
nullptr, nullptr, nullptr);
|
||||
for (uint64_t i = 0; i < num; ++i) {
|
||||
value.Reset();
|
||||
value.clear();
|
||||
ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context));
|
||||
ASSERT_TRUE(Slice(keys[i]) == Slice(&keys[i][0], 4));
|
||||
@ -483,7 +480,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
|
||||
}
|
||||
std::random_shuffle(keys.begin(), keys.end());
|
||||
|
||||
PinnableSlice value;
|
||||
std::string value;
|
||||
// Assume only the fast path is triggered
|
||||
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, Slice(), &value, nullptr,
|
||||
|
@ -35,7 +35,7 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
|
||||
GetContext::GetContext(const Comparator* ucmp,
|
||||
const MergeOperator* merge_operator, Logger* logger,
|
||||
Statistics* statistics, GetState init_state,
|
||||
const Slice& user_key, PinnableSlice* pSlice,
|
||||
const Slice& user_key, std::string* ret_value,
|
||||
bool* value_found, MergeContext* merge_context,
|
||||
RangeDelAggregator* _range_del_agg, Env* env,
|
||||
SequenceNumber* seq,
|
||||
@ -46,7 +46,7 @@ GetContext::GetContext(const Comparator* ucmp,
|
||||
statistics_(statistics),
|
||||
state_(init_state),
|
||||
user_key_(user_key),
|
||||
pSlice_(pSlice),
|
||||
value_(ret_value),
|
||||
value_found_(value_found),
|
||||
merge_context_(merge_context),
|
||||
range_del_agg_(_range_del_agg),
|
||||
@ -76,13 +76,13 @@ void GetContext::SaveValue(const Slice& value, SequenceNumber seq) {
|
||||
appendToReplayLog(replay_log_, kTypeValue, value);
|
||||
|
||||
state_ = kFound;
|
||||
if (LIKELY(pSlice_ != nullptr)) {
|
||||
pSlice_->PinSelf(value);
|
||||
if (value_ != nullptr) {
|
||||
value_->assign(value.data(), value.size());
|
||||
}
|
||||
}
|
||||
|
||||
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
||||
const Slice& value, Cleanable* value_pinner) {
|
||||
const Slice& value, bool value_pinned) {
|
||||
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
|
||||
merge_context_ != nullptr);
|
||||
if (ucmp_->Equal(parsed_key.user_key, user_key_)) {
|
||||
@ -106,22 +106,17 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
||||
assert(state_ == kNotFound || state_ == kMerge);
|
||||
if (kNotFound == state_) {
|
||||
state_ = kFound;
|
||||
if (LIKELY(pSlice_ != nullptr)) {
|
||||
if (LIKELY(value_pinner != nullptr)) {
|
||||
pSlice_->PinSlice(value, value_pinner);
|
||||
} else {
|
||||
pSlice_->PinSelf(value);
|
||||
}
|
||||
if (value_ != nullptr) {
|
||||
value_->assign(value.data(), value.size());
|
||||
}
|
||||
} else if (kMerge == state_) {
|
||||
assert(merge_operator_ != nullptr);
|
||||
state_ = kFound;
|
||||
if (LIKELY(pSlice_ != nullptr)) {
|
||||
if (value_ != nullptr) {
|
||||
Status merge_status = MergeHelper::TimedFullMerge(
|
||||
merge_operator_, user_key_, &value,
|
||||
merge_context_->GetOperands(), pSlice_->GetSelf(), logger_,
|
||||
statistics_, env_);
|
||||
pSlice_->PinSelf();
|
||||
merge_context_->GetOperands(), value_, logger_, statistics_,
|
||||
env_);
|
||||
if (!merge_status.ok()) {
|
||||
state_ = kCorrupt;
|
||||
}
|
||||
@ -139,12 +134,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
||||
state_ = kDeleted;
|
||||
} else if (kMerge == state_) {
|
||||
state_ = kFound;
|
||||
if (LIKELY(pSlice_ != nullptr)) {
|
||||
Status merge_status = MergeHelper::TimedFullMerge(
|
||||
merge_operator_, user_key_, nullptr,
|
||||
merge_context_->GetOperands(), pSlice_->GetSelf(), logger_,
|
||||
statistics_, env_);
|
||||
pSlice_->PinSelf();
|
||||
if (value_ != nullptr) {
|
||||
Status merge_status =
|
||||
MergeHelper::TimedFullMerge(merge_operator_, user_key_, nullptr,
|
||||
merge_context_->GetOperands(),
|
||||
value_, logger_, statistics_, env_);
|
||||
|
||||
if (!merge_status.ok()) {
|
||||
state_ = kCorrupt;
|
||||
}
|
||||
@ -155,14 +150,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
||||
case kTypeMerge:
|
||||
assert(state_ == kNotFound || state_ == kMerge);
|
||||
state_ = kMerge;
|
||||
// value_pinner is not set from plain_table_reader.cc for example.
|
||||
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
|
||||
value_pinner != nullptr) {
|
||||
value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
|
||||
merge_context_->PushOperand(value, true /*value_pinned*/);
|
||||
} else {
|
||||
merge_context_->PushOperand(value, false);
|
||||
}
|
||||
merge_context_->PushOperand(value, value_pinned);
|
||||
return true;
|
||||
|
||||
default:
|
||||
@ -178,7 +166,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
||||
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
|
||||
GetContext* get_context) {
|
||||
#ifndef ROCKSDB_LITE
|
||||
static Cleanable nonToClean;
|
||||
Slice s = replay_log;
|
||||
while (s.size()) {
|
||||
auto type = static_cast<ValueType>(*s.data());
|
||||
@ -191,8 +178,7 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
|
||||
// Since SequenceNumber is not stored and unknown, we will use
|
||||
// kMaxSequenceNumber.
|
||||
get_context->SaveValue(
|
||||
ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
|
||||
&nonToClean);
|
||||
ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, true);
|
||||
}
|
||||
#else // ROCKSDB_LITE
|
||||
assert(false);
|
||||
|
@ -9,7 +9,6 @@
|
||||
#include "db/range_del_aggregator.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/types.h"
|
||||
#include "table/block.h"
|
||||
|
||||
namespace rocksdb {
|
||||
class MergeContext;
|
||||
@ -27,7 +26,7 @@ class GetContext {
|
||||
|
||||
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
|
||||
Logger* logger, Statistics* statistics, GetState init_state,
|
||||
const Slice& user_key, PinnableSlice* pSlice, bool* value_found,
|
||||
const Slice& user_key, std::string* ret_value, bool* value_found,
|
||||
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
|
||||
Env* env, SequenceNumber* seq = nullptr,
|
||||
PinnedIteratorsManager* _pinned_iters_mgr = nullptr);
|
||||
@ -40,7 +39,7 @@ class GetContext {
|
||||
// Returns True if more keys need to be read (due to merges) or
|
||||
// False if the complete value has been found.
|
||||
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value,
|
||||
Cleanable* value_pinner = nullptr);
|
||||
bool value_pinned = false);
|
||||
|
||||
// Simplified version of the previous function. Should only be used when we
|
||||
// know that the operation is a Put.
|
||||
@ -69,7 +68,7 @@ class GetContext {
|
||||
|
||||
GetState state_;
|
||||
Slice user_key_;
|
||||
PinnableSlice* pSlice_;
|
||||
std::string* value_;
|
||||
bool* value_found_; // Is value set correctly? Used by KeyMayExist
|
||||
MergeContext* merge_context_;
|
||||
RangeDelAggregator* range_del_agg_;
|
||||
|
@ -21,6 +21,24 @@ Cleanable::Cleanable() {
|
||||
|
||||
Cleanable::~Cleanable() { DoCleanup(); }
|
||||
|
||||
void Cleanable::Reset() {
|
||||
DoCleanup();
|
||||
cleanup_.function = nullptr;
|
||||
cleanup_.next = nullptr;
|
||||
}
|
||||
|
||||
void Cleanable::DoCleanup() {
|
||||
if (cleanup_.function != nullptr) {
|
||||
(*cleanup_.function)(cleanup_.arg1, cleanup_.arg2);
|
||||
for (Cleanup* c = cleanup_.next; c != nullptr;) {
|
||||
(*c->function)(c->arg1, c->arg2);
|
||||
Cleanup* next = c->next;
|
||||
delete c;
|
||||
c = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If the entire linked list was on heap we could have simply add attach one
|
||||
// link list to another. However the head is an embeded object to avoid the cost
|
||||
// of creating objects for most of the use cases when the Cleanable has only one
|
||||
|
@ -166,7 +166,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
|
||||
std::string key = MakeKey(r1, r2, through_db);
|
||||
uint64_t start_time = Now(env, measured_by_nanosecond);
|
||||
if (!through_db) {
|
||||
PinnableSlice value;
|
||||
std::string value;
|
||||
MergeContext merge_context;
|
||||
RangeDelAggregator range_del_agg(ikc, {} /* snapshots */);
|
||||
GetContext get_context(ioptions.user_comparator,
|
||||
|
@ -1968,12 +1968,12 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
|
||||
ASSERT_OK(c3.Reopen(ioptions4));
|
||||
reader = dynamic_cast<BlockBasedTable*>(c3.GetTableReader());
|
||||
ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
|
||||
PinnableSlice value;
|
||||
std::string value;
|
||||
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, user_key, &value, nullptr,
|
||||
nullptr, nullptr, nullptr);
|
||||
ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context));
|
||||
ASSERT_STREQ(value.data(), "hello");
|
||||
ASSERT_EQ(value, "hello");
|
||||
BlockCachePropertiesSnapshot props(options.statistics.get());
|
||||
props.AssertFilterBlockStat(0, 0);
|
||||
c3.ResetTableReader();
|
||||
@ -2051,7 +2051,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) {
|
||||
c.Finish(options, ioptions, table_options,
|
||||
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
|
||||
auto reader = c.GetTableReader();
|
||||
PinnableSlice value;
|
||||
std::string value;
|
||||
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, user_key, &value, nullptr,
|
||||
nullptr, nullptr, nullptr);
|
||||
@ -2065,14 +2065,13 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) {
|
||||
ASSERT_EQ(perf_context.block_read_count, 1);
|
||||
}
|
||||
ASSERT_EQ(get_context.State(), GetContext::kFound);
|
||||
ASSERT_STREQ(value.data(), "hello");
|
||||
ASSERT_EQ(value, "hello");
|
||||
|
||||
// Get non-existing key
|
||||
user_key = "does-not-exist";
|
||||
internal_key = InternalKey(user_key, 0, kTypeValue);
|
||||
encoded_key = internal_key.Encode().ToString();
|
||||
|
||||
value.Reset();
|
||||
get_context = GetContext(options.comparator, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, user_key, &value, nullptr,
|
||||
nullptr, nullptr, nullptr);
|
||||
|
@ -229,8 +229,6 @@ DEFINE_bool(reverse_iterator, false,
|
||||
|
||||
DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
|
||||
|
||||
DEFINE_bool(pin_slice, false, "use pinnable slice for point lookup");
|
||||
|
||||
DEFINE_int64(batch_size, 1, "Batch size");
|
||||
|
||||
static bool ValidateKeySize(const char* flagname, int32_t value) {
|
||||
@ -3774,7 +3772,6 @@ class Benchmark {
|
||||
std::unique_ptr<const char[]> key_guard;
|
||||
Slice key = AllocateKey(&key_guard);
|
||||
std::string value;
|
||||
PinnableSlice pSlice;
|
||||
|
||||
Duration duration(FLAGS_duration, reads_);
|
||||
while (!duration.Done(1)) {
|
||||
@ -3790,19 +3787,11 @@ class Benchmark {
|
||||
s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
|
||||
&value);
|
||||
} else {
|
||||
if (FLAGS_pin_slice == 1) {
|
||||
pSlice.Reset();
|
||||
s = db_with_cfh->db->Get(
|
||||
options, db_with_cfh->db->DefaultColumnFamily(), key, &pSlice);
|
||||
} else {
|
||||
s = db_with_cfh->db->Get(
|
||||
options, db_with_cfh->db->DefaultColumnFamily(), key, &value);
|
||||
}
|
||||
s = db_with_cfh->db->Get(options, key, &value);
|
||||
}
|
||||
if (s.ok()) {
|
||||
found++;
|
||||
bytes +=
|
||||
key.size() + (FLAGS_pin_slice == 1 ? pSlice.size() : value.size());
|
||||
bytes += key.size() + value.size();
|
||||
} else if (!s.IsNotFound()) {
|
||||
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
|
||||
abort();
|
||||
|
@ -12,59 +12,10 @@
|
||||
#include <cctype>
|
||||
#include <sstream>
|
||||
|
||||
#include "db/memtable_list.h"
|
||||
#include "port/port.h"
|
||||
#include "util/file_reader_writer.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
// These methods are deprecated and only being used in tests for backward
|
||||
// compatibility
|
||||
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
|
||||
Status* s, MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
SequenceNumber* seq,
|
||||
const ReadOptions& read_opts) {
|
||||
PinnableSlice pSlice;
|
||||
PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
|
||||
auto res =
|
||||
Get(key, pSlicePtr, s, merge_context, range_del_agg, seq, read_opts);
|
||||
if (value != nullptr) {
|
||||
value->assign(pSlice.data(), pSlice.size());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
|
||||
Status* s, MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts) {
|
||||
SequenceNumber seq;
|
||||
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
|
||||
}
|
||||
|
||||
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
|
||||
const ReadOptions& read_opts) {
|
||||
PinnableSlice pSlice;
|
||||
PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
|
||||
auto res =
|
||||
Get(key, pSlicePtr, s, merge_context, range_del_agg, seq, read_opts);
|
||||
if (value != nullptr) {
|
||||
value->assign(pSlice.data(), pSlice.size());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context,
|
||||
RangeDelAggregator* range_del_agg,
|
||||
const ReadOptions& read_opts) {
|
||||
SequenceNumber seq;
|
||||
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
|
||||
}
|
||||
|
||||
namespace test {
|
||||
|
||||
Slice RandomString(Random* rnd, int len, std::string* dst) {
|
||||
|
@ -1039,7 +1039,7 @@ class DocumentDBImpl : public DocumentDB {
|
||||
// RocksDB functions
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pSlice) override {
|
||||
std::string* value) override {
|
||||
return Status::NotSupported("");
|
||||
}
|
||||
virtual Status Get(const ReadOptions& options, const Slice& key,
|
||||
|
@ -170,17 +170,6 @@ bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
|
||||
return (timestamp_value + ttl) < curtime;
|
||||
}
|
||||
|
||||
// Strips the TS from the end of the slice
|
||||
Status DBWithTTLImpl::StripTS(Slice* slice) {
|
||||
Status st;
|
||||
if (slice->size() < kTSLength) {
|
||||
return Status::Corruption("Bad timestamp in key-value");
|
||||
}
|
||||
// Erasing characters which hold the TS
|
||||
slice->remove_suffix(kTSLength);
|
||||
return st;
|
||||
}
|
||||
|
||||
// Strips the TS from the end of the string
|
||||
Status DBWithTTLImpl::StripTS(std::string* str) {
|
||||
Status st;
|
||||
@ -202,16 +191,16 @@ Status DBWithTTLImpl::Put(const WriteOptions& options,
|
||||
|
||||
Status DBWithTTLImpl::Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pSlice) {
|
||||
Status st = db_->Get(options, column_family, key, pSlice);
|
||||
std::string* value) {
|
||||
Status st = db_->Get(options, column_family, key, value);
|
||||
if (!st.ok()) {
|
||||
return st;
|
||||
}
|
||||
st = SanityCheckTimestamp(*pSlice);
|
||||
st = SanityCheckTimestamp(*value);
|
||||
if (!st.ok()) {
|
||||
return st;
|
||||
}
|
||||
return StripTS(pSlice);
|
||||
return StripTS(value);
|
||||
}
|
||||
|
||||
std::vector<Status> DBWithTTLImpl::MultiGet(
|
||||
|
@ -51,7 +51,7 @@ class DBWithTTLImpl : public DBWithTTL {
|
||||
using StackableDB::Get;
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pSlice) override;
|
||||
std::string* value) override;
|
||||
|
||||
using StackableDB::MultiGet;
|
||||
virtual std::vector<Status> MultiGet(
|
||||
@ -87,8 +87,6 @@ class DBWithTTLImpl : public DBWithTTL {
|
||||
|
||||
static Status StripTS(std::string* str);
|
||||
|
||||
static Status StripTS(Slice* str);
|
||||
|
||||
static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
|
||||
|
||||
static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
|
||||
|
Loading…
x
Reference in New Issue
Block a user