2013-10-16 23:59:46 +02:00
|
|
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under the BSD-style license found in the
|
|
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
//
|
2011-03-18 23:37:00 +01:00
|
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
|
|
|
|
#include "db/memtable.h"
|
2013-07-23 23:42:27 +02:00
|
|
|
|
|
|
|
#include <memory>
|
2014-03-13 00:40:14 +01:00
|
|
|
#include <algorithm>
|
2013-07-23 23:42:27 +02:00
|
|
|
|
2011-03-18 23:37:00 +01:00
|
|
|
#include "db/dbformat.h"
|
2013-12-03 03:34:05 +01:00
|
|
|
#include "db/merge_context.h"
|
2013-08-23 17:38:13 +02:00
|
|
|
#include "rocksdb/comparator.h"
|
|
|
|
#include "rocksdb/env.h"
|
|
|
|
#include "rocksdb/iterator.h"
|
|
|
|
#include "rocksdb/merge_operator.h"
|
2014-01-31 02:18:17 +01:00
|
|
|
#include "rocksdb/slice_transform.h"
|
|
|
|
#include "util/arena.h"
|
2011-03-18 23:37:00 +01:00
|
|
|
#include "util/coding.h"
|
2013-08-23 08:10:02 +02:00
|
|
|
#include "util/murmurhash.h"
|
2013-11-27 20:47:40 +01:00
|
|
|
#include "util/mutexlock.h"
|
2013-11-18 20:32:54 +01:00
|
|
|
#include "util/perf_context_imp.h"
|
2014-01-17 21:46:06 +01:00
|
|
|
#include "util/statistics.h"
|
2013-11-18 20:32:54 +01:00
|
|
|
#include "util/stop_watch.h"
|
2011-03-18 23:37:00 +01:00
|
|
|
|
2013-10-04 06:49:15 +02:00
|
|
|
namespace rocksdb {
|
2011-03-18 23:37:00 +01:00
|
|
|
|
2014-01-15 00:27:09 +01:00
|
|
|
// Constructs an empty memtable whose entries are ordered by `cmp`.
//
// Sizing (arena block size, write buffer size), the memtable representation
// factory, the prefix extractor, the in-place-update lock count and the
// prefix-bloom parameters are all read from `options`.
MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
    : comparator_(cmp),
      refs_(0),
      kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)),
      kWriteBufferSize(options.write_buffer_size),
      arena_(options.arena_block_size),
      table_(options.memtable_factory->CreateMemTableRep(
          comparator_, &arena_, options.prefix_extractor.get())),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      first_seqno_(0),
      mem_next_logfile_number_(0),
      mem_logfile_number_(0),
      // The per-key lock table is only populated when in-place updates are on.
      locks_(options.inplace_update_support ? options.inplace_update_num_locks
                                            : 0),
      prefix_extractor_(options.prefix_extractor.get()),
      should_flush_(ShouldFlushNow()) {
  // A memtable that already wants to be flushed before any entry has been
  // inserted means something has gone wrong.
  assert(!should_flush_);

  // A prefix bloom filter is built only when there is a prefix extractor and
  // a non-zero number of bloom bits has been requested.
  const bool wants_prefix_bloom =
      prefix_extractor_ && options.memtable_prefix_bloom_bits > 0;
  if (wants_prefix_bloom) {
    prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
                                         options.bloom_locality,
                                         options.memtable_prefix_bloom_probes));
  }
}
|
2011-03-18 23:37:00 +01:00
|
|
|
|
|
|
|
// Destroys the memtable. By this point every reference must have been
// released, which is checked in debug builds.
MemTable::~MemTable() {
  assert(refs_ == 0);
}
|
|
|
|
|
2013-07-23 23:42:27 +02:00
|
|
|
size_t MemTable::ApproximateMemoryUsage() {
|
2014-01-31 02:18:17 +01:00
|
|
|
return arena_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage();
|
2013-07-23 23:42:27 +02:00
|
|
|
}
|
2011-03-18 23:37:00 +01:00
|
|
|
|
2014-03-13 00:40:14 +01:00
|
|
|
bool MemTable::ShouldFlushNow() const {
|
|
|
|
// In a lot of times, we cannot allocate arena blocks that exactly matches the
|
|
|
|
// buffer size. Thus we have to decide if we should over-allocate or
|
|
|
|
// under-allocate.
|
|
|
|
// This constant avariable can be interpreted as: if we still have more than
|
|
|
|
// "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
|
|
|
|
// allocate one more block.
|
|
|
|
const double kAllowOverAllocationRatio = 0.6;
|
|
|
|
|
|
|
|
// If arena still have room for new block allocation, we can safely say it
|
|
|
|
// shouldn't flush.
|
|
|
|
auto allocated_memory =
|
|
|
|
table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();
|
|
|
|
|
2014-03-14 22:01:45 +01:00
|
|
|
// if we can still allocate one more block without exceeding the
|
|
|
|
// over-allocation ratio, then we should not flush.
|
|
|
|
if (allocated_memory + kArenaBlockSize <
|
|
|
|
kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
|
2014-03-13 00:40:14 +01:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// if user keeps adding entries that exceeds kWriteBufferSize, we need to
|
2014-03-14 22:01:45 +01:00
|
|
|
// flush earlier even though we still have much available memory left.
|
|
|
|
if (allocated_memory >
|
|
|
|
kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
|
2014-03-13 00:40:14 +01:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
// In this code path, Arena has already allocated its "last block", which
|
|
|
|
// means the total allocatedmemory size is either:
|
2014-03-14 22:01:45 +01:00
|
|
|
// (1) "moderately" over allocated the memory (no more than `0.6 * arena
|
2014-03-13 00:40:14 +01:00
|
|
|
// block size`. Or,
|
|
|
|
// (2) the allocated memory is less than write buffer size, but we'll stop
|
|
|
|
// here since if we allocate a new arena block, we'll over allocate too much
|
|
|
|
// more (half of the arena block size) memory.
|
|
|
|
//
|
|
|
|
// In either case, to avoid over-allocate, the last block will stop allocation
|
|
|
|
// when its usage reaches a certain ratio, which we carefully choose "0.75
|
|
|
|
// full" as the stop condition because it addresses the following issue with
|
|
|
|
// great simplicity: What if the next inserted entry's size is
|
|
|
|
// bigger than AllocatedAndUnused()?
|
|
|
|
//
|
|
|
|
// The answer is: if the entry size is also bigger than 0.25 *
|
|
|
|
// kArenaBlockSize, a dedicated block will be allocated for it; otherwise
|
|
|
|
// arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
|
|
|
|
// and regular block. In either case, we *overly* over-allocated.
|
|
|
|
//
|
|
|
|
// Therefore, setting the last block to be at most "0.75 full" avoids both
|
|
|
|
// cases.
|
|
|
|
//
|
|
|
|
// NOTE: the average percentage of waste space of this approach can be counted
|
|
|
|
// as: "arena block size * 0.25 / write buffer size". User who specify a small
|
|
|
|
// write buffer size and/or big arena block size may suffer.
|
|
|
|
return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
|
|
|
|
}
|
|
|
|
|
2014-01-25 02:50:59 +01:00
|
|
|
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
|
|
|
|
const char* prefix_len_key2) const {
|
|
|
|
// Internal keys are encoded as length-prefixed strings.
|
|
|
|
Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
|
|
|
|
Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
|
|
|
|
return comparator.Compare(k1, k2);
|
|
|
|
}
|
|
|
|
|
|
|
|
int MemTable::KeyComparator::operator()(const char* prefix_len_key,
|
|
|
|
const Slice& key)
|
2011-03-18 23:37:00 +01:00
|
|
|
const {
|
|
|
|
// Internal keys are encoded as length-prefixed strings.
|
2014-01-25 02:50:59 +01:00
|
|
|
Slice a = GetLengthPrefixedSlice(prefix_len_key);
|
|
|
|
return comparator.Compare(a, key);
|
2011-03-18 23:37:00 +01:00
|
|
|
}
|
|
|
|
|
2013-08-23 08:10:02 +02:00
|
|
|
// Returns the user-key portion of a length-prefixed internal key.
//
// `key` points at a varint32 length followed by the internal key bytes. The
// last 8 bytes of the internal key are dropped (MemTable::Add writes the
// 64-bit sequence/type tag there), leaving the user key.
Slice MemTableRep::UserKey(const char* key) const {
  Slice slice = GetLengthPrefixedSlice(key);
  // An internal key shorter than the 8-byte tag is malformed; without this
  // check the unsigned subtraction below would wrap around to a huge length.
  assert(slice.size() >= 8);
  return Slice(slice.data(), slice.size() - 8);
}
|
|
|
|
|
2014-04-05 00:37:28 +02:00
|
|
|
// Allocates `len` bytes from the arena, stores the start of the buffer in
// `*buf`, and returns the same address as an opaque KeyHandle.
KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
  char* const storage = arena_->Allocate(len);
  *buf = storage;
  return static_cast<KeyHandle>(storage);
}
|
|
|
|
|
2011-03-18 23:37:00 +01:00
|
|
|
// Encode a suitable internal key target for "target" and return it.
|
|
|
|
// Uses *scratch as scratch space, and the returned pointer will point
|
|
|
|
// into this scratch space.
|
2013-11-21 04:49:27 +01:00
|
|
|
const char* EncodeKey(std::string* scratch, const Slice& target) {
|
2011-03-18 23:37:00 +01:00
|
|
|
scratch->clear();
|
|
|
|
PutVarint32(scratch, target.size());
|
|
|
|
scratch->append(target.data(), target.size());
|
|
|
|
return scratch->data();
|
|
|
|
}
|
|
|
|
|
|
|
|
class MemTableIterator: public Iterator {
|
|
|
|
public:
|
2013-11-27 23:27:02 +01:00
|
|
|
MemTableIterator(const MemTable& mem, const ReadOptions& options)
|
2014-03-28 22:38:10 +01:00
|
|
|
: bloom_(nullptr),
|
|
|
|
prefix_extractor_(mem.prefix_extractor_),
|
|
|
|
iter_(),
|
|
|
|
valid_(false) {
|
2013-11-04 01:32:46 +01:00
|
|
|
if (options.prefix) {
|
2014-03-28 22:38:10 +01:00
|
|
|
iter_.reset(mem.table_->GetPrefixIterator(*options.prefix));
|
2013-11-04 01:32:46 +01:00
|
|
|
} else if (options.prefix_seek) {
|
2014-03-28 22:38:10 +01:00
|
|
|
bloom_ = mem.prefix_bloom_.get();
|
|
|
|
iter_.reset(mem.table_->GetDynamicPrefixIterator());
|
2013-11-04 01:32:46 +01:00
|
|
|
} else {
|
2014-03-28 22:38:10 +01:00
|
|
|
iter_.reset(mem.table_->GetIterator());
|
2013-11-04 01:32:46 +01:00
|
|
|
}
|
|
|
|
}
|
2013-08-23 08:10:02 +02:00
|
|
|
|
2013-11-27 23:27:02 +01:00
|
|
|
virtual bool Valid() const { return valid_; }
|
|
|
|
virtual void Seek(const Slice& k) {
|
2014-03-28 22:38:10 +01:00
|
|
|
if (bloom_ != nullptr &&
|
|
|
|
!bloom_->MayContain(prefix_extractor_->Transform(ExtractUserKey(k)))) {
|
2013-11-27 23:27:02 +01:00
|
|
|
valid_ = false;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
iter_->Seek(k, nullptr);
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual void SeekToFirst() {
|
|
|
|
iter_->SeekToFirst();
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual void SeekToLast() {
|
|
|
|
iter_->SeekToLast();
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual void Next() {
|
|
|
|
assert(Valid());
|
|
|
|
iter_->Next();
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
|
|
|
virtual void Prev() {
|
|
|
|
assert(Valid());
|
|
|
|
iter_->Prev();
|
|
|
|
valid_ = iter_->Valid();
|
|
|
|
}
|
2013-07-23 23:42:27 +02:00
|
|
|
virtual Slice key() const {
|
2013-11-27 23:27:02 +01:00
|
|
|
assert(Valid());
|
2013-07-23 23:42:27 +02:00
|
|
|
return GetLengthPrefixedSlice(iter_->key());
|
|
|
|
}
|
2011-03-18 23:37:00 +01:00
|
|
|
virtual Slice value() const {
|
2013-11-27 23:27:02 +01:00
|
|
|
assert(Valid());
|
2013-07-23 23:42:27 +02:00
|
|
|
Slice key_slice = GetLengthPrefixedSlice(iter_->key());
|
2011-03-18 23:37:00 +01:00
|
|
|
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual Status status() const { return Status::OK(); }
|
|
|
|
|
|
|
|
private:
|
2014-03-28 22:38:10 +01:00
|
|
|
DynamicBloom* bloom_;
|
|
|
|
const SliceTransform* const prefix_extractor_;
|
2013-07-23 23:42:27 +02:00
|
|
|
std::shared_ptr<MemTableRep::Iterator> iter_;
|
2013-11-27 23:27:02 +01:00
|
|
|
bool valid_;
|
2011-03-18 23:37:00 +01:00
|
|
|
|
|
|
|
// No copying allowed
|
|
|
|
MemTableIterator(const MemTableIterator&);
|
|
|
|
void operator=(const MemTableIterator&);
|
|
|
|
};
|
|
|
|
|
2013-11-04 01:32:46 +01:00
|
|
|
// Creates a heap-allocated iterator over this memtable, configured by
// `options`. The iterator is allocated with `new` and returned as a raw
// pointer.
Iterator* MemTable::NewIterator(const ReadOptions& options) {
  Iterator* const mem_iter = new MemTableIterator(*this, options);
  return mem_iter;
}
|
|
|
|
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
// Returns the reader-writer lock that guards in-place updates of `key`.
//
// The key is hashed into the lock table, so distinct keys may share a lock.
// The lock table is empty unless the memtable was constructed with
// inplace_update_support enabled, so this must only be called in that mode.
port::RWMutex* MemTable::GetLock(const Slice& key) {
  // With an empty lock table the modulo below would divide by zero.
  assert(locks_.size() > 0);
  static murmur_hash hash;
  return &locks_[hash(key) % locks_.size()];
}
|
|
|
|
|
2011-03-18 23:37:00 +01:00
|
|
|
void MemTable::Add(SequenceNumber s, ValueType type,
|
2013-11-27 23:27:02 +01:00
|
|
|
const Slice& key, /* user key */
|
2011-03-18 23:37:00 +01:00
|
|
|
const Slice& value) {
|
|
|
|
// Format of an entry is concatenation of:
|
|
|
|
// key_size : varint32 of internal_key.size()
|
|
|
|
// key bytes : char[internal_key.size()]
|
|
|
|
// value_size : varint32 of value.size()
|
|
|
|
// value bytes : char[value.size()]
|
|
|
|
size_t key_size = key.size();
|
|
|
|
size_t val_size = value.size();
|
|
|
|
size_t internal_key_size = key_size + 8;
|
|
|
|
const size_t encoded_len =
|
|
|
|
VarintLength(internal_key_size) + internal_key_size +
|
|
|
|
VarintLength(val_size) + val_size;
|
2014-04-05 00:37:28 +02:00
|
|
|
char* buf = nullptr;
|
|
|
|
KeyHandle handle = table_->Allocate(encoded_len, &buf);
|
|
|
|
assert(buf != nullptr);
|
2011-03-18 23:37:00 +01:00
|
|
|
char* p = EncodeVarint32(buf, internal_key_size);
|
|
|
|
memcpy(p, key.data(), key_size);
|
|
|
|
p += key_size;
|
|
|
|
EncodeFixed64(p, (s << 8) | type);
|
|
|
|
p += 8;
|
|
|
|
p = EncodeVarint32(p, val_size);
|
|
|
|
memcpy(p, value.data(), val_size);
|
2014-02-03 22:48:30 +01:00
|
|
|
assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
|
2014-04-05 00:37:28 +02:00
|
|
|
table_->Insert(handle);
|
2013-02-28 23:09:30 +01:00
|
|
|
|
2013-11-27 23:27:02 +01:00
|
|
|
if (prefix_bloom_) {
|
|
|
|
assert(prefix_extractor_);
|
|
|
|
prefix_bloom_->Add(prefix_extractor_->Transform(key));
|
|
|
|
}
|
|
|
|
|
2013-02-28 23:09:30 +01:00
|
|
|
// The first sequence number inserted into the memtable
|
|
|
|
assert(first_seqno_ == 0 || s > first_seqno_);
|
|
|
|
if (first_seqno_ == 0) {
|
|
|
|
first_seqno_ = s;
|
|
|
|
}
|
2014-03-13 00:40:14 +01:00
|
|
|
|
|
|
|
should_flush_ = ShouldFlushNow();
|
2011-03-18 23:37:00 +01:00
|
|
|
}
|
|
|
|
|
2014-02-11 18:46:30 +01:00
|
|
|
// Callback from MemTable::Get()
|
|
|
|
namespace {
|
|
|
|
|
|
|
|
struct Saver {
|
|
|
|
Status* status;
|
|
|
|
const LookupKey* key;
|
|
|
|
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
|
|
|
|
bool* merge_in_progress;
|
|
|
|
std::string* value;
|
|
|
|
const MergeOperator* merge_operator;
|
|
|
|
// the merge operations encountered;
|
|
|
|
MergeContext* merge_context;
|
|
|
|
MemTable* mem;
|
|
|
|
Logger* logger;
|
|
|
|
Statistics* statistics;
|
|
|
|
bool inplace_update_support;
|
|
|
|
};
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
static bool SaveValue(void* arg, const char* entry) {
|
|
|
|
Saver* s = reinterpret_cast<Saver*>(arg);
|
|
|
|
MergeContext* merge_context = s->merge_context;
|
|
|
|
const MergeOperator* merge_operator = s->merge_operator;
|
|
|
|
|
|
|
|
assert(s != nullptr && merge_context != nullptr);
|
|
|
|
|
|
|
|
// entry format is:
|
|
|
|
// klength varint32
|
|
|
|
// userkey char[klength-8]
|
|
|
|
// tag uint64
|
|
|
|
// vlength varint32
|
|
|
|
// value char[vlength]
|
|
|
|
// Check that it belongs to same user key. We do not check the
|
|
|
|
// sequence number since the Seek() call above should have skipped
|
|
|
|
// all entries with overly large sequence numbers.
|
|
|
|
uint32_t key_length;
|
|
|
|
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
|
|
|
|
if (s->mem->GetInternalKeyComparator().user_comparator()->Compare(
|
|
|
|
Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) {
|
|
|
|
// Correct user key
|
|
|
|
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
|
|
|
|
switch (static_cast<ValueType>(tag & 0xff)) {
|
|
|
|
case kTypeValue: {
|
|
|
|
if (s->inplace_update_support) {
|
|
|
|
s->mem->GetLock(s->key->user_key())->ReadLock();
|
|
|
|
}
|
|
|
|
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
|
|
|
|
*(s->status) = Status::OK();
|
|
|
|
if (*(s->merge_in_progress)) {
|
|
|
|
assert(merge_operator);
|
|
|
|
if (!merge_operator->FullMerge(s->key->user_key(), &v,
|
|
|
|
merge_context->GetOperands(), s->value,
|
|
|
|
s->logger)) {
|
|
|
|
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
|
|
|
*(s->status) =
|
|
|
|
Status::Corruption("Error: Could not perform merge.");
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
s->value->assign(v.data(), v.size());
|
|
|
|
}
|
|
|
|
if (s->inplace_update_support) {
|
|
|
|
s->mem->GetLock(s->key->user_key())->Unlock();
|
|
|
|
}
|
|
|
|
*(s->found_final_value) = true;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
case kTypeDeletion: {
|
|
|
|
if (*(s->merge_in_progress)) {
|
|
|
|
assert(merge_operator);
|
|
|
|
*(s->status) = Status::OK();
|
|
|
|
if (!merge_operator->FullMerge(s->key->user_key(), nullptr,
|
|
|
|
merge_context->GetOperands(), s->value,
|
|
|
|
s->logger)) {
|
|
|
|
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
|
|
|
*(s->status) =
|
|
|
|
Status::Corruption("Error: Could not perform merge.");
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
*(s->status) = Status::NotFound();
|
|
|
|
}
|
|
|
|
*(s->found_final_value) = true;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
case kTypeMerge: {
|
|
|
|
std::string merge_result; // temporary area for merge results later
|
|
|
|
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
|
|
|
|
*(s->merge_in_progress) = true;
|
|
|
|
merge_context->PushOperand(v);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
assert(false);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// s->state could be Corrupt, merge or notfound
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2013-03-21 23:59:47 +01:00
|
|
|
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
2013-12-03 03:34:05 +01:00
|
|
|
MergeContext& merge_context, const Options& options) {
|
2013-11-18 20:32:54 +01:00
|
|
|
StopWatchNano memtable_get_timer(options.env, false);
|
|
|
|
StartPerfTimer(&memtable_get_timer);
|
|
|
|
|
2013-11-27 23:27:02 +01:00
|
|
|
Slice user_key = key.user_key();
|
2014-02-11 18:46:30 +01:00
|
|
|
bool found_final_value = false;
|
|
|
|
bool merge_in_progress = s->IsMergeInProgress();
|
2013-11-27 23:27:02 +01:00
|
|
|
|
|
|
|
if (prefix_bloom_ &&
|
|
|
|
!prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
|
|
|
|
// iter is null if prefix bloom says the key does not exist
|
|
|
|
} else {
|
2014-02-11 18:46:30 +01:00
|
|
|
Saver saver;
|
|
|
|
saver.status = s;
|
|
|
|
saver.found_final_value = &found_final_value;
|
|
|
|
saver.merge_in_progress = &merge_in_progress;
|
|
|
|
saver.key = &key;
|
|
|
|
saver.value = value;
|
|
|
|
saver.status = s;
|
|
|
|
saver.mem = this;
|
|
|
|
saver.merge_context = &merge_context;
|
|
|
|
saver.merge_operator = options.merge_operator.get();
|
|
|
|
saver.logger = options.info_log.get();
|
|
|
|
saver.inplace_update_support = options.inplace_update_support;
|
|
|
|
saver.statistics = options.statistics.get();
|
|
|
|
table_->Get(key, &saver, SaveValue);
|
2011-06-22 04:36:45 +02:00
|
|
|
}
|
2013-03-21 23:59:47 +01:00
|
|
|
|
[RocksDB] [MergeOperator] The new Merge Interface! Uses merge sequences.
Summary:
Here are the major changes to the Merge Interface. It has been expanded
to handle cases where the MergeOperator is not associative. It does so by stacking
up merge operations while scanning through the key history (i.e.: during Get() or
Compaction), until a valid Put/Delete/end-of-history is encountered; it then
applies all of the merge operations in the correct sequence starting with the
base/sentinel value.
I have also introduced an "AssociativeMerge" function which allows the user to
take advantage of associative merge operations (such as in the case of counters).
The implementation will always attempt to merge the operations/operands themselves
together when they are encountered, and will resort to the "stacking" method if
and only if the "associative-merge" fails.
This implementation is conjectured to allow MergeOperator to handle the general
case, while still providing the user with the ability to take advantage of certain
efficiencies in their own merge-operator / data-structure.
NOTE: This is a preliminary diff. This must still go through a lot of review,
revision, and testing. Feedback welcome!
Test Plan:
-This is a preliminary diff. I have only just begun testing/debugging it.
-I will be testing this with the existing MergeOperator use-cases and unit-tests
(counters, string-append, and redis-lists)
-I will be "desk-checking" and walking through the code with the help gdb.
-I will find a way of stress-testing the new interface / implementation using
db_bench, db_test, merge_test, and/or db_stress.
-I will ensure that my tests cover all cases: Get-Memtable,
Get-Immutable-Memtable, Get-from-Disk, Iterator-Range-Scan, Flush-Memtable-to-L0,
Compaction-L0-L1, Compaction-Ln-L(n+1), Put/Delete found, Put/Delete not-found,
end-of-history, end-of-file, etc.
-A lot of feedback from the reviewers.
Reviewers: haobo, dhruba, zshao, emayanke
Reviewed By: haobo
CC: leveldb
Differential Revision: https://reviews.facebook.net/D11499
2013-08-06 05:14:32 +02:00
|
|
|
// No change to value, since we have not yet found a Put/Delete
|
2013-11-18 20:32:54 +01:00
|
|
|
if (!found_final_value && merge_in_progress) {
|
2013-03-21 23:59:47 +01:00
|
|
|
*s = Status::MergeInProgress("");
|
|
|
|
}
|
2013-11-18 20:32:54 +01:00
|
|
|
BumpPerfTime(&perf_context.get_from_memtable_time, &memtable_get_timer);
|
|
|
|
BumpPerfCount(&perf_context.get_from_memtable_count);
|
|
|
|
return found_final_value;
|
2011-06-22 04:36:45 +02:00
|
|
|
}
|
|
|
|
|
2014-01-14 16:55:16 +01:00
|
|
|
void MemTable::Update(SequenceNumber seq,
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
const Slice& key,
|
|
|
|
const Slice& value) {
|
|
|
|
LookupKey lkey(key, seq);
|
2013-11-27 23:27:02 +01:00
|
|
|
Slice mem_key = lkey.memtable_key();
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
|
2014-01-16 03:17:58 +01:00
|
|
|
std::unique_ptr<MemTableRep::Iterator> iter(
|
2013-12-03 20:17:58 +01:00
|
|
|
table_->GetIterator(lkey.user_key()));
|
2014-01-25 02:50:59 +01:00
|
|
|
iter->Seek(lkey.internal_key(), mem_key.data());
|
2013-11-27 23:27:02 +01:00
|
|
|
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
if (iter->Valid()) {
|
|
|
|
// entry format is:
|
2014-01-14 16:55:16 +01:00
|
|
|
// key_length varint32
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
// userkey char[klength-8]
|
|
|
|
// tag uint64
|
|
|
|
// vlength varint32
|
|
|
|
// value char[vlength]
|
|
|
|
// Check that it belongs to same user key. We do not check the
|
|
|
|
// sequence number since the Seek() call above should have skipped
|
|
|
|
// all entries with overly large sequence numbers.
|
|
|
|
const char* entry = iter->key();
|
2014-01-17 21:22:39 +01:00
|
|
|
uint32_t key_length = 0;
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
|
|
|
|
if (comparator_.comparator.user_comparator()->Compare(
|
|
|
|
Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
|
|
|
|
// Correct user key
|
|
|
|
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
|
|
|
|
switch (static_cast<ValueType>(tag & 0xff)) {
|
|
|
|
case kTypeValue: {
|
2014-01-14 16:55:16 +01:00
|
|
|
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value in place if the merged value fits within the existing buffer size, but currently the existing buffer size is not being modified. Now the callback receives an int* allowing the size to be modified. Since the size is encoded as a varint in the internal key for the memtable, the entire value might have to be copied to a new location if the new size varint is shorter than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
2014-01-17 00:11:19 +01:00
|
|
|
uint32_t prev_size = prev_value.size();
|
|
|
|
uint32_t new_size = value.size();
|
2014-01-14 16:55:16 +01:00
|
|
|
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value in place if the merged value fits within the existing buffer size, but currently the existing buffer size is not being modified. Now the callback receives an int* allowing the size to be modified. Since the size is encoded as a varint in the internal key for the memtable, the entire value might have to be copied to a new location if the new size varint is shorter than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
2014-01-17 00:11:19 +01:00
|
|
|
// Update value, if new value size <= previous value size
|
|
|
|
if (new_size <= prev_size ) {
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate the memory taken by values of other value types until a kTypeValue is found, i.e. remove kTypeMerge entries.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the in-place update. Some other unit tests are broken due to invalid sequence number checks; will fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
|
Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.
Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value in place if the merged value fits within the existing buffer size, but currently the existing buffer size is not being modified. Now the callback receives an int* allowing the size to be modified. Since the size is encoded as a varint in the internal key for the memtable, the entire value might have to be copied to a new location if the new size varint is shorter than the existing size varint.
* The callback function has 3 functionalities
1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
2. Generate a new buffer indicating merged value. Returns 2.
3. Fails to do either of above, based on whatever application logic. Returns 0.
Test Plan: Just make all for now. I'm adding another unit test to test each scenario.
Reviewers: dhruba, haobo
Reviewed By: haobo
CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo
Differential Revision: https://reviews.facebook.net/D15195
2014-01-17 00:11:19 +01:00
|
|
|
new_size);
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate the memory taken by values of other value types until a kTypeValue is found, i.e. remove kTypeMerge entries.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the in-place update. Some other unit tests are broken due to invalid sequence number checks; will fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
WriteLock wl(GetLock(lkey.user_key()));
|
|
|
|
memcpy(p, value.data(), value.size());
|
2014-02-03 22:48:30 +01:00
|
|
|
assert((unsigned)((p + value.size()) - entry) ==
|
|
|
|
(unsigned)(VarintLength(key_length) + key_length +
|
|
|
|
VarintLength(value.size()) + value.size()));
|
2014-01-14 16:55:16 +01:00
|
|
|
return;
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate the memory taken by values of other value types until a kTypeValue is found, i.e. remove kTypeMerge entries.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the in-place update. Some other unit tests are broken due to invalid sequence number checks; will fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
// If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
|
2014-01-14 16:55:16 +01:00
|
|
|
// we don't have enough space for update inplace
|
|
|
|
Add(seq, kTypeValue, key, value);
|
|
|
|
return;
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate the memory taken by values of other value types until a kTypeValue is found, i.e. remove kTypeMerge entries.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the in-place update. Some other unit tests are broken due to invalid sequence number checks; will fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
2013-08-19 23:12:47 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-01-14 16:55:16 +01:00
|
|
|
// key doesn't exist
|
|
|
|
Add(seq, kTypeValue, key, value);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Applies `delta` to the memtable entry found for `key` by handing the
// existing value to options.inplace_callback.
//
// An iterator is positioned with Seek() on (key, seq). If the entry it lands
// on has the same user key and is a kTypeValue, the callback is invoked with
// the existing value buffer:
//   * UPDATED_INPLACE: the callback already rewrote the buffer. If it
//     reported a smaller size, the length prefix is re-encoded and, when the
//     prefix itself became shorter, the value bytes are shifted down so they
//     directly follow the new prefix.
//   * UPDATED: the callback produced a new value in a separate string, which
//     is added as a new kTypeValue entry with sequence number `seq`.
//   * UPDATE_FAILED: nothing is modified.
// Returns true for each of those three statuses. Returns false if the
// iterator is not valid, the user key differs, the entry is not a
// kTypeValue, or the callback returned any other status.
bool MemTable::UpdateCallback(SequenceNumber seq,
                              const Slice& key,
                              const Slice& delta,
                              const Options& options) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  // The iterator never leaves this function, so exclusive ownership is
  // sufficient; no reference counting is needed.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetIterator(lkey.user_key()));
  iter->Seek(lkey.internal_key(), memkey.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
        Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
          uint32_t prev_size = prev_value.size();

          char* prev_buffer = const_cast<char*>(prev_value.data());
          uint32_t new_prev_size = prev_size;

          std::string str_value;
          // Acquire the write lock returned by GetLock() for this user key
          // before the callback touches the value buffer.
          WriteLock wl(GetLock(lkey.user_key()));
          auto status = options.inplace_callback(prev_buffer, &new_prev_size,
                                                 delta, &str_value);
          if (status == UpdateStatus::UPDATED_INPLACE) {
            // Value already updated by callback.
            assert(new_prev_size <= prev_size);
            if (new_prev_size < prev_size) {
              // overwrite the new prev_size
              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                       new_prev_size);
              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
                // The length prefix got shorter, so the value has to move
                // down to follow it. Source and destination lie in the same
                // entry only a few bytes apart and therefore overlap:
                // memmove is required, memcpy would be undefined behavior.
                memmove(p, prev_buffer, new_prev_size);
              }
            }
            RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
            should_flush_ = ShouldFlushNow();
            return true;
          } else if (status == UpdateStatus::UPDATED) {
            Add(seq, kTypeValue, key, Slice(str_value));
            RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
            should_flush_ = ShouldFlushNow();
            return true;
          } else if (status == UpdateStatus::UPDATE_FAILED) {
            // No action required. Return.
            should_flush_ = ShouldFlushNow();
            return true;
          }
          // Any other status: nothing was done here; report false below.
          break;
        }
        default:
          break;
      }
    }
  }
  // If the latest value is not kTypeValue
  // or key doesn't exist
  return false;
}
|
2014-01-11 02:33:56 +01:00
|
|
|
|
|
|
|
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
|
|
|
|
Slice memkey = key.memtable_key();
|
|
|
|
|
|
|
|
// A total ordered iterator is costly for some memtablerep (prefix aware
|
|
|
|
// reps). By passing in the user key, we allow efficient iterator creation.
|
|
|
|
// The iterator only needs to be ordered within the same user key.
|
2014-01-16 03:17:58 +01:00
|
|
|
std::unique_ptr<MemTableRep::Iterator> iter(
|
|
|
|
table_->GetIterator(key.user_key()));
|
2014-01-25 02:50:59 +01:00
|
|
|
iter->Seek(key.internal_key(), memkey.data());
|
2014-01-11 02:33:56 +01:00
|
|
|
|
|
|
|
size_t num_successive_merges = 0;
|
|
|
|
|
|
|
|
for (; iter->Valid(); iter->Next()) {
|
|
|
|
const char* entry = iter->key();
|
2014-01-17 21:22:39 +01:00
|
|
|
uint32_t key_length = 0;
|
2014-01-11 02:33:56 +01:00
|
|
|
const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
|
2014-03-14 17:54:23 +01:00
|
|
|
if (comparator_.comparator.user_comparator()->Compare(
|
|
|
|
Slice(iter_key_ptr, key_length - 8), key.user_key()) != 0) {
|
2014-01-11 02:33:56 +01:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
|
|
|
|
if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
++num_successive_merges;
|
|
|
|
}
|
|
|
|
|
|
|
|
return num_successive_merges;
|
|
|
|
}
|
|
|
|
|
2014-02-11 18:46:30 +01:00
|
|
|
void MemTableRep::Get(const LookupKey& k, void* callback_args,
|
|
|
|
bool (*callback_func)(void* arg, const char* entry)) {
|
|
|
|
auto iter = GetIterator(k.user_key());
|
|
|
|
for (iter->Seek(k.internal_key(), k.memtable_key().data());
|
|
|
|
iter->Valid() && callback_func(callback_args, iter->key());
|
|
|
|
iter->Next()) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-10-04 06:49:15 +02:00
|
|
|
} // namespace rocksdb
|