From 11526252cc0b5e1db82bf30dad7e202616f90dd4 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 13 Mar 2017 11:44:50 -0700 Subject: [PATCH] Pinnableslice (2nd attempt) Summary: PinnableSlice Summary: Currently the point lookup values are copied to a string provided by the user. This incures an extra memcpy cost. This patch allows doing point lookup via a PinnableSlice which pins the source memory location (instead of copying their content) and releases them after the content is consumed by the user. The old API of Get(string) is translated to the new API underneath. Here is the summary for improvements: value 100 byte: 1.8% regular, 1.2% merge values value 1k byte: 11.5% regular, 7.5% merge values value 10k byte: 26% regular, 29.9% merge values The improvement for merge could be more if we extend this approach to pin the merge output and delay the full merge operation until the user actually needs it. We have put that for future work. PS: Sometimes we observe a small decrease in performance when switching from t5452014 to this patch but with the old Get(string) API. The d Closes https://github.com/facebook/rocksdb/pull/1756 Differential Revision: D4391738 Pulled By: maysamyabandeh fbshipit-source-id: 6f3edd3 --- db/compacted_db_impl.cc | 9 ++- db/compacted_db_impl.h | 2 +- db/db_impl.cc | 33 ++++++--- db/db_impl.h | 4 +- db/db_impl_readonly.cc | 12 +-- db/db_impl_readonly.h | 2 +- db/db_test.cc | 2 +- db/version_set.cc | 12 ++- db/version_set.h | 2 +- include/rocksdb/cleanable.h | 73 ++++++++++++++++++ include/rocksdb/db.h | 15 +++- include/rocksdb/iterator.h | 33 +-------- include/rocksdb/slice.h | 77 +++++++++++++++++++ include/rocksdb/utilities/stackable_db.h | 2 +- table/block_based_table_reader.cc | 10 +-- table/cleanable_test.cc | 94 ++++++++++++++++++++++++ table/cuckoo_table_reader_test.cc | 13 ++-- table/get_context.cc | 52 ++++++++----- table/get_context.h | 7 +- table/iterator.cc | 18 ----- table/table_reader_bench.cc | 2 +- table/table_test.cc | 9 ++- tools/db_bench_tool.cc | 16 +++- util/testutil.cc | 1 + utilities/document/document_db.cc | 4 +- utilities/ttl/db_ttl_impl.cc | 13 +++- utilities/ttl/db_ttl_impl.h | 4 +- 27 files changed, 393 insertions(+), 128 deletions(-) create mode 100644 include/rocksdb/cleanable.h diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index e6a8a9e38..49ca61e3c 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -42,8 +42,8 @@ size_t CompactedDBImpl::FindFile(const Slice& key) { return right; } -Status CompactedDBImpl::Get(const ReadOptions& options, - ColumnFamilyHandle*, const Slice& key, std::string* value) { +Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, + const Slice& key, PinnableSlice* value) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, key, value, nullptr, nullptr, nullptr, nullptr); @@ -75,11 +75,14 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, int idx = 0; for (auto* r : reader_list) { if (r != nullptr) { + PinnableSlice pinnable_val; + std::string& value = (*values)[idx]; GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, - GetContext::kNotFound, keys[idx], &(*values)[idx], + GetContext::kNotFound, keys[idx], &pinnable_val, nullptr, nullptr, nullptr, nullptr); LookupKey lkey(keys[idx], kMaxSequenceNumber); r->Get(options, lkey.internal_key(), &get_context); + value.assign(pinnable_val.data(), pinnable_val.size()); if (get_context.State() == GetContext::kFound) { statuses[idx] = Status::OK(); } diff --git a/db/compacted_db_impl.h b/db/compacted_db_impl.h index 6d6d512fd..906ba9579 100644 --- a/db/compacted_db_impl.h +++ b/db/compacted_db_impl.h @@ -23,7 +23,7 @@ class CompactedDBImpl : public DBImpl { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, diff --git a/db/db_impl.cc b/db/db_impl.cc index a6c00fa3c..4158aa08b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3940,7 +3940,7 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { + PinnableSlice* value) { return GetImpl(read_options, column_family, key, value); } @@ -3998,7 +3998,8 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( Status DBImpl::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value, bool* value_found) { + PinnableSlice* pinnable_val, bool* value_found) { + assert(pinnable_val != nullptr); StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); @@ -4046,14 +4047,16 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, has_unpersisted_data_.load(std::memory_order_relaxed)); bool done = false; if (!skip_memtable) { - if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, - read_options)) { + if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &range_del_agg, read_options)) { done = true; + pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } else if ((s.ok() || s.IsMergeInProgress()) && - sv->imm->Get(lkey, value, &s, &merge_context, &range_del_agg, - read_options)) { + sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &range_del_agg, read_options)) { done = true; + pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } if (!done && !s.ok() && !s.IsMergeInProgress()) { @@ -4062,7 +4065,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); - sv->current->Get(read_options, lkey, value, &s, &merge_context, + sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, &range_del_agg, value_found); RecordTick(stats_, MEMTABLE_MISS); } @@ -4073,8 +4076,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, ReturnAndCleanupSuperVersion(cfd, sv); RecordTick(stats_, NUMBER_KEYS_READ); - RecordTick(stats_, BYTES_READ, value->size()); - MeasureTime(stats_, BYTES_PER_READ, value->size()); + size_t size = pinnable_val->size(); + RecordTick(stats_, BYTES_READ, size); + MeasureTime(stats_, BYTES_PER_READ, size); } return s; } @@ -4163,9 +4167,11 @@ std::vector DBImpl::MultiGet( } } if (!done) { + PinnableSlice pinnable_val; PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, value, &s, &merge_context, - &range_del_agg); + super_version->current->Get(read_options, lkey, &pinnable_val, &s, + &merge_context, &range_del_agg); + value->assign(pinnable_val.data(), pinnable_val.size()); // TODO(?): RecordTick(stats_, MEMTABLE_MISS)? } @@ -4377,13 +4383,16 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { bool DBImpl::KeyMayExist(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { + assert(value != nullptr); if (value_found != nullptr) { // falsify later if key-may-exist but can't fetch value *value_found = true; } ReadOptions roptions = read_options; roptions.read_tier = kBlockCacheTier; // read from block cache only - auto s = GetImpl(roptions, column_family, key, value, value_found); + PinnableSlice pinnable_val; + auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found); + value->assign(pinnable_val.data(), pinnable_val.size()); // If block_cache is enabled and the index block of the table didn't // not present in block_cache, the return value will be Status::Incomplete. diff --git a/db/db_impl.h b/db/db_impl.h index 54562f7e9..3ae37adae 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -91,7 +91,7 @@ class DBImpl : public DB { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, @@ -1104,7 +1104,7 @@ class DBImpl : public DB { // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value, + const Slice& key, PinnableSlice* value, bool* value_found = nullptr); bool GetIntPropertyInternal(ColumnFamilyData* cfd, diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index f185209c1..f92ee7c3c 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -31,7 +31,8 @@ DBImplReadOnly::~DBImplReadOnly() { // Implementations of the DB interface Status DBImplReadOnly::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { + PinnableSlice* pinnable_val) { + assert(pinnable_val != nullptr); Status s; SequenceNumber snapshot = versions_->LastSequence(); auto cfh = reinterpret_cast(column_family); @@ -40,12 +41,13 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, MergeContext merge_context; RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); LookupKey lkey(key, snapshot); - if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, - read_options)) { + if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &range_del_agg, read_options)) { + pinnable_val->PinSelf(); } else { PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, value, &s, &merge_context, - &range_del_agg); + super_version->current->Get(read_options, lkey, pinnable_val, &s, + &merge_context, &range_del_agg); } return s; } diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 56d207e6d..b46652060 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -22,7 +22,7 @@ class DBImplReadOnly : public DBImpl { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; // TODO: Implement ReadOnly MultiGet? diff --git a/db/db_test.cc b/db/db_test.cc index 0083b37b8..d282de7c0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2212,7 +2212,7 @@ class ModelDB : public DB { } using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf, - const Slice& key, std::string* value) override { + const Slice& key, PinnableSlice* value) override { return Status::NotSupported(key); } diff --git a/db/version_set.cc b/db/version_set.cc index 31606c396..42aeac039 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -927,7 +927,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, version_number_(version_number) {} void Version::Get(const ReadOptions& read_options, const LookupKey& k, - std::string* value, Status* status, + PinnableSlice* value, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found, bool* key_exists, SequenceNumber* seq) { @@ -1004,9 +1004,13 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; - *status = MergeHelper::TimedFullMerge(merge_operator_, user_key, nullptr, - merge_context->GetOperands(), value, - info_log_, db_statistics_, env_); + std::string* str_value = value != nullptr ? value->GetSelf() : nullptr; + *status = MergeHelper::TimedFullMerge( + merge_operator_, user_key, nullptr, merge_context->GetOperands(), + str_value, info_log_, db_statistics_, env_); + if (LIKELY(value != nullptr)) { + value->PinSelf(); + } } else { if (key_exists != nullptr) { *key_exists = false; diff --git a/db/version_set.h b/db/version_set.h index bcf6951a7..818a86fea 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -477,7 +477,7 @@ class Version { // for the key if a key was found. // // REQUIRES: lock is not held - void Get(const ReadOptions&, const LookupKey& key, std::string* val, + void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found = nullptr, bool* key_exists = nullptr, SequenceNumber* seq = nullptr); diff --git a/include/rocksdb/cleanable.h b/include/rocksdb/cleanable.h new file mode 100644 index 000000000..5df585560 --- /dev/null +++ b/include/rocksdb/cleanable.h @@ -0,0 +1,73 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// An iterator yields a sequence of key/value pairs from a source. +// The following class defines the interface. Multiple implementations +// are provided by this library. In particular, iterators are provided +// to access the contents of a Table or a DB. +// +// Multiple threads can invoke const methods on an Iterator without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same Iterator must use +// external synchronization. + +#ifndef INCLUDE_ROCKSDB_CLEANABLE_H_ +#define INCLUDE_ROCKSDB_CLEANABLE_H_ + +namespace rocksdb { + +class Cleanable { + public: + Cleanable(); + ~Cleanable(); + // Clients are allowed to register function/arg1/arg2 triples that + // will be invoked when this iterator is destroyed. + // + // Note that unlike all of the preceding methods, this method is + // not abstract and therefore clients should not override it. + typedef void (*CleanupFunction)(void* arg1, void* arg2); + void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); + void DelegateCleanupsTo(Cleanable* other); + // DoCkeanup and also resets the pointers for reuse + inline void Reset() { + DoCleanup(); + cleanup_.function = nullptr; + cleanup_.next = nullptr; + } + + protected: + struct Cleanup { + CleanupFunction function; + void* arg1; + void* arg2; + Cleanup* next; + }; + Cleanup cleanup_; + // It also becomes the owner of c + void RegisterCleanup(Cleanup* c); + + private: + // Performs all the cleanups. It does not reset the pointers. Making it + // private + // to prevent misuse + inline void DoCleanup() { + if (cleanup_.function != nullptr) { + (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2); + for (Cleanup* c = cleanup_.next; c != nullptr;) { + (*c->function)(c->arg1, c->arg2); + Cleanup* next = c->next; + delete c; + c = next; + } + } + } +}; + +} // namespace rocksdb + +#endif // INCLUDE_ROCKSDB_CLEANABLE_H_ diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index cdbb8abf1..e74f7adef 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -16,6 +16,7 @@ #include #include #include +#include "port/likely.h" #include "rocksdb/iterator.h" #include "rocksdb/listener.h" #include "rocksdb/metadata.h" @@ -280,9 +281,21 @@ class DB { // a status for which Status::IsNotFound() returns true. // // May return some other Status on an error. + inline Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) { + assert(value != nullptr); + PinnableSlice pinnable_val(value); + assert(!pinnable_val.IsPinned()); + auto s = Get(options, column_family, key, &pinnable_val); + if (LIKELY(s.ok()) && pinnable_val.IsPinned()) { + value->assign(pinnable_val.data(), pinnable_val.size()); + } // else value is already assigned + return s; + } virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) = 0; + PinnableSlice* value) = 0; virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) { return Get(options, DefaultColumnFamily(), key, value); } diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index 9d17989a7..9bfb0e3d6 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -20,43 +20,12 @@ #define STORAGE_ROCKSDB_INCLUDE_ITERATOR_H_ #include +#include "rocksdb/cleanable.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" namespace rocksdb { -class Cleanable { - public: - Cleanable(); - ~Cleanable(); - // Clients are allowed to register function/arg1/arg2 triples that - // will be invoked when this iterator is destroyed. - // - // Note that unlike all of the preceding methods, this method is - // not abstract and therefore clients should not override it. - typedef void (*CleanupFunction)(void* arg1, void* arg2); - void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); - void DelegateCleanupsTo(Cleanable* other); - // DoCleanup and also resets the pointers for reuse - void Reset(); - - protected: - struct Cleanup { - CleanupFunction function; - void* arg1; - void* arg2; - Cleanup* next; - }; - Cleanup cleanup_; - // It also becomes the owner of c - void RegisterCleanup(Cleanup* c); - - private: - // Performs all the cleanups. It does not reset the pointers. Making it - // private to prevent misuse - inline void DoCleanup(); -}; - class Iterator : public Cleanable { public: Iterator() {} diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 38d494ed9..2c858a506 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -25,6 +25,8 @@ #include #include +#include "rocksdb/cleanable.h" + namespace rocksdb { class Slice { @@ -116,6 +118,81 @@ class Slice { // Intentionally copyable }; +/** + * A Slice that can be pinned with some cleanup tasks, which will be run upon + * ::Reset() or object destruction, whichever is invoked first. This can be used + * to avoid memcpy by having the PinnsableSlice object referring to the data + * that is locked in the memory and release them after the data is consuned. + */ +class PinnableSlice : public Slice, public Cleanable { + public: + PinnableSlice() { buf_ = &self_space_; } + explicit PinnableSlice(std::string* buf) { buf_ = buf; } + + inline void PinSlice(const Slice& s, CleanupFunction f, void* arg1, + void* arg2) { + assert(!pinned_); + pinned_ = true; + data_ = s.data(); + size_ = s.size(); + RegisterCleanup(f, arg1, arg2); + assert(pinned_); + } + + inline void PinSlice(const Slice& s, Cleanable* cleanable) { + assert(!pinned_); + pinned_ = true; + data_ = s.data(); + size_ = s.size(); + cleanable->DelegateCleanupsTo(this); + assert(pinned_); + } + + inline void PinSelf(const Slice& slice) { + assert(!pinned_); + buf_->assign(slice.data(), slice.size()); + data_ = buf_->data(); + size_ = buf_->size(); + assert(!pinned_); + } + + inline void PinSelf() { + assert(!pinned_); + data_ = buf_->data(); + size_ = buf_->size(); + assert(!pinned_); + } + + void remove_suffix(size_t n) { + assert(n <= size()); + if (pinned_) { + size_ -= n; + } else { + buf_->erase(size() - n, n); + PinSelf(); + } + } + + void remove_prefix(size_t n) { + assert(0); // Not implemented + } + + void Reset() { + Cleanable::Reset(); + pinned_ = false; + } + + inline std::string* GetSelf() { return buf_; } + + inline bool IsPinned() { return pinned_; } + + private: + friend class PinnableSlice4Test; + std::string self_space_; + std::string* buf_; + bool pinned_ = false; +}; + // A set of Slices that are virtually concatenated together. 'parts' points // to an array of Slices. The number of elements in the array is 'num_parts'. struct SliceParts { diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index eca800c09..c61e1d9dc 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -56,7 +56,7 @@ class StackableDB : public DB { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override { + PinnableSlice* value) override { return db_->Get(options, column_family, key, value); } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 6efba78d0..f60a8c4af 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1470,9 +1470,6 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, iiter_unique_ptr = std::unique_ptr(iiter); } - PinnedIteratorsManager* pinned_iters_mgr = get_context->pinned_iters_mgr(); - bool pin_blocks = pinned_iters_mgr && pinned_iters_mgr->PinningEnabled(); - bool done = false; for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { Slice handle_value = iiter->value(); @@ -1513,17 +1510,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, s = Status::Corruption(Slice()); } - if (!get_context->SaveValue(parsed_key, biter.value(), pin_blocks)) { + if (!get_context->SaveValue(parsed_key, biter.value(), &biter)) { done = true; break; } } s = biter.status(); - - if (pin_blocks && get_context->State() == GetContext::kMerge) { - // Pin blocks as long as we are merging - biter.DelegateCleanupsTo(pinned_iters_mgr); - } } } if (s.ok()) { diff --git a/table/cleanable_test.cc b/table/cleanable_test.cc index 717e20ea6..631586ad0 100644 --- a/table/cleanable_test.cc +++ b/table/cleanable_test.cc @@ -47,6 +47,30 @@ TEST_F(CleanableTest, Register) { } // ~Cleanable ASSERT_EQ(6, res); + + // Test the Reset does cleanup + res = 1; + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3; + c1.Reset(); + ASSERT_EQ(6, res); + } + // ~Cleanable + ASSERT_EQ(6, res); + + // Test Clenable is usable after Reset + res = 1; + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + c1.Reset(); + ASSERT_EQ(2, res); + c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3; + } + // ~Cleanable + ASSERT_EQ(6, res); } // the first Cleanup is on stack and the rest on heap, @@ -174,6 +198,76 @@ TEST_F(CleanableTest, Delegation) { ASSERT_EQ(5, res); } +static void ReleaseStringHeap(void* s, void*) { + delete reinterpret_cast(s); +} + +class PinnableSlice4Test : public PinnableSlice { + public: + void TestStringIsRegistered(std::string* s) { + ASSERT_TRUE(cleanup_.function == ReleaseStringHeap); + ASSERT_EQ(cleanup_.arg1, s); + ASSERT_EQ(cleanup_.arg2, nullptr); + ASSERT_EQ(cleanup_.next, nullptr); + } +}; + +// Putting the PinnableSlice tests here due to similarity to Cleanable tests +TEST_F(CleanableTest, PinnableSlice) { + int n2 = 2; + int res = 1; + const std::string const_str = "123"; + + { + res = 1; + PinnableSlice4Test value; + Slice slice(const_str); + value.PinSlice(slice, Multiplier, &res, &n2); + std::string str; + str.assign(value.data(), value.size()); + ASSERT_EQ(const_str, str); + } + // ~Cleanable + ASSERT_EQ(2, res); + + { + res = 1; + PinnableSlice4Test value; + Slice slice(const_str); + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + value.PinSlice(slice, &c1); + } + // ~Cleanable + ASSERT_EQ(1, res); // cleanups must have be delegated to value + std::string str; + str.assign(value.data(), value.size()); + ASSERT_EQ(const_str, str); + } + // ~Cleanable + ASSERT_EQ(2, res); + + { + PinnableSlice4Test value; + Slice slice(const_str); + value.PinSelf(slice); + std::string str; + str.assign(value.data(), value.size()); + ASSERT_EQ(const_str, str); + } + + { + PinnableSlice4Test value; + std::string* self_str_ptr = value.GetSelf(); + self_str_ptr->assign(const_str); + value.PinSelf(); + std::string str; + str.assign(value.data(), value.size()); + ASSERT_EQ(const_str, str); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/table/cuckoo_table_reader_test.cc b/table/cuckoo_table_reader_test.cc index e207c62bd..5d82d58fc 100644 --- a/table/cuckoo_table_reader_test.cc +++ b/table/cuckoo_table_reader_test.cc @@ -123,12 +123,12 @@ class CuckooReaderTest : public testing::Test { ASSERT_OK(reader.status()); // Assume no merge/deletion for (uint32_t i = 0; i < num_items; ++i) { - std::string value; + PinnableSlice value; GetContext get_context(ucomp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(user_keys[i]), &value, nullptr, nullptr, nullptr, nullptr); ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context)); - ASSERT_EQ(values[i], value); + ASSERT_STREQ(values[i].c_str(), value.data()); } } void UpdateKeys(bool with_zero_seqno) { @@ -333,7 +333,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { AddHashLookups(not_found_user_key, 0, kNumHashFunc); ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue); AppendInternalKey(¬_found_key, ikey); - std::string value; + PinnableSlice value; GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key), &value, nullptr, nullptr, nullptr, nullptr); @@ -346,6 +346,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue); std::string not_found_key2; AppendInternalKey(¬_found_key2, ikey2); + value.Reset(); GetContext get_context2(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key2), &value, nullptr, nullptr, nullptr, nullptr); @@ -360,6 +361,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { // Add hash values that map to empty buckets. AddHashLookups(ExtractUserKey(unused_key).ToString(), kNumHashFunc, kNumHashFunc); + value.Reset(); GetContext get_context3(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(unused_key), &value, nullptr, nullptr, nullptr, nullptr); @@ -433,12 +435,13 @@ void WriteFile(const std::vector& keys, test::Uint64Comparator(), nullptr); ASSERT_OK(reader.status()); ReadOptions r_options; - std::string value; + PinnableSlice value; // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, nullptr, nullptr, nullptr); for (uint64_t i = 0; i < num; ++i) { + value.Reset(); value.clear(); ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context)); ASSERT_TRUE(Slice(keys[i]) == Slice(&keys[i][0], 4)); @@ -480,7 +483,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) { } std::random_shuffle(keys.begin(), keys.end()); - std::string value; + PinnableSlice value; // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, diff --git a/table/get_context.cc b/table/get_context.cc index 280206c54..4b517a50b 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -35,7 +35,7 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { GetContext::GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, - const Slice& user_key, std::string* ret_value, + const Slice& user_key, PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq, @@ -46,7 +46,7 @@ GetContext::GetContext(const Comparator* ucmp, statistics_(statistics), state_(init_state), user_key_(user_key), - value_(ret_value), + pinnable_val_(pinnable_val), value_found_(value_found), merge_context_(merge_context), range_del_agg_(_range_del_agg), @@ -76,13 +76,13 @@ void GetContext::SaveValue(const Slice& value, SequenceNumber seq) { appendToReplayLog(replay_log_, kTypeValue, value); state_ = kFound; - if (value_ != nullptr) { - value_->assign(value.data(), value.size()); + if (LIKELY(pinnable_val_ != nullptr)) { + pinnable_val_->PinSelf(value); } } bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, - const Slice& value, bool value_pinned) { + const Slice& value, Cleanable* value_pinner) { assert((state_ != kMerge && parsed_key.type != kTypeMerge) || merge_context_ != nullptr); if (ucmp_->Equal(parsed_key.user_key, user_key_)) { @@ -106,17 +106,24 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); if (kNotFound == state_) { state_ = kFound; - if (value_ != nullptr) { - value_->assign(value.data(), value.size()); + if (LIKELY(pinnable_val_ != nullptr)) { + if (LIKELY(value_pinner != nullptr)) { + // If the backing resources for the value are provided, pin them + pinnable_val_->PinSlice(value, value_pinner); + } else { + // Otherwise copy the value + pinnable_val_->PinSelf(value); + } } } else if (kMerge == state_) { assert(merge_operator_ != nullptr); state_ = kFound; - if (value_ != nullptr) { + if (LIKELY(pinnable_val_ != nullptr)) { Status merge_status = MergeHelper::TimedFullMerge( merge_operator_, user_key_, &value, - merge_context_->GetOperands(), value_, logger_, statistics_, - env_); + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); if (!merge_status.ok()) { state_ = kCorrupt; } @@ -134,12 +141,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kDeleted; } else if (kMerge == state_) { state_ = kFound; - if (value_ != nullptr) { - Status merge_status = - MergeHelper::TimedFullMerge(merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), - value_, logger_, statistics_, env_); - + if (LIKELY(pinnable_val_ != nullptr)) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); if (!merge_status.ok()) { state_ = kCorrupt; } @@ -150,7 +157,14 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, case kTypeMerge: assert(state_ == kNotFound || state_ == kMerge); state_ = kMerge; - merge_context_->PushOperand(value, value_pinned); + // value_pinner is not set from plain_table_reader.cc for example. + if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && + value_pinner != nullptr) { + value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); + merge_context_->PushOperand(value, true /*value_pinned*/); + } else { + merge_context_->PushOperand(value, false); + } return true; default: @@ -166,6 +180,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, void replayGetContextLog(const Slice& replay_log, const Slice& user_key, GetContext* get_context) { #ifndef ROCKSDB_LITE + static Cleanable nonToClean; Slice s = replay_log; while (s.size()) { auto type = static_cast(*s.data()); @@ -178,7 +193,8 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key, // Since SequenceNumber is not stored and unknown, we will use // kMaxSequenceNumber. get_context->SaveValue( - ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, true); + ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, + &nonToClean); } #else // ROCKSDB_LITE assert(false); diff --git a/table/get_context.h b/table/get_context.h index e57c7352c..9f65156f7 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -9,6 +9,7 @@ #include "db/range_del_aggregator.h" #include "rocksdb/env.h" #include "rocksdb/types.h" +#include "table/block.h" namespace rocksdb { class MergeContext; @@ -26,7 +27,7 @@ class GetContext { GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, - const Slice& user_key, std::string* ret_value, bool* value_found, + const Slice& user_key, PinnableSlice* value, bool* value_found, MergeContext* merge_context, RangeDelAggregator* range_del_agg, Env* env, SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr); @@ -39,7 +40,7 @@ class GetContext { // Returns True if more keys need to be read (due to merges) or // False if the complete value has been found. bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value, - bool value_pinned = false); + Cleanable* value_pinner = nullptr); // Simplified version of the previous function. Should only be used when we // know that the operation is a Put. @@ -68,7 +69,7 @@ class GetContext { GetState state_; Slice user_key_; - std::string* value_; + PinnableSlice* pinnable_val_; bool* value_found_; // Is value set correctly? Used by KeyMayExist MergeContext* merge_context_; RangeDelAggregator* range_del_agg_; diff --git a/table/iterator.cc b/table/iterator.cc index a90c720d6..91f7135c0 100644 --- a/table/iterator.cc +++ b/table/iterator.cc @@ -21,24 +21,6 @@ Cleanable::Cleanable() { Cleanable::~Cleanable() { DoCleanup(); } -void Cleanable::Reset() { - DoCleanup(); - cleanup_.function = nullptr; - cleanup_.next = nullptr; -} - -void Cleanable::DoCleanup() { - if (cleanup_.function != nullptr) { - (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2); - for (Cleanup* c = cleanup_.next; c != nullptr;) { - (*c->function)(c->arg1, c->arg2); - Cleanup* next = c->next; - delete c; - c = next; - } - } -} - // If the entire linked list was on heap we could have simply add attach one // link list to another. However the head is an embeded object to avoid the cost // of creating objects for most of the use cases when the Cleanable has only one diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index 77adb8877..7984cdb1f 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -166,7 +166,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, std::string key = MakeKey(r1, r2, through_db); uint64_t start_time = Now(env, measured_by_nanosecond); if (!through_db) { - std::string value; + PinnableSlice value; MergeContext merge_context; RangeDelAggregator range_del_agg(ikc, {} /* snapshots */); GetContext get_context(ioptions.user_comparator, diff --git a/table/table_test.cc b/table/table_test.cc index 4c2fe43ad..4a3b049f6 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1994,12 +1994,12 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ASSERT_OK(c3.Reopen(ioptions4)); reader = dynamic_cast(c3.GetTableReader()); ASSERT_TRUE(!reader->TEST_filter_block_preloaded()); - std::string value; + PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr, nullptr); ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context)); - ASSERT_EQ(value, "hello"); + ASSERT_STREQ(value.data(), "hello"); BlockCachePropertiesSnapshot props(options.statistics.get()); props.AssertFilterBlockStat(0, 0); c3.ResetTableReader(); @@ -2077,7 +2077,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) { c.Finish(options, ioptions, table_options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); auto reader = c.GetTableReader(); - std::string value; + PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr, nullptr); @@ -2091,13 +2091,14 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) { ASSERT_EQ(perf_context.block_read_count, 1); } ASSERT_EQ(get_context.State(), GetContext::kFound); - ASSERT_EQ(value, "hello"); + ASSERT_STREQ(value.data(), "hello"); // Get non-existing key user_key = "does-not-exist"; internal_key = InternalKey(user_key, 0, kTypeValue); encoded_key = internal_key.Encode().ToString(); + value.Reset(); get_context = GetContext(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr, nullptr); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index fd7c3c63f..faa60d38b 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -230,6 +230,8 @@ DEFINE_bool(reverse_iterator, false, DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator"); +DEFINE_bool(pin_slice, true, "use pinnable slice for point lookup"); + DEFINE_int64(batch_size, 1, "Batch size"); static bool ValidateKeySize(const char* flagname, int32_t value) { @@ -3821,6 +3823,7 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); std::string value; + PinnableSlice pinnable_val; Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { @@ -3836,11 +3839,20 @@ class Benchmark { s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, &value); } else { - s = db_with_cfh->db->Get(options, key, &value); + if (LIKELY(FLAGS_pin_slice == 1)) { + pinnable_val.Reset(); + s = db_with_cfh->db->Get(options, + db_with_cfh->db->DefaultColumnFamily(), key, + &pinnable_val); + } else { + s = db_with_cfh->db->Get( + options, db_with_cfh->db->DefaultColumnFamily(), key, &value); + } } if (s.ok()) { found++; - bytes += key.size() + value.size(); + bytes += key.size() + + (FLAGS_pin_slice == 1 ? pinnable_val.size() : value.size()); } else if (!s.IsNotFound()) { fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); abort(); diff --git a/util/testutil.cc b/util/testutil.cc index 7b432ab59..fef81f406 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -12,6 +12,7 @@ #include #include +#include "db/memtable_list.h" #include "port/port.h" #include "util/file_reader_writer.h" diff --git a/utilities/document/document_db.cc b/utilities/document/document_db.cc index 85330b123..dacf58205 100644 --- a/utilities/document/document_db.cc +++ b/utilities/document/document_db.cc @@ -826,7 +826,7 @@ class DocumentDBImpl : public DocumentDB { // Lock now, since we're starting DB operations MutexLock l(&write_mutex_); // check if there is already a document with the same primary key - std::string value; + PinnableSlice value; Status s = DocumentDB::Get(ReadOptions(), primary_key_column_family_, primary_key_slice, &value); if (!s.IsNotFound()) { @@ -1039,7 +1039,7 @@ class DocumentDBImpl : public DocumentDB { // RocksDB functions virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override { + PinnableSlice* value) override { return Status::NotSupported(""); } virtual Status Get(const ReadOptions& options, const Slice& key, diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc index b9edb3cf3..975fbf387 100644 --- a/utilities/ttl/db_ttl_impl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -170,6 +170,17 @@ bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) { return (timestamp_value + ttl) < curtime; } +// Strips the TS from the end of the slice +Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) { + Status st; + if (pinnable_val->size() < kTSLength) { + return Status::Corruption("Bad timestamp in key-value"); + } + // Erasing characters which hold the TS + pinnable_val->remove_suffix(kTSLength); + return st; +} + // Strips the TS from the end of the string Status DBWithTTLImpl::StripTS(std::string* str) { Status st; @@ -191,7 +202,7 @@ Status DBWithTTLImpl::Put(const WriteOptions& options, Status DBWithTTLImpl::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { + PinnableSlice* value) { Status st = db_->Get(options, column_family, key, value); if (!st.ok()) { return st; diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index a9b445e65..1023b87bd 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -51,7 +51,7 @@ class DBWithTTLImpl : public DBWithTTL { using StackableDB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; using StackableDB::MultiGet; virtual std::vector MultiGet( @@ -87,6 +87,8 @@ class DBWithTTLImpl : public DBWithTTL { static Status StripTS(std::string* str); + static Status StripTS(PinnableSlice* str); + static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8