From 1ea79a78c9f08e6fa2faa29b9a0f44353f8ad03c Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Sun, 13 Nov 2016 18:58:17 -0800 Subject: [PATCH] Optimize sequential insert into memtable - Part 1: Interface Summary: Currently our skip-list has an optimization to speed up sequential inserts from a single stream, by remembering the last insert position. We extend the idea to support sequential inserts from multiple streams, and even tolerate small reordering within each stream. This PR is the interface part adding the following: - Add `memtable_insert_with_hint_prefix_extractor` to allow specifying a prefix for each key. - Add `InsertWithHint()` interface to memtable, to allow the underlying implementation to return a hint of the insert position, which can later be passed back to optimize inserts. - Memtable will maintain a map from prefix to hints and pass the hint via `InsertWithHint()` if `memtable_insert_with_hint_prefix_extractor` is non-null. Closes https://github.com/facebook/rocksdb/pull/1419 Differential Revision: D4079367 Pulled By: yiwu-arbug fbshipit-source-id: 3555326 --- CMakeLists.txt | 1 + HISTORY.md | 3 +- Makefile | 4 + db/column_family.cc | 5 ++ db/db_memtable_test.cc | 160 ++++++++++++++++++++++++++++++++++ db/memtable.cc | 18 +++- db/memtable.h | 8 ++ include/rocksdb/memtablerep.h | 8 ++ include/rocksdb/options.h | 24 +++++ memtable/skiplistrep.cc | 6 ++ util/cf_options.cc | 4 +- util/cf_options.h | 2 + util/hash.h | 5 ++ util/options.cc | 7 ++ util/options_helper.h | 5 ++ util/options_settable_test.cc | 6 ++ 16 files changed, 262 insertions(+), 4 deletions(-) create mode 100644 db/db_memtable_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 97e74def1..72b630114 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -496,6 +496,7 @@ set(TESTS db/db_inplace_update_test.cc db/db_iter_test.cc db/db_log_iter_test.cc + db/db_memtable_test.cc db/db_options_test.cc db/db_properties_test.cc db/db_table_properties_test.cc diff --git a/HISTORY.md b/HISTORY.md index e13bb074e..1dc303d02 
100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,7 +5,8 @@ * Suppor dynamically change `delayed_write_rate` option via SetDBOptions(). ### New Features -* Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The options is dynamically changeable. +* Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The option is dynamically changeable via SetDBOptions(). +* Add memtable_insert_with_hint_prefix_extractor option. The option is meant to reduce CPU usage for inserting keys into memtable, if keys can be grouped by prefix and inserts for each prefix are sequential or almost sequential. See include/rocksdb/options.h for more details. ## 4.13.0 (10/18/2016) ### Public API Change diff --git a/Makefile b/Makefile index 9abc7eb6e..00cbf69cc 100644 --- a/Makefile +++ b/Makefile @@ -276,6 +276,7 @@ TESTS = \ db_flush_test \ db_inplace_update_test \ db_iterator_test \ + db_memtable_test \ db_options_test \ db_sst_test \ external_sst_file_test \ @@ -955,6 +956,9 @@ db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJEC db_iterator_test: db/db_iterator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/column_family.cc b/db/column_family.cc index ed0096c41..ee69e05ba 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -270,6 +270,11 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, result.max_compaction_bytes = result.target_file_size_base * 25; } + // Insert into memtable with hint is incompatible with concurrent inserts. 
+ if (db_options.allow_concurrent_memtable_write) { + result.memtable_insert_with_hint_prefix_extractor = nullptr; + } + return result; } diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc new file mode 100644 index 000000000..19b6a63bc --- /dev/null +++ b/db/db_memtable_test.cc @@ -0,0 +1,160 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include + +#include "db/db_test_util.h" +#include "db/memtable.h" +#include "port/stack_trace.h" +#include "rocksdb/memtablerep.h" +#include "rocksdb/slice_transform.h" + +namespace rocksdb { + +class DBMemTableTest : public DBTestBase { + public: + DBMemTableTest() : DBTestBase("/db_memtable_test") {} +}; + +class MockMemTableRep : public MemTableRep { + public: + explicit MockMemTableRep(MemTableAllocator* allocator, MemTableRep* rep) + : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {} + + virtual KeyHandle Allocate(const size_t len, char** buf) override { + return rep_->Allocate(len, buf); + } + + virtual void Insert(KeyHandle handle) override { + return rep_->Insert(handle); + } + + virtual void InsertWithHint(KeyHandle handle, void** hint) override { + num_insert_with_hint_++; + ASSERT_NE(nullptr, hint); + last_hint_in_ = *hint; + rep_->InsertWithHint(handle, hint); + last_hint_out_ = *hint; + } + + virtual bool Contains(const char* key) const override { + return rep_->Contains(key); + } + + virtual void Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, + const char* entry)) override { + rep_->Get(k, callback_args, callback_func); + } + + virtual size_t ApproximateMemoryUsage() override { + return rep_->ApproximateMemoryUsage(); + } + + virtual Iterator* GetIterator(Arena* arena) override { + return 
rep_->GetIterator(arena); + } + + void* last_hint_in() { return last_hint_in_; } + void* last_hint_out() { return last_hint_out_; } + int num_insert_with_hint() { return num_insert_with_hint_; } + + private: + std::unique_ptr rep_; + void* last_hint_in_; + void* last_hint_out_; + int num_insert_with_hint_; +}; + +class MockMemTableRepFactory : public MemTableRepFactory { + public: + virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp, + MemTableAllocator* allocator, + const SliceTransform* transform, + Logger* logger) override { + SkipListFactory factory; + MemTableRep* skiplist_rep = + factory.CreateMemTableRep(cmp, allocator, transform, logger); + mock_rep_ = new MockMemTableRep(allocator, skiplist_rep); + return mock_rep_; + } + + virtual const char* Name() const override { return "MockMemTableRepFactory"; } + + MockMemTableRep* rep() { return mock_rep_; } + + private: + MockMemTableRep* mock_rep_; +}; + +class TestPrefixExtractor : public SliceTransform { + public: + virtual const char* Name() const override { return "TestPrefixExtractor"; } + + virtual Slice Transform(const Slice& key) const override { + const char* p = separator(key); + if (p == nullptr) { + return Slice(); + } + return Slice(key.data(), p - key.data() + 1); + } + + virtual bool InDomain(const Slice& key) const override { + return separator(key) != nullptr; + } + + virtual bool InRange(const Slice& key) const override { return false; } + + private: + const char* separator(const Slice& key) const { + return reinterpret_cast(memchr(key.data(), '_', key.size())); + } +}; + +TEST_F(DBMemTableTest, InsertWithHint) { + Options options; + options.create_if_missing = true; + options.memtable_factory.reset(new MockMemTableRepFactory()); + options.memtable_insert_with_hint_prefix_extractor.reset( + new TestPrefixExtractor()); + Reopen(options); + MockMemTableRep* rep = + reinterpret_cast(options.memtable_factory.get()) + ->rep(); + ASSERT_OK(Put("foo_k1", "foo_v1")); + 
ASSERT_EQ(nullptr, rep->last_hint_in()); + void* hint_foo = rep->last_hint_out(); + ASSERT_OK(Put("foo_k2", "foo_v2")); + ASSERT_EQ(hint_foo, rep->last_hint_in()); + ASSERT_EQ(hint_foo, rep->last_hint_out()); + ASSERT_OK(Put("foo_k3", "foo_v3")); + ASSERT_EQ(hint_foo, rep->last_hint_in()); + ASSERT_EQ(hint_foo, rep->last_hint_out()); + ASSERT_OK(Put("bar_k1", "bar_v1")); + ASSERT_EQ(nullptr, rep->last_hint_in()); + void* hint_bar = rep->last_hint_out(); + ASSERT_NE(hint_foo, hint_bar); + ASSERT_OK(Put("bar_k2", "bar_v2")); + ASSERT_EQ(hint_bar, rep->last_hint_in()); + ASSERT_EQ(hint_bar, rep->last_hint_out()); + ASSERT_EQ(5, rep->num_insert_with_hint()); + ASSERT_OK(Put("whitelisted", "vvv")); + ASSERT_EQ(5, rep->num_insert_with_hint()); + ASSERT_EQ("foo_v1", Get("foo_k1")); + ASSERT_EQ("foo_v2", Get("foo_k2")); + ASSERT_EQ("foo_v3", Get("foo_k3")); + ASSERT_EQ("bar_v1", Get("bar_k1")); + ASSERT_EQ("bar_v2", Get("bar_k2")); + ASSERT_EQ("vvv", Get("whitelisted")); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/memtable.cc b/db/memtable.cc index baac08dc6..f12365298 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -87,7 +87,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp, : 0), prefix_extractor_(ioptions.prefix_extractor), flush_state_(FLUSH_NOT_REQUESTED), - env_(ioptions.env) { + env_(ioptions.env), + insert_with_hint_prefix_extractor_( + ioptions.memtable_insert_with_hint_prefix_extractor) { UpdateFlushState(); // something went wrong if we need to flush before inserting anything assert(!ShouldScheduleFlush()); @@ -423,6 +425,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); + Slice key_slice(p, key_size); p += key_size; uint64_t packed = PackSequenceAndType(s, type); EncodeFixed64(p, packed); @@ -431,7 +434,18 @@ 
void MemTable::Add(SequenceNumber s, ValueType type, memcpy(p, value.data(), val_size); assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len); if (!allow_concurrent) { - table->Insert(handle); + // Extract prefix for insert with hint. + Slice prefix; + if (insert_with_hint_prefix_extractor_ != nullptr) { + if (insert_with_hint_prefix_extractor_->InDomain(key_slice)) { + prefix = insert_with_hint_prefix_extractor_->Transform(key_slice); + } + } + if (prefix.empty()) { + table->Insert(handle); + } else { + table->InsertWithHint(handle, &insert_hints_[prefix]); + } // this is a bit ugly, but is the way to avoid locked instructions // when incrementing an atomic diff --git a/db/memtable.h b/db/memtable.h index 28ece7d2b..3fee7549f 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include "db/dbformat.h" #include "db/memtable_allocator.h" @@ -25,6 +26,7 @@ #include "util/cf_options.h" #include "util/concurrent_arena.h" #include "util/dynamic_bloom.h" +#include "util/hash.h" #include "util/instrumented_mutex.h" namespace rocksdb { @@ -390,6 +392,12 @@ class MemTable { Env* env_; + // Extract sequential insert prefixes. + const SliceTransform* insert_with_hint_prefix_extractor_; + + // Insert hints for each prefix. + std::unordered_map insert_hints_; + // Returns a heuristic flush decision bool ShouldFlushNow() const; diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index fbf07caaf..31280bbf2 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -83,6 +83,14 @@ class MemTableRep { // collection, and no concurrent modifications to the table in progress virtual void Insert(KeyHandle handle) = 0; + // Same as Insert(), but in additional pass a hint to optimize sequential + // inserts. A new hint will be return from the hint pointer. Caller can get + // an initial hint by passing hint pointing to nullptr. 
+ virtual void InsertWithHint(KeyHandle handle, void** hint) { + // Ignore the hint by default. + Insert(handle); + } + // Like Insert(handle), but may be called concurrent with other calls // to InsertConcurrently for other handles virtual void InsertConcurrently(KeyHandle handle) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 75fdab101..ce76f4193 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -746,6 +746,30 @@ struct ColumnFamilyOptions { // Dynamically changeable through SetOptions() API size_t memtable_huge_page_size; + // If non-nullptr, memtable will use the specified function to extract + // prefixes for keys, and for each non-empty prefix maintain a hint to + // reduce CPU usage for inserting keys with the prefix. Keys with empty + // prefix will be inserted without using a hint. + // + // Currently only the default skiplist based memtable implements the feature. + // All other memtable implementations will ignore the option. It incurs ~150 + // additional bytes of memory overhead to store a hint for each prefix. + // If allow_concurrent_memtable_write is true, the option will also be + // ignored. + // + // The option is best suited for sequential inserts, or inserts that are + // almost sequential. One scenario could be inserting keys of the form + // (prefix + timestamp), and keys of the same prefix always come in + // with time order, or in some cases a key with a smaller timestamp comes + // in later due to network latency. + // + // REQUIRES: If custom comparator is provided, it has to make sure keys + // with the same prefix appear in consecutive range. + // + // Default: nullptr (disable) + std::shared_ptr + memtable_insert_with_hint_prefix_extractor; + // Control locality of bloom filter probes to improve cache miss rate. // This option only applies to memtable prefix bloom and plaintable // prefix bloom. It essentially limits every bloom checking to one cache line. 
diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index daed53d99..59ca2af7c 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -36,6 +36,12 @@ public: skip_list_.Insert(static_cast(handle)); } + virtual void InsertWithHint(KeyHandle handle, void** hint) override { + skip_list_.InsertWithHint( + static_cast(handle), + reinterpret_cast(hint)); + } + virtual void InsertConcurrently(KeyHandle handle) override { skip_list_.InsertConcurrently(static_cast(handle)); } diff --git a/util/cf_options.cc b/util/cf_options.cc index 48fdfff97..19685d156 100644 --- a/util/cf_options.cc +++ b/util/cf_options.cc @@ -71,7 +71,9 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, force_consistency_checks(cf_options.force_consistency_checks), listeners(db_options.listeners), row_cache(db_options.row_cache), - max_subcompactions(db_options.max_subcompactions) {} + max_subcompactions(db_options.max_subcompactions), + memtable_insert_with_hint_prefix_extractor( + cf_options.memtable_insert_with_hint_prefix_extractor.get()) {} // Multiple two operands. If they overflow, return op1. uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) { diff --git a/util/cf_options.h b/util/cf_options.h index e76e9686c..f16d73869 100644 --- a/util/cf_options.h +++ b/util/cf_options.h @@ -115,6 +115,8 @@ struct ImmutableCFOptions { std::shared_ptr row_cache; uint32_t max_subcompactions; + + const SliceTransform* memtable_insert_with_hint_prefix_extractor; }; struct MutableCFOptions { diff --git a/util/hash.h b/util/hash.h index d0fe35c43..c5da1ae9a 100644 --- a/util/hash.h +++ b/util/hash.h @@ -27,4 +27,9 @@ inline uint32_t GetSliceHash(const Slice& s) { return Hash(s.data(), s.size(), 397); } +// std::hash compatible interface. 
+struct SliceHasher { + uint32_t operator()(const Slice& s) const { return GetSliceHash(s); } +}; + } // namespace rocksdb diff --git a/util/options.cc b/util/options.cc index da15667bc..b7b47c6bd 100644 --- a/util/options.cc +++ b/util/options.cc @@ -78,6 +78,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() inplace_callback(nullptr), memtable_prefix_bloom_size_ratio(0.0), memtable_huge_page_size(0), + memtable_insert_with_hint_prefix_extractor(nullptr), bloom_locality(0), max_successive_merges(0), min_partial_merge_operands(2), @@ -145,6 +146,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) memtable_prefix_bloom_size_ratio( options.memtable_prefix_bloom_size_ratio), memtable_huge_page_size(options.memtable_huge_page_size), + memtable_insert_with_hint_prefix_extractor( + options.memtable_insert_with_hint_prefix_extractor), bloom_locality(options.bloom_locality), max_successive_merges(options.max_successive_merges), min_partial_merge_operands(options.min_partial_merge_operands), @@ -463,6 +466,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const { : CompressionTypeToString(bottommost_compression).c_str()); Header(log, " Options.prefix_extractor: %s", prefix_extractor == nullptr ? "nullptr" : prefix_extractor->Name()); + Header(log, " Options.memtable_insert_with_hint_prefix_extractor: %s", + memtable_insert_with_hint_prefix_extractor == nullptr + ? 
"nullptr" + : memtable_insert_with_hint_prefix_extractor->Name()); Header(log, " Options.num_levels: %d", num_levels); Header(log, " Options.min_write_buffer_number_to_merge: %d", min_write_buffer_number_to_merge); diff --git a/util/options_helper.h b/util/options_helper.h index f00f7637f..a9a95c486 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -531,6 +531,11 @@ static std::unordered_map cf_options_type_info = { {offsetof(struct ColumnFamilyOptions, prefix_extractor), OptionType::kSliceTransform, OptionVerificationType::kByNameAllowNull, false, 0}}, + {"memtable_insert_with_hint_prefix_extractor", + {offsetof(struct ColumnFamilyOptions, + memtable_insert_with_hint_prefix_extractor), + OptionType::kSliceTransform, OptionVerificationType::kByNameAllowNull, + false, 0}}, {"memtable_factory", {offsetof(struct ColumnFamilyOptions, memtable_factory), OptionType::kMemTableRepFactory, OptionVerificationType::kByName, false, diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc index a4d2c437f..59ed0e1e5 100644 --- a/util/options_settable_test.cc +++ b/util/options_settable_test.cc @@ -312,6 +312,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { // kColumnFamilyOptionsBlacklist, and maybe add customized verification // for it. TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { + // options in the blacklist need to appear in the same order as in + // ColumnFamilyOptions. 
const OffsetGap kColumnFamilyOptionsBlacklist = { {offsetof(struct ColumnFamilyOptions, comparator), sizeof(Comparator*)}, {offsetof(struct ColumnFamilyOptions, merge_operator), @@ -336,6 +338,9 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(ColumnFamilyOptions::TablePropertiesCollectorFactories)}, {offsetof(struct ColumnFamilyOptions, inplace_callback), sizeof(UpdateStatus(*)(char*, uint32_t*, Slice, std::string*))}, + {offsetof(struct ColumnFamilyOptions, + memtable_insert_with_hint_prefix_extractor), + sizeof(std::shared_ptr)}, }; char* options_ptr = new char[sizeof(ColumnFamilyOptions)]; @@ -419,6 +424,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "verify_checksums_in_compaction=false;" "merge_operator=aabcxehazrMergeOperator;" "memtable_prefix_bloom_size_ratio=0.4642;" + "memtable_insert_with_hint_prefix_extractor=rocksdb.CappedPrefix.13;" "paranoid_file_checks=true;" "force_consistency_checks=true;" "inplace_update_num_locks=7429;"