From 33042669f6499fa6ab8741bfef00c066c867ea79 Mon Sep 17 00:00:00 2001
From: Siying Dong
Date: Tue, 11 Feb 2014 09:46:30 -0800
Subject: [PATCH] Reduce malloc of iterators in Get() code paths

Summary:
This patch optimizes the Get() code paths by avoiding the malloc of an
iterator per lookup. Iterator creation is moved into the mem table rep
implementations, which invoke a callback for each key they find. This is
the same practice as in the (SST) table readers.

In a db_bench readrandom run following writeseq, with no compression, a
single thread and tmpfs, throughput improved from 139027 to 144958
ops/sec, about 4%.

Test Plan: make all check

Reviewers: dhruba, haobo, igor

Reviewed By: haobo

CC: leveldb, yhchiang

Differential Revision: https://reviews.facebook.net/D14685
---
 db/memtable.cc                | 232 ++++++++++++++++++++--------------
 db/memtable.h                 |  10 +-
 db/skiplist.h                 |   1 +
 db/version_set.h              |   1 +
 include/rocksdb/db.h          |   1 +
 include/rocksdb/memtablerep.h |  15 +++
 util/hash_linklist_rep.cc     |  17 +++
 util/hash_skiplist_rep.cc     |  17 +++
 util/skiplistrep.cc           |  11 ++
 util/vectorrep.cc             |  23 ++++
 10 files changed, 229 insertions(+), 99 deletions(-)
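The core of the change is swapping a heap-allocated iterator for a callback-driven scan inside the rep. A minimal sketch of the two shapes, with a hypothetical Store standing in for a mem table rep (none of these names are RocksDB APIs):

    #include <map>
    #include <string>

    struct Store {
      std::map<std::string, std::string> data;

      // Old shape: every Get() heap-allocates an iterator object and the
      // caller drives the loop, paying a malloc/free per lookup.
      struct Iterator {
        std::map<std::string, std::string>::const_iterator it, end;
        bool Valid() const { return it != end; }
        void Next() { ++it; }
      };
      Iterator* NewIterator(const std::string& key) const {
        return new Iterator{data.lower_bound(key), data.end()};  // malloc
      }

      // New shape: the rep walks its own structure and invokes a callback
      // per entry; no iterator object is allocated at all.
      void Get(const std::string& key, void* arg,
               bool (*callback)(void* arg, const std::string& key,
                                const std::string& value)) const {
        for (auto it = data.lower_bound(key); it != data.end(); ++it) {
          if (!callback(arg, it->first, it->second)) break;  // false => stop
        }
      }
    };

The callback returning false plays the role of the old loop's break, which is exactly the contract SaveValue() implements below.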
diff --git a/db/memtable.cc b/db/memtable.cc
index e7519de5d..e9f528725 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -207,116 +207,147 @@ void MemTable::Add(SequenceNumber s, ValueType type,
   }
 }
 
+// Callback from MemTable::Get()
+namespace {
+
+struct Saver {
+  Status* status;
+  const LookupKey* key;
+  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
+  bool* merge_in_progress;
+  std::string* value;
+  const MergeOperator* merge_operator;
+  // the merge operations encountered
+  MergeContext* merge_context;
+  MemTable* mem;
+  Logger* logger;
+  Statistics* statistics;
+  bool inplace_update_support;
+};
+}  // namespace
+
+static bool SaveValue(void* arg, const char* entry) {
+  Saver* s = reinterpret_cast<Saver*>(arg);
+  MergeContext* merge_context = s->merge_context;
+  const MergeOperator* merge_operator = s->merge_operator;
+
+  assert(s != nullptr && merge_context != nullptr);
+
+  // entry format is:
+  //    klength  varint32
+  //    userkey  char[klength-8]
+  //    tag      uint64
+  //    vlength  varint32
+  //    value    char[vlength]
+  // Check that it belongs to same user key.  We do not check the
+  // sequence number since the Seek() call above should have skipped
+  // all entries with overly large sequence numbers.
+  uint32_t key_length;
+  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+  if (s->mem->GetInternalKeyComparator().user_comparator()->Compare(
+          Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) {
+    // Correct user key
+    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+    switch (static_cast<ValueType>(tag & 0xff)) {
+      case kTypeValue: {
+        if (s->inplace_update_support) {
+          s->mem->GetLock(s->key->user_key())->ReadLock();
+        }
+        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+        *(s->status) = Status::OK();
+        if (*(s->merge_in_progress)) {
+          assert(merge_operator);
+          if (!merge_operator->FullMerge(s->key->user_key(), &v,
+                                         merge_context->GetOperands(),
+                                         s->value, s->logger)) {
+            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
+            *(s->status) =
+                Status::Corruption("Error: Could not perform merge.");
+          }
+        } else {
+          s->value->assign(v.data(), v.size());
+        }
+        if (s->inplace_update_support) {
+          s->mem->GetLock(s->key->user_key())->Unlock();
+        }
+        *(s->found_final_value) = true;
+        return false;
+      }
+      case kTypeDeletion: {
+        if (*(s->merge_in_progress)) {
+          assert(merge_operator);
+          *(s->status) = Status::OK();
+          if (!merge_operator->FullMerge(s->key->user_key(), nullptr,
+                                         merge_context->GetOperands(),
+                                         s->value, s->logger)) {
+            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
+            *(s->status) =
+                Status::Corruption("Error: Could not perform merge.");
+          }
+        } else {
+          *(s->status) = Status::NotFound();
+        }
+        *(s->found_final_value) = true;
+        return false;
+      }
+      case kTypeMerge: {
+        std::string merge_result;  // temporary area for merge results later
+        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+        *(s->merge_in_progress) = true;
+        merge_context->PushOperand(v);
+        while (merge_context->GetNumOperands() >= 2) {
+          // Attempt an associative merge. (Returns true if successful)
+          if (merge_operator->PartialMerge(
+                  s->key->user_key(), merge_context->GetOperand(0),
+                  merge_context->GetOperand(1), &merge_result, s->logger)) {
+            merge_context->PushPartialMergeResult(merge_result);
+          } else {
+            // Stack the operands because the user can't merge them
+            // associatively
+            break;
+          }
+        }
+        return true;
+      }
+      default:
+        assert(false);
+        return true;
+    }
+  }
+
+  // s->status could be Corruption, merge in progress, or not found
+  return false;
+}
+
 bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                    MergeContext& merge_context, const Options& options) {
   StopWatchNano memtable_get_timer(options.env, false);
   StartPerfTimer(&memtable_get_timer);
 
-  Slice mem_key = key.memtable_key();
   Slice user_key = key.user_key();
+  bool found_final_value = false;
+  bool merge_in_progress = s->IsMergeInProgress();
 
-  std::unique_ptr<MemTableRep::Iterator> iter;
   if (prefix_bloom_ &&
       !prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
     // iter is null if prefix bloom says the key does not exist
   } else {
-    iter.reset(table_->GetIterator(user_key));
-    iter->Seek(key.internal_key(), mem_key.data());
-  }
-
-  bool merge_in_progress = s->IsMergeInProgress();
-  auto merge_operator = options.merge_operator.get();
-  auto logger = options.info_log;
-  std::string merge_result;
-
-  bool found_final_value = false;
-  for (; !found_final_value && iter && iter->Valid(); iter->Next()) {
-    // entry format is:
-    //    klength  varint32
-    //    userkey  char[klength-8]
-    //    tag      uint64
-    //    vlength  varint32
-    //    value    char[vlength]
-    // Check that it belongs to same user key.  We do not check the
-    // sequence number since the Seek() call above should have skipped
-    // all entries with overly large sequence numbers.
-    const char* entry = iter->key();
-    uint32_t key_length = 0;
-    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
-    if (comparator_.comparator.user_comparator()->Compare(
-            Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
-      // Correct user key
-      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
-      switch (static_cast<ValueType>(tag & 0xff)) {
-        case kTypeValue: {
-          if (options.inplace_update_support) {
-            GetLock(key.user_key())->ReadLock();
-          }
-          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
-          *s = Status::OK();
-          if (merge_in_progress) {
-            assert(merge_operator);
-            if (!merge_operator->FullMerge(key.user_key(), &v,
-                                           merge_context.GetOperands(), value,
-                                           logger.get())) {
-              RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
-              *s = Status::Corruption("Error: Could not perform merge.");
-            }
-          } else {
-            value->assign(v.data(), v.size());
-          }
-          if (options.inplace_update_support) {
-            GetLock(key.user_key())->Unlock();
-          }
-          found_final_value = true;
-          break;
-        }
-        case kTypeDeletion: {
-          if (merge_in_progress) {
-            assert(merge_operator);
-            *s = Status::OK();
-            if (!merge_operator->FullMerge(key.user_key(), nullptr,
-                                           merge_context.GetOperands(), value,
-                                           logger.get())) {
-              RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
-              *s = Status::Corruption("Error: Could not perform merge.");
-            }
-          } else {
-            *s = Status::NotFound();
-          }
-          found_final_value = true;
-          break;
-        }
-        case kTypeMerge: {
-          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
-          merge_in_progress = true;
-          merge_context.PushOperand(v);
-          while(merge_context.GetNumOperands() >= 2) {
-            // Attempt to associative merge. (Returns true if successful)
-            if (merge_operator->PartialMerge(key.user_key(),
-                                             merge_context.GetOperand(0),
-                                             merge_context.GetOperand(1),
-                                             &merge_result, logger.get())) {
-              merge_context.PushPartialMergeResult(merge_result);
-            } else {
-              // Stack them because user can't associative merge
-              break;
-            }
-          }
-          break;
-        }
-        default:
-          assert(false);
-          break;
-      }
-    } else {
-      // exit loop if user key does not match
-      break;
-    }
+    Saver saver;
+    saver.status = s;
+    saver.found_final_value = &found_final_value;
+    saver.merge_in_progress = &merge_in_progress;
+    saver.key = &key;
+    saver.value = value;
+    saver.mem = this;
+    saver.merge_context = &merge_context;
+    saver.merge_operator = options.merge_operator.get();
+    saver.logger = options.info_log.get();
+    saver.inplace_update_support = options.inplace_update_support;
+    saver.statistics = options.statistics.get();
+    table_->Get(key, &saver, SaveValue);
   }
 
   // No change to value, since we have not yet found a Put/Delete
-
   if (!found_final_value && merge_in_progress) {
     *s = Status::MergeInProgress("");
   }
@@ -488,4 +519,13 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
   return num_successive_merges;
 }
 
+void MemTableRep::Get(const LookupKey& k, void* callback_args,
+                      bool (*callback_func)(void* arg, const char* entry)) {
+  std::unique_ptr<MemTableRep::Iterator> iter(GetIterator(k.user_key()));
+  for (iter->Seek(k.internal_key(), k.memtable_key().data());
+       iter->Valid() && callback_func(callback_args, iter->key());
+       iter->Next()) {
+  }
+}
+
 }  // namespace rocksdb
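For reference, the entry layout that SaveValue() parses above can be exercised end to end. PutVarint32()/GetVarint32() below are simplified local stand-ins for the helpers in util/coding.h, and the fixed64 tag is written little-endian as DecodeFixed64() assumes on x86; this is a sketch, not the RocksDB encoder:

    #include <cstdint>
    #include <cstring>
    #include <string>

    static void PutVarint32(std::string* dst, uint32_t v) {
      while (v >= 128) { dst->push_back(static_cast<char>(v | 128)); v >>= 7; }
      dst->push_back(static_cast<char>(v));
    }

    static const char* GetVarint32(const char* p, uint32_t* v) {
      uint32_t result = 0;
      for (int shift = 0;; shift += 7) {
        uint8_t byte = static_cast<uint8_t>(*p++);
        result |= static_cast<uint32_t>(byte & 127) << shift;
        if (!(byte & 128)) break;
      }
      *v = result;
      return p;
    }

    // Build one entry:
    //   klength  varint32            (user key size + 8)
    //   userkey  char[klength-8]
    //   tag      uint64              ((sequence << 8) | type)
    //   vlength  varint32
    //   value    char[vlength]
    std::string MakeEntry(const std::string& user_key, uint64_t seq,
                          uint8_t type, const std::string& value) {
      std::string e;
      PutVarint32(&e, static_cast<uint32_t>(user_key.size() + 8));
      e += user_key;
      uint64_t tag = (seq << 8) | type;
      char buf[8];
      std::memcpy(buf, &tag, 8);  // little-endian, as DecodeFixed64 assumes
      e.append(buf, 8);
      PutVarint32(&e, static_cast<uint32_t>(value.size()));
      e += value;
      return e;
    }

    // Parse it back the way SaveValue() does.
    void ParseEntry(const char* entry, std::string* user_key, uint64_t* tag) {
      uint32_t key_length;
      const char* key_ptr = GetVarint32(entry, &key_length);
      user_key->assign(key_ptr, key_length - 8);
      std::memcpy(tag, key_ptr + key_length - 8, 8);  // type = *tag & 0xff
    }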
diff --git a/db/memtable.h b/db/memtable.h
index 349359f8b..414d4ac95 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -154,6 +154,13 @@ class MemTable {
   // Notify the underlying storage that no more items will be added
   void MarkImmutable() { table_->MarkReadOnly(); }
 
+  // Get the lock associated with the key
+  port::RWMutex* GetLock(const Slice& key);
+
+  const InternalKeyComparator& GetInternalKeyComparator() const {
+    return comparator_.comparator;
+  }
+
  private:
   friend class MemTableIterator;
   friend class MemTableBackwardIterator;
@@ -190,9 +197,6 @@ class MemTable {
   MemTable(const MemTable&);
   void operator=(const MemTable&);
 
-  // Get the lock associated for the key
-  port::RWMutex* GetLock(const Slice& key);
-
   const SliceTransform* const prefix_extractor_;
   std::unique_ptr<DynamicBloom> prefix_bloom_;
 };
diff --git a/db/skiplist.h b/db/skiplist.h
index e713fe42a..e4a253bcc 100644
--- a/db/skiplist.h
+++ b/db/skiplist.h
@@ -33,6 +33,7 @@
 #pragma once
 #include <assert.h>
 #include <stdlib.h>
+#include "util/arena.h"
 #include "port/port.h"
 #include "util/arena.h"
 #include "util/random.h"
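MemTable::GetLock(), made public above so that SaveValue() can take it during in-place updates, returns a per-key lock. The usual way to get per-key locking without per-key allocation is lock striping; a minimal sketch with std:: types (RocksDB uses port::RWMutex and its own hash, so the details differ):

    #include <array>
    #include <cstddef>
    #include <functional>
    #include <shared_mutex>
    #include <string>

    class LockTable {
     public:
      // Many keys share a small fixed pool of locks; no per-key state.
      std::shared_mutex* GetLock(const std::string& key) {
        std::size_t h = std::hash<std::string>{}(key);
        return &locks_[h % locks_.size()];
      }

     private:
      std::array<std::shared_mutex, 16> locks_;
    };

    // Usage mirroring the kTypeValue branch of SaveValue():
    //   table.GetLock(user_key)->lock_shared();
    //   ... read the value ...
    //   table.GetLock(user_key)->unlock_shared();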
diff --git a/db/version_set.h b/db/version_set.h
index 82394a556..c41c18eb0 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -43,6 +43,7 @@ class TableCache;
 class Version;
 class VersionSet;
 class MergeContext;
+class LookupKey;
 
 // Return the smallest index i such that files[i]->largest >= key.
 // Return files.size() if there is no such file.
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 155c49fe7..1e5737724 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -31,6 +31,7 @@ struct ReadOptions;
 struct WriteOptions;
 struct FlushOptions;
 class WriteBatch;
+class Env;
 
 // Metadata associated with each SST file.
 struct LiveFileMetaData {
diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h
index e816387b4..428f27d4e 100644
--- a/include/rocksdb/memtablerep.h
+++ b/include/rocksdb/memtablerep.h
@@ -41,6 +41,7 @@ namespace rocksdb {
 
 class Arena;
+class LookupKey;
 class Slice;
 class SliceTransform;
 
@@ -74,6 +75,20 @@ class MemTableRep {
   // nothing.
   virtual void MarkReadOnly() { }
 
+  // Look up the key in the mem table. Starting from the first entry whose
+  // user key matches k.user_key(), call callback_func() on each entry in
+  // order, with callback_args forwarded as the first parameter and the mem
+  // table entry as the second. Stop when callback_func() returns false.
+  // Get() is also allowed to stop on its own once it has passed all
+  // entries that could match k.user_key(), but it is not required to.
+  //
+  // Default implementation: dynamically construct an iterator, seek to the
+  // key, and run the callback over the iterator.
+  virtual void Get(const LookupKey& k, void* callback_args,
+                   bool (*callback_func)(void* arg, const char* entry));
+
   // Report an approximation of how much memory has been used other than
   // memory that was allocated through the arena.
   virtual size_t ApproximateMemoryUsage() = 0;
diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc
index 83f0f3d5a..4db624975 100644
--- a/util/hash_linklist_rep.cc
+++ b/util/hash_linklist_rep.cc
@@ -64,6 +64,10 @@ class HashLinkListRep : public MemTableRep {
 
   virtual size_t ApproximateMemoryUsage() override;
 
+  virtual void Get(const LookupKey& k, void* callback_args,
+                   bool (*callback_func)(void* arg,
+                                         const char* entry)) override;
+
   virtual ~HashLinkListRep();
 
   virtual MemTableRep::Iterator* GetIterator() override;
@@ -398,6 +402,19 @@ size_t HashLinkListRep::ApproximateMemoryUsage() {
   return 0;
 }
 
+void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
+                          bool (*callback_func)(void* arg,
+                                                const char* entry)) {
+  auto transformed = transform_->Transform(k.user_key());
+  auto bucket = GetBucket(transformed);
+  if (bucket != nullptr) {
+    Iterator iter(this, bucket);
+    for (iter.Seek(k.internal_key(), nullptr);
+         iter.Valid() && callback_func(callback_args, iter.key());
+         iter.Next()) {
+    }
+  }
+}
+
 MemTableRep::Iterator* HashLinkListRep::GetIterator() {
   auto list = new FullList(compare_, arena_);
   for (size_t i = 0; i < bucket_size_; ++i) {
diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc
index aa070bc8b..61da5ae41 100644
--- a/util/hash_skiplist_rep.cc
+++ b/util/hash_skiplist_rep.cc
@@ -31,6 +31,10 @@ class HashSkipListRep : public MemTableRep {
 
   virtual size_t ApproximateMemoryUsage() override;
 
+  virtual void Get(const LookupKey& k, void* callback_args,
+                   bool (*callback_func)(void* arg,
+                                         const char* entry)) override;
+
   virtual ~HashSkipListRep();
 
   virtual MemTableRep::Iterator* GetIterator() override;
@@ -271,6 +275,19 @@ size_t HashSkipListRep::ApproximateMemoryUsage() {
   return sizeof(buckets_);
 }
 
+void HashSkipListRep::Get(const LookupKey& k, void* callback_args,
+                          bool (*callback_func)(void* arg,
+                                                const char* entry)) {
+  auto transformed = transform_->Transform(k.user_key());
+  auto bucket = GetBucket(transformed);
+  if (bucket != nullptr) {
+    Bucket::Iterator iter(bucket);
+    for (iter.Seek(k.memtable_key().data());
+         iter.Valid() && callback_func(callback_args, iter.key());
+         iter.Next()) {
+    }
+  }
+}
+
 MemTableRep::Iterator* HashSkipListRep::GetIterator() {
   auto list = new Bucket(compare_, arena_);
   for (size_t i = 0; i < bucket_size_; ++i) {
diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc
index 6f1fb1a15..ab77e7f3a 100644
--- a/util/skiplistrep.cc
+++ b/util/skiplistrep.cc
@@ -32,6 +32,17 @@ public:
     return 0;
   }
 
+  virtual void Get(const LookupKey& k, void* callback_args,
+                   bool (*callback_func)(void* arg,
+                                         const char* entry)) override {
+    SkipListRep::Iterator iter(&skip_list_);
+    Slice dummy_slice;
+    for (iter.Seek(dummy_slice, k.memtable_key().data());
+         iter.Valid() && callback_func(callback_args, iter.key());
+         iter.Next()) {
+    }
+  }
+
   virtual ~SkipListRep() override { }
 
 // Iteration over the contents of a skip list
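The two hash-based reps above share one shape: transform the user key to its prefix, pick a single bucket by hashing the prefix, and run the callback loop over just that bucket. A freestanding sketch of the idea (the fixed-width prefix and std::hash are illustrative assumptions, not the SliceTransform API):

    #include <cstddef>
    #include <functional>
    #include <string>
    #include <vector>

    struct BucketedRep {
      explicit BucketedRep(std::size_t n) : buckets(n) {}

      // Stand-in for the SliceTransform: first 4 bytes are the prefix.
      static std::string Prefix(const std::string& user_key) {
        return user_key.substr(0, 4);
      }

      std::vector<std::string>* GetBucket(const std::string& user_key) {
        std::size_t h = std::hash<std::string>{}(Prefix(user_key));
        return &buckets[h % buckets.size()];
      }

      // Callback-driven lookup confined to one bucket, mirroring
      // HashLinkListRep::Get() and HashSkipListRep::Get().
      void Get(const std::string& user_key, void* arg,
               bool (*callback)(void* arg, const std::string& entry)) {
        std::vector<std::string>* bucket = GetBucket(user_key);
        if (bucket == nullptr) return;  // RocksDB returns null for empties
        for (const std::string& entry : *bucket) {
          if (!callback(arg, entry)) break;
        }
      }

      std::vector<std::vector<std::string>> buckets;
    };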
diff --git a/util/vectorrep.cc b/util/vectorrep.cc
index 4b8b3d552..e0f3d69b0 100644
--- a/util/vectorrep.cc
+++ b/util/vectorrep.cc
@@ -39,6 +39,10 @@ class VectorRep : public MemTableRep {
 
   virtual size_t ApproximateMemoryUsage() override;
 
+  virtual void Get(const LookupKey& k, void* callback_args,
+                   bool (*callback_func)(void* arg,
+                                         const char* entry)) override;
+
   virtual ~VectorRep() override { }
 
   class Iterator : public MemTableRep::Iterator {
@@ -233,6 +237,25 @@ void VectorRep::Iterator::SeekToLast() {
   }
 }
 
+void VectorRep::Get(const LookupKey& k, void* callback_args,
+                    bool (*callback_func)(void* arg, const char* entry)) {
+  rwlock_.ReadLock();
+  VectorRep* vector_rep;
+  std::shared_ptr<Bucket> bucket;
+  if (immutable_) {
+    vector_rep = this;
+  } else {
+    vector_rep = nullptr;
+    bucket.reset(new Bucket(*bucket_));  // make a copy
+  }
+  VectorRep::Iterator iter(vector_rep, immutable_ ? bucket_ : bucket,
+                           compare_);
+  rwlock_.Unlock();
+
+  for (iter.Seek(k.user_key(), k.memtable_key().data());
+       iter.Valid() && callback_func(callback_args, iter.key());
+       iter.Next()) {
+  }
+}
+
 MemTableRep::Iterator* VectorRep::GetIterator() {
   ReadLock l(&rwlock_);
   // Do not sort here. The sorting would be done the first time
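VectorRep::Get() above holds the read lock only long enough to either borrow the immutable vector or copy the mutable one, then runs the callback loop outside the lock. A minimal sketch of that snapshot-then-scan pattern with std:: types (names are illustrative):

    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>

    struct SnapshotRep {
      std::mutex mu;            // stands in for rwlock_
      bool immutable = false;
      std::shared_ptr<std::vector<std::string>> data =
          std::make_shared<std::vector<std::string>>();

      void Get(void* arg,
               bool (*callback)(void* arg, const std::string& entry)) {
        std::shared_ptr<std::vector<std::string>> snapshot;
        {
          std::lock_guard<std::mutex> l(mu);
          // An immutable vector can be shared directly; a mutable one is
          // copied so writers can keep appending while we scan.
          snapshot = immutable
                         ? data
                         : std::make_shared<std::vector<std::string>>(*data);
        }
        // Iterate outside the lock: the snapshot cannot change under us.
        for (const std::string& entry : *snapshot) {
          if (!callback(arg, entry)) break;
        }
      }
    };

The copy keeps readers safe without blocking writers, at the cost of an O(n) copy per Get() while the vector is still mutable.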