diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index e6a8a9e38..f2eb02740 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -42,10 +42,10 @@ size_t CompactedDBImpl::FindFile(const Slice& key) { return right; } -Status CompactedDBImpl::Get(const ReadOptions& options, - ColumnFamilyHandle*, const Slice& key, std::string* value) { +Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, + const Slice& key, PinnableSlice* pSlice) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, - GetContext::kNotFound, key, value, nullptr, nullptr, + GetContext::kNotFound, key, pSlice, nullptr, nullptr, nullptr, nullptr); LookupKey lkey(key, kMaxSequenceNumber); files_.files[FindFile(key)].fd.table_reader->Get( @@ -75,11 +75,14 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, int idx = 0; for (auto* r : reader_list) { if (r != nullptr) { + PinnableSlice pSlice; + std::string& value = (*values)[idx]; GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, - GetContext::kNotFound, keys[idx], &(*values)[idx], - nullptr, nullptr, nullptr, nullptr); + GetContext::kNotFound, keys[idx], &pSlice, nullptr, + nullptr, nullptr, nullptr); LookupKey lkey(keys[idx], kMaxSequenceNumber); r->Get(options, lkey.internal_key(), &get_context); + value.assign(pSlice.data(), pSlice.size()); if (get_context.State() == GetContext::kFound) { statuses[idx] = Status::OK(); } diff --git a/db/compacted_db_impl.h b/db/compacted_db_impl.h index 9c42010a6..35962d486 100644 --- a/db/compacted_db_impl.h +++ b/db/compacted_db_impl.h @@ -23,7 +23,7 @@ class CompactedDBImpl : public DBImpl { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, diff --git a/db/db_impl.cc b/db/db_impl.cc index 7b737f66f..d8cbd8ed4 
100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3907,8 +3907,8 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { - return GetImpl(read_options, column_family, key, value); + PinnableSlice* pSlice) { + return GetImpl(read_options, column_family, key, pSlice); } // JobContext gets created and destructed outside of the lock -- @@ -3965,7 +3965,7 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( Status DBImpl::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value, bool* value_found) { + PinnableSlice* pSlice, bool* value_found) { StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); @@ -3996,12 +3996,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, (read_options.read_tier == kPersistedTier && has_unpersisted_data_); bool done = false; if (!skip_memtable) { - if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, + if (sv->mem->Get(lkey, pSlice, &s, &merge_context, &range_del_agg, read_options)) { done = true; RecordTick(stats_, MEMTABLE_HIT); } else if ((s.ok() || s.IsMergeInProgress()) && - sv->imm->Get(lkey, value, &s, &merge_context, &range_del_agg, + sv->imm->Get(lkey, pSlice, &s, &merge_context, &range_del_agg, read_options)) { done = true; RecordTick(stats_, MEMTABLE_HIT); @@ -4012,7 +4012,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); - sv->current->Get(read_options, lkey, value, &s, &merge_context, + sv->current->Get(read_options, lkey, pSlice, &s, &merge_context, &range_del_agg, value_found); RecordTick(stats_, MEMTABLE_MISS); } @@ -4023,8 +4023,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, ReturnAndCleanupSuperVersion(cfd, sv); RecordTick(stats_, NUMBER_KEYS_READ); - RecordTick(stats_, BYTES_READ, value->size()); - 
MeasureTime(stats_, BYTES_PER_READ, value->size()); + size_t size = pSlice->size(); + RecordTick(stats_, BYTES_READ, size); + MeasureTime(stats_, BYTES_PER_READ, size); } return s; } @@ -4100,26 +4101,30 @@ std::vector DBImpl::MultiGet( bool skip_memtable = (read_options.read_tier == kPersistedTier && has_unpersisted_data_); bool done = false; + PinnableSlice pSlice; if (!skip_memtable) { - if (super_version->mem->Get(lkey, value, &s, &merge_context, + if (super_version->mem->Get(lkey, &pSlice, &s, &merge_context, &range_del_agg, read_options)) { + value->assign(pSlice.data(), pSlice.size()); done = true; // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? - } else if (super_version->imm->Get(lkey, value, &s, &merge_context, + } else if (super_version->imm->Get(lkey, &pSlice, &s, &merge_context, &range_del_agg, read_options)) { + value->assign(pSlice.data(), pSlice.size()); done = true; // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? } } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, value, &s, &merge_context, - &range_del_agg); + super_version->current->Get(read_options, lkey, &pSlice, &s, + &merge_context, &range_del_agg); + value->assign(pSlice.data(), pSlice.size()); // TODO(?): RecordTick(stats_, MEMTABLE_MISS)? } if (s.ok()) { - bytes_read += value->size(); + bytes_read += pSlice.size(); } } @@ -4332,7 +4337,12 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options, } ReadOptions roptions = read_options; roptions.read_tier = kBlockCacheTier; // read from block cache only - auto s = GetImpl(roptions, column_family, key, value, value_found); + PinnableSlice pSlice; + PinnableSlice* pSlicePtr = value != nullptr ? 
&pSlice : nullptr; + auto s = GetImpl(roptions, column_family, key, pSlicePtr, value_found); + if (value != nullptr) { + value->assign(pSlice.data(), pSlice.size()); + } // If block_cache is enabled and the index block of the table didn't // not present in block_cache, the return value will be Status::Incomplete. @@ -6395,7 +6405,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, *found_record_for_key = false; // Check if there is a record for this key in the latest memtable - sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, + PinnableSlice* pSliceNullPtr = nullptr; + sv->mem->Get(lkey, pSliceNullPtr, &s, &merge_context, &range_del_agg, seq, read_options); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { @@ -6414,7 +6425,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, } // Check if there is a record for this key in the immutable memtables - sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, + sv->imm->Get(lkey, pSliceNullPtr, &s, &merge_context, &range_del_agg, seq, read_options); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { diff --git a/db/db_impl.h b/db/db_impl.h index dfd5f1482..57aa1354b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -91,7 +91,7 @@ class DBImpl : public DB { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, @@ -1088,7 +1088,7 @@ class DBImpl : public DB { // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value, + const Slice& key, PinnableSlice* pSlice, bool* value_found = nullptr); bool GetIntPropertyInternal(ColumnFamilyData* cfd, 
diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index f185209c1..47df6c176 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -31,7 +31,7 @@ DBImplReadOnly::~DBImplReadOnly() { // Implementations of the DB interface Status DBImplReadOnly::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { + PinnableSlice* pSlice) { Status s; SequenceNumber snapshot = versions_->LastSequence(); auto cfh = reinterpret_cast(column_family); @@ -40,11 +40,11 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, MergeContext merge_context; RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); LookupKey lkey(key, snapshot); - if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, + if (super_version->mem->Get(lkey, pSlice, &s, &merge_context, &range_del_agg, read_options)) { } else { PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, value, &s, &merge_context, + super_version->current->Get(read_options, lkey, pSlice, &s, &merge_context, &range_del_agg); } return s; diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index a410a4e32..d638b2cde 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -22,7 +22,7 @@ class DBImplReadOnly : public DBImpl { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; // TODO: Implement ReadOnly MultiGet? 
diff --git a/db/db_test.cc b/db/db_test.cc index d033a0a2e..b59a9b9bd 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2691,7 +2691,7 @@ class ModelDB : public DB { } using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf, - const Slice& key, std::string* value) override { + const Slice& key, PinnableSlice* pSlice) override { return Status::NotSupported(key); } diff --git a/db/memtable.cc b/db/memtable.cc index 23cb3398e..bdf186476 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -520,7 +520,7 @@ struct Saver { const LookupKey* key; bool* found_final_value; // Is value set correctly? Used by KeyMayExist bool* merge_in_progress; - std::string* value; + PinnableSlice* pSlice; SequenceNumber seq; const MergeOperator* merge_operator; // the merge operations encountered; @@ -534,6 +534,10 @@ struct Saver { }; } // namespace +//static void UnrefMemTable(void* s, void*) { +// reinterpret_cast(s)->Unref(); +//} + static bool SaveValue(void* arg, const char* entry) { Saver* s = reinterpret_cast(arg); MergeContext* merge_context = s->merge_context; @@ -571,13 +575,18 @@ static bool SaveValue(void* arg, const char* entry) { } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->status) = Status::OK(); - if (*(s->merge_in_progress)) { - *(s->status) = MergeHelper::TimedFullMerge( - merge_operator, s->key->user_key(), &v, - merge_context->GetOperands(), s->value, s->logger, s->statistics, - s->env_); - } else if (s->value != nullptr) { - s->value->assign(v.data(), v.size()); + if (LIKELY(s->pSlice != nullptr)) { + if (*(s->merge_in_progress)) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), &v, + merge_context->GetOperands(), s->pSlice->GetSelf(), s->logger, + s->statistics, s->env_); + s->pSlice->PinSelf(); + } else { + //s->mem->Ref(); + //s->pSlice->PinSlice(v, UnrefMemTable, s->mem, nullptr); + s->pSlice->PinSelf(v); + } } if (s->inplace_update_support) { 
s->mem->GetLock(s->key->user_key())->ReadUnlock(); @@ -589,10 +598,14 @@ static bool SaveValue(void* arg, const char* entry) { case kTypeSingleDeletion: case kTypeRangeDeletion: { if (*(s->merge_in_progress)) { - *(s->status) = MergeHelper::TimedFullMerge( - merge_operator, s->key->user_key(), nullptr, - merge_context->GetOperands(), s->value, s->logger, s->statistics, - s->env_); + *(s->status) = Status::OK(); + if (s->pSlice != nullptr) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), nullptr, + merge_context->GetOperands(), s->pSlice->GetSelf(), s->logger, + s->statistics, s->env_); + s->pSlice->PinSelf(); + } } else { *(s->status) = Status::NotFound(); } @@ -626,7 +639,7 @@ static bool SaveValue(void* arg, const char* entry) { return false; } -bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, +bool MemTable::Get(const LookupKey& key, PinnableSlice* pSlice, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts) { @@ -664,7 +677,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.found_final_value = &found_final_value; saver.merge_in_progress = &merge_in_progress; saver.key = &key; - saver.value = value; + saver.pSlice = pSlice; saver.seq = kMaxSequenceNumber; saver.mem = this; saver.merge_context = merge_context; diff --git a/db/memtable.h b/db/memtable.h index a6ad3f243..079c41b85 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -186,16 +186,26 @@ class MemTable { // returned). Otherwise, *seq will be set to kMaxSequenceNumber. // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other // status returned indicates a corruption or other unexpected error. 
- bool Get(const LookupKey& key, std::string* value, Status* s, + bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts); + inline bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { + SequenceNumber seq; + return Get(key, pSlice, s, merge_context, range_del_agg, &seq, read_opts); + } + + // deprecated. Use Get with PinnableSlice bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts) { - SequenceNumber seq; - return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); - } + SequenceNumber* seq, const ReadOptions& read_opts); + // deprecated. Use Get with PinnableSlice + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts); // Attempts to update the new_value inplace, else does normal Add // Pseudocode diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 7252c9730..b2bf5c5f8 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -100,12 +100,12 @@ int MemTableList::NumFlushed() const { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. // Operands stores the list of merge operations to apply, so far. 
-bool MemTableListVersion::Get(const LookupKey& key, std::string* value, +bool MemTableListVersion::Get(const LookupKey& key, PinnableSlice* pSlice, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts) { - return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, + return GetFromList(&memlist_, key, pSlice, s, merge_context, range_del_agg, seq, read_opts); } @@ -115,22 +115,26 @@ bool MemTableListVersion::GetFromHistory(const LookupKey& key, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts) { - return GetFromList(&memlist_history_, key, value, s, merge_context, - range_del_agg, seq, read_opts); + PinnableSlice pSlice; + PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr; + auto res = GetFromList(&memlist_history_, key, pSlicePtr, s, merge_context, + range_del_agg, seq, read_opts); + if (value != nullptr) { + value->assign(pSlice.data(), pSlice.size()); + } + return res; } -bool MemTableListVersion::GetFromList(std::list* list, - const LookupKey& key, std::string* value, - Status* s, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, - SequenceNumber* seq, - const ReadOptions& read_opts) { +bool MemTableListVersion::GetFromList( + std::list* list, const LookupKey& key, PinnableSlice* pSlice, + Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, + SequenceNumber* seq, const ReadOptions& read_opts) { *seq = kMaxSequenceNumber; for (auto& memtable : *list) { SequenceNumber current_seq = kMaxSequenceNumber; - bool done = memtable->Get(key, value, s, merge_context, range_del_agg, + bool done = memtable->Get(key, pSlice, s, merge_context, range_del_agg, &current_seq, read_opts); if (*seq == kMaxSequenceNumber) { // Store the most recent sequence number of any operation on this key.
diff --git a/db/memtable_list.h b/db/memtable_list.h index 67ef95bd3..341192899 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -53,16 +53,26 @@ class MemTableListVersion { // If any operation was found for this key, its most recent sequence number // will be stored in *seq on success (regardless of whether true/false is // returned). Otherwise, *seq will be set to kMaxSequenceNumber. - bool Get(const LookupKey& key, std::string* value, Status* s, + bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts); + inline bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { + SequenceNumber seq; + return Get(key, pSlice, s, merge_context, range_del_agg, &seq, read_opts); + } + + // deprecated. Use Get with PinnableSlice bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts) { - SequenceNumber seq; - return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); - } + SequenceNumber* seq, const ReadOptions& read_opts); + // deprecated. Use Get with PinnableSlice + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts); // Similar to Get(), but searches the Memtable history of memtables that // have already been flushed. 
Should only be used from in-memory only @@ -72,10 +82,10 @@ class MemTableListVersion { MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts); - bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, - RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts) { + inline bool GetFromHistory(const LookupKey& key, std::string* value, + Status* s, MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { SequenceNumber seq; return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq, read_opts); @@ -112,7 +122,8 @@ class MemTableListVersion { void TrimHistory(autovector* to_delete); bool GetFromList(std::list* list, const LookupKey& key, - std::string* value, Status* s, MergeContext* merge_context, + PinnableSlice* pSlice, Status* s, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts); diff --git a/db/version_set.cc b/db/version_set.cc index ad57974b3..1cc59d72f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -928,7 +928,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, version_number_(version_number) {} void Version::Get(const ReadOptions& read_options, const LookupKey& k, - std::string* value, Status* status, + PinnableSlice* pSlice, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found, bool* key_exists, SequenceNumber* seq) { @@ -946,7 +946,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context, range_del_agg, this->env_, seq, + pSlice, value_found, merge_context, range_del_agg, this->env_, seq, merge_operator_ ? 
&pinned_iters_mgr : nullptr); // Pin blocks that we read to hold merge operands @@ -1005,9 +1005,13 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; - *status = MergeHelper::TimedFullMerge(merge_operator_, user_key, nullptr, - merge_context->GetOperands(), value, - info_log_, db_statistics_, env_); + std::string* str_value = pSlice != nullptr ? pSlice->GetSelf() : nullptr; + *status = MergeHelper::TimedFullMerge( + merge_operator_, user_key, nullptr, merge_context->GetOperands(), + str_value, info_log_, db_statistics_, env_); + if (LIKELY(pSlice != nullptr)) { + pSlice->PinSelf(); + } } else { if (key_exists != nullptr) { *key_exists = false; diff --git a/db/version_set.h b/db/version_set.h index 0d7b85e8c..9e6ce8fa0 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -457,7 +457,7 @@ class Version { // for the key if a key was found. // // REQUIRES: lock is not held - void Get(const ReadOptions&, const LookupKey& key, std::string* val, + void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* pSlice, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found = nullptr, bool* key_exists = nullptr, SequenceNumber* seq = nullptr); diff --git a/include/rocksdb/cleanable.h b/include/rocksdb/cleanable.h new file mode 100644 index 000000000..5df585560 --- /dev/null +++ b/include/rocksdb/cleanable.h @@ -0,0 +1,73 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// An iterator yields a sequence of key/value pairs from a source. +// The following class defines the interface. Multiple implementations +// are provided by this library. In particular, iterators are provided +// to access the contents of a Table or a DB. +// +// Multiple threads can invoke const methods on an Iterator without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same Iterator must use +// external synchronization. + +#ifndef INCLUDE_ROCKSDB_CLEANABLE_H_ +#define INCLUDE_ROCKSDB_CLEANABLE_H_ + +namespace rocksdb { + +class Cleanable { + public: + Cleanable(); + ~Cleanable(); + // Clients are allowed to register function/arg1/arg2 triples that + // will be invoked when this object is destroyed. + // + // Note that unlike all of the preceding methods, this method is + // not abstract and therefore clients should not override it. + typedef void (*CleanupFunction)(void* arg1, void* arg2); + void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); + void DelegateCleanupsTo(Cleanable* other); + // DoCleanup and also resets the pointers for reuse + inline void Reset() { + DoCleanup(); + cleanup_.function = nullptr; + cleanup_.next = nullptr; + } + + protected: + struct Cleanup { + CleanupFunction function; + void* arg1; + void* arg2; + Cleanup* next; + }; + Cleanup cleanup_; + // It also becomes the owner of c + void RegisterCleanup(Cleanup* c); + + private: + // Performs all the cleanups. It does not reset the pointers.
Making it + // private + // to prevent misuse + inline void DoCleanup() { + if (cleanup_.function != nullptr) { + (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2); + for (Cleanup* c = cleanup_.next; c != nullptr;) { + (*c->function)(c->arg1, c->arg2); + Cleanup* next = c->next; + delete c; + c = next; + } + } + } +}; + +} // namespace rocksdb + +#endif // INCLUDE_ROCKSDB_CLEANABLE_H_ diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 2a0304e47..46f38c912 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -280,9 +280,20 @@ class DB { // a status for which Status::IsNotFound() returns true. // // May return some other Status on an error. + inline Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) { + PinnableSlice pSlice; + PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr; + auto s = Get(options, column_family, key, pSlicePtr); + if (value != nullptr && s.ok()) { + value->assign(pSlice.data(), pSlice.size()); + } + return s; + } virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) = 0; + PinnableSlice* value) = 0; virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) { return Get(options, DefaultColumnFamily(), key, value); } diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index 9d17989a7..9bfb0e3d6 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -20,43 +20,12 @@ #define STORAGE_ROCKSDB_INCLUDE_ITERATOR_H_ #include +#include "rocksdb/cleanable.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" namespace rocksdb { -class Cleanable { - public: - Cleanable(); - ~Cleanable(); - // Clients are allowed to register function/arg1/arg2 triples that - // will be invoked when this iterator is destroyed. 
- // - // Note that unlike all of the preceding methods, this method is - // not abstract and therefore clients should not override it. - typedef void (*CleanupFunction)(void* arg1, void* arg2); - void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); - void DelegateCleanupsTo(Cleanable* other); - // DoCleanup and also resets the pointers for reuse - void Reset(); - - protected: - struct Cleanup { - CleanupFunction function; - void* arg1; - void* arg2; - Cleanup* next; - }; - Cleanup cleanup_; - // It also becomes the owner of c - void RegisterCleanup(Cleanup* c); - - private: - // Performs all the cleanups. It does not reset the pointers. Making it - // private to prevent misuse - inline void DoCleanup(); -}; - class Iterator : public Cleanable { public: Iterator() {} diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 38d494ed9..167cafaa2 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -25,6 +25,8 @@ #include #include +#include "rocksdb/cleanable.h" + namespace rocksdb { class Slice { @@ -116,6 +118,61 @@ class Slice { // Intentionally copyable }; +class PinnableSlice : public Slice, public Cleanable { + public: + PinnableSlice() {} + + inline void PinSlice(const Slice& s, CleanupFunction f, void* arg1, + void* arg2) { + data_ = s.data(); + size_ = s.size(); + RegisterCleanup(f, arg1, arg2); + } + + inline void PinSlice(const Slice& s, Cleanable* cleanable) { + data_ = s.data(); + size_ = s.size(); + cleanable->DelegateCleanupsTo(this); + } + + inline void PinHeap(const char* s, const size_t n) { + data_ = s; + size_ = n; + RegisterCleanup(ReleaseCharStrHeap, const_cast(data_), nullptr); + } + + inline void PinHeap(std::string* s) { + data_ = s->data(); + size_ = s->size(); + RegisterCleanup(ReleaseStringHeap, s, nullptr); + } + + inline void PinSelf(const Slice& slice) { + self_space.assign(slice.data(), slice.size()); + data_ = self_space.data(); + size_ = self_space.size(); + } + + inline void PinSelf() { 
+ data_ = self_space.data(); + size_ = self_space.size(); + } + + inline std::string* GetSelf() { return &self_space; } + + inline bool IsPinned() { return cleanup_.function != nullptr; } + + private: + friend class PinnableSlice4Test; + std::string self_space; + static void ReleaseCharStrHeap(void* s, void*) { + delete reinterpret_cast(s); + } + static void ReleaseStringHeap(void* s, void*) { + delete reinterpret_cast(s); + } +}; + // A set of Slices that are virtually concatenated together. 'parts' points // to an array of Slices. The number of elements in the array is 'num_parts'. struct SliceParts { diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index a712d1848..efdec00d1 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -56,8 +56,8 @@ class StackableDB : public DB { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override { - return db_->Get(options, column_family, key, value); + PinnableSlice* pSlice) override { + return db_->Get(options, column_family, key, pSlice); } using DB::MultiGet; diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index f85d570b8..86c6dedcd 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1537,9 +1537,6 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, BlockIter iiter; NewIndexIterator(read_options, &iiter); - PinnedIteratorsManager* pinned_iters_mgr = get_context->pinned_iters_mgr(); - bool pin_blocks = pinned_iters_mgr && pinned_iters_mgr->PinningEnabled(); - bool done = false; for (iiter.Seek(key); iiter.Valid() && !done; iiter.Next()) { Slice handle_value = iiter.value(); @@ -1580,17 +1577,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, s = Status::Corruption(Slice()); } - if 
(!get_context->SaveValue(parsed_key, biter.value(), pin_blocks)) { + if (!get_context->SaveValue(parsed_key, biter.value(), &biter)) { done = true; break; } } s = biter.status(); - - if (pin_blocks && get_context->State() == GetContext::kMerge) { - // Pin blocks as long as we are merging - biter.DelegateCleanupsTo(pinned_iters_mgr); - } } } if (s.ok()) { diff --git a/table/cleanable_test.cc b/table/cleanable_test.cc index 717e20ea6..ebf5ce0ae 100644 --- a/table/cleanable_test.cc +++ b/table/cleanable_test.cc @@ -47,6 +47,30 @@ TEST_F(CleanableTest, Register) { } // ~Cleanable ASSERT_EQ(6, res); + + // Test that Reset does cleanup + res = 1; + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3; + c1.Reset(); + ASSERT_EQ(6, res); + } + // ~Cleanable + ASSERT_EQ(6, res); + + // Test Cleanable is usable after Reset + res = 1; + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + c1.Reset(); + ASSERT_EQ(2, res); + c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3; + } + // ~Cleanable + ASSERT_EQ(6, res); } // the first Cleanup is on stack and the rest on heap, @@ -174,6 +198,102 @@ TEST_F(CleanableTest, Delegation) { ASSERT_EQ(5, res); } +class PinnableSlice4Test : public PinnableSlice { + public: + void TestCharStrIsRegistered(char* s) { + ASSERT_EQ(cleanup_.function, ReleaseCharStrHeap); + ASSERT_EQ(cleanup_.arg1, s); + ASSERT_EQ(cleanup_.arg2, nullptr); + ASSERT_EQ(cleanup_.next, nullptr); + } + + void TestStringIsRegistered(std::string* s) { + ASSERT_EQ(cleanup_.function, ReleaseStringHeap); + ASSERT_EQ(cleanup_.arg1, s); + ASSERT_EQ(cleanup_.arg2, nullptr); + ASSERT_EQ(cleanup_.next, nullptr); + } +}; + +// Putting the PinnableSlice tests here due to similarity to Cleanable tests +TEST_F(CleanableTest, PinnableSlice) { + int n2 = 2; + int res = 1; + const std::string const_str = "123"; + const char* const_s = "123"; + + { + PinnableSlice4Test pSlice; +
char* s = strdup(const_s); + pSlice.PinHeap(s, strlen(const_s)); + std::string str; + str.assign(pSlice.data(), pSlice.size()); + ASSERT_EQ(const_s, str); + pSlice.TestCharStrIsRegistered(s); + } + + { + PinnableSlice4Test pSlice; + std::string* heap_str = new std::string(const_str); + pSlice.PinHeap(heap_str); + std::string str; + str.assign(pSlice.data(), pSlice.size()); + ASSERT_EQ(const_str, str); + pSlice.TestStringIsRegistered(heap_str); + } + + { + res = 1; + PinnableSlice4Test pSlice; + Slice slice(const_str); + pSlice.PinSlice(slice, Multiplier, &res, &n2); + std::string str; + str.assign(pSlice.data(), pSlice.size()); + ASSERT_EQ(const_str, str); + } + // ~Cleanable + ASSERT_EQ(2, res); + + { + res = 1; + PinnableSlice4Test pSlice; + Slice slice(const_str); + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + pSlice.PinSlice(slice, &c1); + } + // ~Cleanable + ASSERT_EQ(1, res); // cleanups must have been delegated to pSlice + std::string str; + str.assign(pSlice.data(), pSlice.size()); + ASSERT_EQ(const_str, str); + } + // ~Cleanable + ASSERT_EQ(2, res); + + { + PinnableSlice4Test pSlice; + Slice slice(const_str); + pSlice.PinSelf(slice); + std::string str; + str.assign(pSlice.data(), pSlice.size()); + ASSERT_EQ(const_str, str); + ASSERT_EQ(false, pSlice.IsPinned()); // self pinned + } + + { + PinnableSlice4Test pSlice; + std::string* self_str_ptr = pSlice.GetSelf(); + self_str_ptr->assign(const_str); + pSlice.PinSelf(); + std::string str; + str.assign(pSlice.data(), pSlice.size()); + ASSERT_EQ(const_str, str); + ASSERT_EQ(false, pSlice.IsPinned()); // self pinned + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/table/cuckoo_table_reader_test.cc b/table/cuckoo_table_reader_test.cc index e207c62bd..5d82d58fc 100644 --- a/table/cuckoo_table_reader_test.cc +++ b/table/cuckoo_table_reader_test.cc @@ -123,12 +123,12 @@ class CuckooReaderTest : public testing::Test { ASSERT_OK(reader.status()); // Assume no
merge/deletion for (uint32_t i = 0; i < num_items; ++i) { - std::string value; + PinnableSlice value; GetContext get_context(ucomp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(user_keys[i]), &value, nullptr, nullptr, nullptr, nullptr); ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context)); - ASSERT_EQ(values[i], value); + ASSERT_STREQ(values[i].c_str(), value.data()); } } void UpdateKeys(bool with_zero_seqno) { @@ -333,7 +333,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { AddHashLookups(not_found_user_key, 0, kNumHashFunc); ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue); AppendInternalKey(&not_found_key, ikey); - std::string value; + PinnableSlice value; GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key), &value, nullptr, nullptr, nullptr, nullptr); @@ -346,6 +346,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue); std::string not_found_key2; AppendInternalKey(&not_found_key2, ikey2); + value.Reset(); GetContext get_context2(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key2), &value, nullptr, nullptr, nullptr, nullptr); @@ -360,6 +361,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { // Add hash values that map to empty buckets. 
AddHashLookups(ExtractUserKey(unused_key).ToString(), kNumHashFunc, kNumHashFunc); + value.Reset(); GetContext get_context3(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(unused_key), &value, nullptr, nullptr, nullptr, nullptr); @@ -433,12 +435,13 @@ void WriteFile(const std::vector& keys, test::Uint64Comparator(), nullptr); ASSERT_OK(reader.status()); ReadOptions r_options; - std::string value; + PinnableSlice value; // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, nullptr, nullptr, nullptr); for (uint64_t i = 0; i < num; ++i) { + value.Reset(); value.clear(); ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context)); ASSERT_TRUE(Slice(keys[i]) == Slice(&keys[i][0], 4)); @@ -480,7 +483,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) { } std::random_shuffle(keys.begin(), keys.end()); - std::string value; + PinnableSlice value; // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, diff --git a/table/get_context.cc b/table/get_context.cc index 280206c54..383112dc6 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -35,7 +35,7 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { GetContext::GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, - const Slice& user_key, std::string* ret_value, + const Slice& user_key, PinnableSlice* pSlice, bool* value_found, MergeContext* merge_context, RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq, @@ -46,7 +46,7 @@ GetContext::GetContext(const Comparator* ucmp, statistics_(statistics), state_(init_state), user_key_(user_key), - value_(ret_value), + pSlice_(pSlice), value_found_(value_found), merge_context_(merge_context), range_del_agg_(_range_del_agg), @@ -76,13 +76,13 @@ 
void GetContext::SaveValue(const Slice& value, SequenceNumber seq) { appendToReplayLog(replay_log_, kTypeValue, value); state_ = kFound; - if (value_ != nullptr) { - value_->assign(value.data(), value.size()); + if (LIKELY(pSlice_ != nullptr)) { + pSlice_->PinSelf(value); } } bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, - const Slice& value, bool value_pinned) { + const Slice& value, Cleanable* value_pinner) { assert((state_ != kMerge && parsed_key.type != kTypeMerge) || merge_context_ != nullptr); if (ucmp_->Equal(parsed_key.user_key, user_key_)) { @@ -106,17 +106,22 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); if (kNotFound == state_) { state_ = kFound; - if (value_ != nullptr) { - value_->assign(value.data(), value.size()); + if (LIKELY(pSlice_ != nullptr)) { + if (LIKELY(value_pinner != nullptr)) { + pSlice_->PinSlice(value, value_pinner); + } else { + pSlice_->PinSelf(value); + } } } else if (kMerge == state_) { assert(merge_operator_ != nullptr); state_ = kFound; - if (value_ != nullptr) { + if (LIKELY(pSlice_ != nullptr)) { Status merge_status = MergeHelper::TimedFullMerge( merge_operator_, user_key_, &value, - merge_context_->GetOperands(), value_, logger_, statistics_, - env_); + merge_context_->GetOperands(), pSlice_->GetSelf(), logger_, + statistics_, env_); + pSlice_->PinSelf(); if (!merge_status.ok()) { state_ = kCorrupt; } @@ -134,12 +139,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kDeleted; } else if (kMerge == state_) { state_ = kFound; - if (value_ != nullptr) { - Status merge_status = - MergeHelper::TimedFullMerge(merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), - value_, logger_, statistics_, env_); - + if (LIKELY(pSlice_ != nullptr)) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pSlice_->GetSelf(), logger_, + statistics_, 
env_); + pSlice_->PinSelf(); if (!merge_status.ok()) { state_ = kCorrupt; } @@ -150,7 +155,14 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, case kTypeMerge: assert(state_ == kNotFound || state_ == kMerge); state_ = kMerge; - merge_context_->PushOperand(value, value_pinned); + // value_pinner is not set from plain_table_reader.cc for example. + if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && + value_pinner != nullptr) { + value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); + merge_context_->PushOperand(value, true /*value_pinned*/); + } else { + merge_context_->PushOperand(value, false); + } return true; default: @@ -166,6 +178,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, void replayGetContextLog(const Slice& replay_log, const Slice& user_key, GetContext* get_context) { #ifndef ROCKSDB_LITE + static Cleanable nonToClean; Slice s = replay_log; while (s.size()) { auto type = static_cast(*s.data()); @@ -178,7 +191,8 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key, // Since SequenceNumber is not stored and unknown, we will use // kMaxSequenceNumber. 
get_context->SaveValue( - ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, true); + ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, + &nonToClean); } #else // ROCKSDB_LITE assert(false); diff --git a/table/get_context.h b/table/get_context.h index e57c7352c..1ad48174d 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -9,6 +9,7 @@ #include "db/range_del_aggregator.h" #include "rocksdb/env.h" #include "rocksdb/types.h" +#include "table/block.h" namespace rocksdb { class MergeContext; @@ -26,7 +27,7 @@ class GetContext { GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, - const Slice& user_key, std::string* ret_value, bool* value_found, + const Slice& user_key, PinnableSlice* pSlice, bool* value_found, MergeContext* merge_context, RangeDelAggregator* range_del_agg, Env* env, SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr); @@ -39,7 +40,7 @@ class GetContext { // Returns True if more keys need to be read (due to merges) or // False if the complete value has been found. bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value, - bool value_pinned = false); + Cleanable* value_pinner = nullptr); // Simplified version of the previous function. Should only be used when we // know that the operation is a Put. @@ -68,7 +69,7 @@ class GetContext { GetState state_; Slice user_key_; - std::string* value_; + PinnableSlice* pSlice_; bool* value_found_; // Is value set correctly? 
Used by KeyMayExist MergeContext* merge_context_; RangeDelAggregator* range_del_agg_; diff --git a/table/iterator.cc b/table/iterator.cc index a90c720d6..91f7135c0 100644 --- a/table/iterator.cc +++ b/table/iterator.cc @@ -21,24 +21,6 @@ Cleanable::Cleanable() { Cleanable::~Cleanable() { DoCleanup(); } -void Cleanable::Reset() { - DoCleanup(); - cleanup_.function = nullptr; - cleanup_.next = nullptr; -} - -void Cleanable::DoCleanup() { - if (cleanup_.function != nullptr) { - (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2); - for (Cleanup* c = cleanup_.next; c != nullptr;) { - (*c->function)(c->arg1, c->arg2); - Cleanup* next = c->next; - delete c; - c = next; - } - } -} - // If the entire linked list was on heap we could have simply add attach one // link list to another. However the head is an embeded object to avoid the cost // of creating objects for most of the use cases when the Cleanable has only one diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index 77adb8877..7984cdb1f 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -166,7 +166,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, std::string key = MakeKey(r1, r2, through_db); uint64_t start_time = Now(env, measured_by_nanosecond); if (!through_db) { - std::string value; + PinnableSlice value; MergeContext merge_context; RangeDelAggregator range_del_agg(ikc, {} /* snapshots */); GetContext get_context(ioptions.user_comparator, diff --git a/table/table_test.cc b/table/table_test.cc index 841fd9e02..bade43492 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1968,12 +1968,12 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ASSERT_OK(c3.Reopen(ioptions4)); reader = dynamic_cast(c3.GetTableReader()); ASSERT_TRUE(!reader->TEST_filter_block_preloaded()); - std::string value; + PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, 
nullptr, nullptr, nullptr); ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context)); - ASSERT_EQ(value, "hello"); + ASSERT_STREQ(value.data(), "hello"); BlockCachePropertiesSnapshot props(options.statistics.get()); props.AssertFilterBlockStat(0, 0); c3.ResetTableReader(); @@ -2051,7 +2051,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) { c.Finish(options, ioptions, table_options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); auto reader = c.GetTableReader(); - std::string value; + PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr, nullptr); @@ -2065,13 +2065,14 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) { ASSERT_EQ(perf_context.block_read_count, 1); } ASSERT_EQ(get_context.State(), GetContext::kFound); - ASSERT_EQ(value, "hello"); + ASSERT_STREQ(value.data(), "hello"); // Get non-existing key user_key = "does-not-exist"; internal_key = InternalKey(user_key, 0, kTypeValue); encoded_key = internal_key.Encode().ToString(); + value.Reset(); get_context = GetContext(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr, nullptr); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index a46dc8606..8c1733aa3 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -229,6 +229,8 @@ DEFINE_bool(reverse_iterator, false, DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator"); +DEFINE_bool(pin_slice, false, "use pinnable slice for point lookup"); + DEFINE_int64(batch_size, 1, "Batch size"); static bool ValidateKeySize(const char* flagname, int32_t value) { @@ -3772,6 +3774,7 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); std::string value; + PinnableSlice pSlice; Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { @@ -3787,11 +3790,19 @@ class Benchmark { s = 
db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, &value); } else { - s = db_with_cfh->db->Get(options, key, &value); + if (FLAGS_pin_slice == 1) { + pSlice.Reset(); + s = db_with_cfh->db->Get( + options, db_with_cfh->db->DefaultColumnFamily(), key, &pSlice); + } else { + s = db_with_cfh->db->Get( + options, db_with_cfh->db->DefaultColumnFamily(), key, &value); + } } if (s.ok()) { found++; - bytes += key.size() + value.size(); + bytes += + key.size() + (FLAGS_pin_slice == 1 ? pSlice.size() : value.size()); } else if (!s.IsNotFound()) { fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); abort(); diff --git a/util/testutil.cc b/util/testutil.cc index 8642df295..5670b3cd2 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -12,10 +12,59 @@ #include #include +#include "db/memtable_list.h" #include "port/port.h" #include "util/file_reader_writer.h" namespace rocksdb { + +// These methods are deprecated and only being used in tests for backward +// compatibility +bool MemTableListVersion::Get(const LookupKey& key, std::string* value, + Status* s, MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + SequenceNumber* seq, + const ReadOptions& read_opts) { + PinnableSlice pSlice; + PinnableSlice* pSlicePtr = value != nullptr ? 
&pSlice : nullptr; + auto res = + Get(key, pSlicePtr, s, merge_context, range_del_agg, seq, read_opts); + if (value != nullptr) { + value->assign(pSlice.data(), pSlice.size()); + } + return res; +} + +bool MemTableListVersion::Get(const LookupKey& key, std::string* value, + Status* s, MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { + SequenceNumber seq; + return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); +} + +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, SequenceNumber* seq, + const ReadOptions& read_opts) { + PinnableSlice pSlice; + PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr; + auto res = + Get(key, pSlicePtr, s, merge_context, range_del_agg, seq, read_opts); + if (value != nullptr) { + value->assign(pSlice.data(), pSlice.size()); + } + return res; +} + +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { + SequenceNumber seq; + return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); +} + namespace test { Slice RandomString(Random* rnd, int len, std::string* dst) { diff --git a/utilities/document/document_db.cc b/utilities/document/document_db.cc index 85330b123..97913e603 100644 --- a/utilities/document/document_db.cc +++ b/utilities/document/document_db.cc @@ -1039,7 +1039,7 @@ class DocumentDBImpl : public DocumentDB { // RocksDB functions virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override { + PinnableSlice* pSlice) override { return Status::NotSupported(""); } virtual Status Get(const ReadOptions& options, const Slice& key, diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc index b9edb3cf3..aeea709d5 100644 --- 
a/utilities/ttl/db_ttl_impl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -170,6 +170,17 @@ bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) { return (timestamp_value + ttl) < curtime; } +// Strips the TS from the end of the slice +Status DBWithTTLImpl::StripTS(Slice* slice) { + Status st; + if (slice->size() < kTSLength) { + return Status::Corruption("Bad timestamp in key-value"); + } + // Erasing characters which hold the TS + slice->remove_suffix(kTSLength); + return st; +} + // Strips the TS from the end of the string Status DBWithTTLImpl::StripTS(std::string* str) { Status st; @@ -191,16 +202,16 @@ Status DBWithTTLImpl::Put(const WriteOptions& options, Status DBWithTTLImpl::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { - Status st = db_->Get(options, column_family, key, value); + PinnableSlice* pSlice) { + Status st = db_->Get(options, column_family, key, pSlice); if (!st.ok()) { return st; } - st = SanityCheckTimestamp(*value); + st = SanityCheckTimestamp(*pSlice); if (!st.ok()) { return st; } - return StripTS(value); + return StripTS(pSlice); } std::vector DBWithTTLImpl::MultiGet( diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index a9b445e65..597ad85dc 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -51,7 +51,7 @@ class DBWithTTLImpl : public DBWithTTL { using StackableDB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* pSlice) override; using StackableDB::MultiGet; virtual std::vector MultiGet( @@ -87,6 +87,8 @@ class DBWithTTLImpl : public DBWithTTL { static Status StripTS(std::string* str); + static Status StripTS(Slice* str); + static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8