rocksdb/db/memtable.h
Nathan Bronson 7d87f02799 support for concurrent adds to memtable
Summary:
This diff adds support for concurrent adds to the skiplist memtable
implementations.  Memory allocation is made thread-safe by the addition of
a spinlock, with small per-core buffers to avoid contention.  Concurrent
memtable writes are made via an additional method and don't impose a
performance overhead on the non-concurrent case, so parallelism can be
selected on a per-batch basis.

Write thread synchronization is an increasing bottleneck for higher levels
of concurrency, so this diff adds --enable_write_thread_adaptive_yield
(default off).  This feature causes threads joining a write batch
group to spin for a short time (default 100 usec) using sched_yield,
rather than going to sleep on a mutex.  If the timing of the yield calls
indicates that another thread has actually run during the yield then
spinning is avoided.  This option improves performance for concurrent
situations even without parallel adds, although it has the potential to
increase CPU usage (and the heuristic adaptation is not yet mature).
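
A rough sketch of the adaptive-yield idea described above, not the code in this diff. `SpinUntil`, `slow_yield`, and `max_slow_yields` are hypothetical names, and the slow-yield threshold and count are assumed values chosen for illustration. Only the 100 usec spin budget comes from the description above. The helper returns false when the caller should fall back to a blocking wait.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: spin with yield() until done() is true, the spin
// budget runs out, or several yields take long enough to suggest that
// another thread was actually scheduled (so spinning is just burning CPU).
bool SpinUntil(const std::function<bool()>& done,
               std::chrono::microseconds max_spin = std::chrono::microseconds(100),
               std::chrono::microseconds slow_yield = std::chrono::microseconds(3),
               int max_slow_yields = 3) {
  assert(max_slow_yields > 0);
  using Clock = std::chrono::steady_clock;
  const auto deadline = Clock::now() + max_spin;
  int slow_yields = 0;
  auto now = Clock::now();
  while (now < deadline) {
    if (done()) {
      return true;  // condition satisfied while spinning
    }
    const auto yield_start = Clock::now();
    std::this_thread::yield();
    now = Clock::now();
    // A long yield implies some other thread ran on this core.
    if (now - yield_start >= slow_yield && ++slow_yields >= max_slow_yields) {
      return false;  // give up early; caller should block instead
    }
  }
  return done();  // one last check after the budget is exhausted
}
```

A caller would try `SpinUntil(...)` first and only sleep on a mutex or condition variable when it returns false.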

Parallel writes are not currently compatible with
inplace updates, update callbacks, or delete filtering.
Enable them with --allow_concurrent_memtable_write (and
--enable_write_thread_adaptive_yield).  Parallel memtable writes
are performance neutral when there is no actual parallelism, and in
my experiments (SSD server-class Linux and varying contention and key
sizes for fillrandom) they are always a performance win when there is
more than one thread.

Statistics are updated earlier in the write path, dropping the number
of DB mutex acquisitions from 2 to 1 for almost all cases.

This diff was motivated and inspired by Yahoo's cLSM work.  It is more
conservative than cLSM: RocksDB's write batch group leader role is
preserved (along with all of the existing flush and write throttling
logic) and concurrent writers are blocked until all memtable insertions
have completed and the sequence number has been advanced, to preserve
linearizability.
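
The ordering described above (all insertions complete first, then the sequence number advances) can be sketched as follows. This is a toy model under stated assumptions, not the write path in this diff: `ToyMemTable` and `ApplyGroup` are hypothetical names, and a mutex-protected `std::set` stands in for the concurrent skiplist.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical stand-in for a memtable: records which sequence numbers
// have been inserted. A real memtable would use a concurrent skiplist.
struct ToyMemTable {
  std::mutex mu;
  std::set<uint64_t> seqs;
  void Add(uint64_t seq) {
    std::lock_guard<std::mutex> lock(mu);
    seqs.insert(seq);
  }
};

// Applies `count` writes in parallel, then publishes the last sequence
// number. Readers that observe the new value of *last_visible_seq are
// guaranteed that every write up to that number is already inserted.
uint64_t ApplyGroup(ToyMemTable* mem, std::atomic<uint64_t>* last_visible_seq,
                    int count) {
  assert(count >= 0);
  const uint64_t first = last_visible_seq->load(std::memory_order_acquire) + 1;
  std::vector<std::thread> writers;
  for (int i = 0; i < count; ++i) {
    writers.emplace_back([mem, first, i] { mem->Add(first + i); });
  }
  for (auto& t : writers) {
    t.join();  // barrier: every insertion has completed
  }
  const uint64_t last = first + count - 1;
  // Publish only after the barrier, so no reader sees a partial group.
  last_visible_seq->store(last, std::memory_order_release);
  return last;
}
```

Publishing the sequence number only after the join is what keeps a reader from seeing a later write in the group without an earlier one.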

My test config is "db_bench -benchmarks=fillrandom -threads=$T
-batch_size=1 -memtablerep=skip_list -value_size=100 --num=1000000/$T
-level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999
-disable_auto_compactions --max_write_buffer_number=8
-max_background_flushes=8 --disable_wal --write_buffer_size=160000000
--block_size=16384 --allow_concurrent_memtable_write" on a two-socket
Xeon E5-2660 @ 2.2GHz with lots of memory and an SSD.  With 1
thread I get ~440Kops/sec.  Peak performance for 1 socket (numactl
-N1) is slightly more than 1Mops/sec, at 16 threads.  Peak performance
across both sockets happens at 30 threads, and is ~900Kops/sec, although
with fewer threads there is less performance loss when the system has
background work.

Test Plan:
1. concurrent stress tests for InlineSkipList and DynamicBloom
2. make clean; make check
3. make clean; DISABLE_JEMALLOC=1 make valgrind_check; valgrind db_bench
4. make clean; COMPILE_WITH_TSAN=1 make all check; db_bench
5. make clean; COMPILE_WITH_ASAN=1 make all check; db_bench
6. make clean; OPT=-DROCKSDB_LITE make check
7. verify no perf regressions when disabled

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: MarkCallaghan, IslamAbdelRahman, anthony, yhchiang, rven, sdong, guyg8, kradhakrishnan, dhruba

Differential Revision: https://reviews.facebook.net/D50589
2015-12-25 11:03:40 -08:00

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "db/version_edit.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/immutable_options.h"
#include "db/memtable_allocator.h"
#include "util/concurrent_arena.h"
#include "util/dynamic_bloom.h"
#include "util/instrumented_mutex.h"
#include "util/mutable_cf_options.h"
namespace rocksdb {
class Mutex;
class MemTableIterator;
class MergeContext;
class WriteBuffer;
class InternalIterator;
struct MemTableOptions {
explicit MemTableOptions(
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
size_t write_buffer_size;
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
uint32_t memtable_prefix_bloom_probes;
size_t memtable_prefix_bloom_huge_page_tlb_size;
bool inplace_update_support;
size_t inplace_update_num_locks;
UpdateStatus (*inplace_callback)(char* existing_value,
uint32_t* existing_value_size,
Slice delta_value,
std::string* merged_value);
size_t max_successive_merges;
bool filter_deletes;
Statistics* statistics;
MergeOperator* merge_operator;
Logger* info_log;
};
// Note: Many of the methods in this class have comments indicating that
// external synchronization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
// invocation of these methods. This is usually done by acquiring either
// the db mutex or the single writer thread.
//
// Some of these methods are documented to only require external
// synchronization if this memtable is immutable. Calling MarkImmutable() is
// not sufficient to guarantee immutability. It is up to higher layers of
// code to determine if this MemTable can still be modified by other threads.
// E.g., the SuperVersion stores a pointer to the current MemTable (that can
// be modified) and a separate list of the MemTables that can no longer be
// written to (aka the 'immutable memtables').
class MemTable {
public:
struct KeyComparator : public MemTableRep::KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
virtual int operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const override;
virtual int operator()(const char* prefix_len_key,
const Slice& key) const override;
};
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
//
// earliest_seq should be the current SequenceNumber in the db such that any
// key inserted into this memtable will have an equal or larger seq number.
// (When a db is first created, the earliest sequence number will be 0).
// If the earliest sequence number is not known, kMaxSequenceNumber may be
// used, but this may prevent some transactions from succeeding until the
// first key is inserted into the memtable.
explicit MemTable(const InternalKeyComparator& comparator,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBuffer* write_buffer, SequenceNumber earliest_seq);
// Do not delete this MemTable unless Unref() indicates it is not in use.
~MemTable();
// Increase reference count.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void Ref() { ++refs_; }
// Drop reference count.
// If the refcount goes to zero return this memtable, otherwise return null.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
MemTable* Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
return this;
}
return nullptr;
}
// Returns an estimate of the number of bytes of data in use by this
// data structure.
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
size_t ApproximateMemoryUsage();
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {
return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
}
// Returns true if a flush should be scheduled and the caller should
// be the one to schedule it
bool MarkFlushScheduled() {
auto before = FLUSH_REQUESTED;
return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
// Return an iterator that yields the contents of the memtable.
//
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live. The keys returned by this
// iterator are internal keys encoded by AppendInternalKey in the
// db/dbformat.{h,cc} module.
//
// By default, it returns an iterator for prefix seek if prefix_extractor
// is configured in Options.
// arena: If not null, the arena needs to be used to allocate the Iterator.
// Calling ~Iterator of the iterator will destroy all the states but
// those allocated in arena.
InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
//
// REQUIRES: if allow_concurrent = false, external synchronization to prevent
// simultaneous operations on the same MemTable.
void Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, bool allow_concurrent = false);
// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *s and return true.
// If memtable contains a Merge operation as the most recent entry for a key,
// and the merge process does not stop (not reaching a value or delete),
// prepend the current merge operand to the operands in *merge_context,
// store MergeInProgress in *s, and return false.
// Else, return false.
// If any operation was found, its most recent sequence number
// will be stored in *seq on success (regardless of whether true/false is
// returned). Otherwise, *seq will be set to kMaxSequenceNumber.
// On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
// status returned indicates a corruption or other unexpected error.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq);
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context) {
SequenceNumber seq;
return Get(key, value, s, merge_context, &seq);
}
// Attempts to update the value for key in place; otherwise does a normal Add.
// Pseudocode
//   if key exists in current memtable && prev_value is of type kTypeValue
//     if sizeof(new_value) <= sizeof(prev_value)
//       update inplace
//     else add(key, new_value)
//   else add(key, new_value)
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void Update(SequenceNumber seq,
const Slice& key,
const Slice& value);
// If prev_value for key exists, attempts to update it in place;
// otherwise returns false.
// Pseudocode
//   if key exists in current memtable && prev_value is of type kTypeValue
//     new_value = delta(prev_value)
//     if sizeof(new_value) <= sizeof(prev_value)
//       update inplace
//     else add(key, new_value)
//   else return false
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
bool UpdateCallback(SequenceNumber seq,
const Slice& key,
const Slice& delta);
// Returns the number of successive merge entries starting from the newest
// entry for the key up to the last non-merge entry or last entry for the
// key in the memtable.
size_t CountSuccessiveMergeEntries(const LookupKey& key);
// Get total number of entries in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_entries() const {
return num_entries_.load(std::memory_order_relaxed);
}
// Get total number of deletes in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_deletes() const {
return num_deletes_.load(std::memory_order_relaxed);
}
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
// Returns true if no entry has been inserted into the memtable.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
bool IsEmpty() const { return first_seqno_ == 0; }
// Returns the sequence number of the first element that was inserted
// into the memtable.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
SequenceNumber GetFirstSequenceNumber() {
return first_seqno_.load(std::memory_order_relaxed);
}
// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into this
// memtable. It can then be assumed that any write with a larger (or equal)
// sequence number will be present in this memtable or a later memtable.
//
// If the earliest sequence number could not be determined,
// kMaxSequenceNumber will be returned.
SequenceNumber GetEarliestSequenceNumber() {
return earliest_seqno_.load(std::memory_order_relaxed);
}
// Returns the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }
// Sets the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
// Notify the underlying storage that no more items will be added.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
// After MarkImmutable() is called, you should not attempt to
// write anything to this MemTable (i.e., do not call Add() or Update()).
void MarkImmutable() {
table_->MarkReadOnly();
allocator_.DoneAllocating();
}
// Returns true if the current MemTableRep supports merge operator.
bool IsMergeOperatorSupported() const {
return table_->IsMergeOperatorSupported();
}
// Returns true if the current MemTableRep supports snapshots.
// In-place updates prevent snapshots.
bool IsSnapshotSupported() const {
return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
}
uint64_t ApproximateSize(const Slice& start_ikey, const Slice& end_ikey);
// Get the lock associated with the key
port::RWMutex* GetLock(const Slice& key);
const InternalKeyComparator& GetInternalKeyComparator() const {
return comparator_.comparator;
}
const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
private:
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
friend class MemTableIterator;
friend class MemTableBackwardIterator;
friend class MemTableList;
KeyComparator comparator_;
const MemTableOptions moptions_;
int refs_;
const size_t kArenaBlockSize;
ConcurrentArena arena_;
MemTableAllocator allocator_;
std::unique_ptr<MemTableRep> table_;
// Total data size of all data inserted
std::atomic<uint64_t> data_size_;
std::atomic<uint64_t> num_entries_;
std::atomic<uint64_t> num_deletes_;
// These are used to manage memtable flushes to storage
bool flush_in_progress_; // started the flush
bool flush_completed_; // finished the flush
uint64_t file_number_; // filled up after flush is complete
// The updates to be applied to the transaction log when this
// memtable is flushed to storage.
VersionEdit edit_;
// The sequence number of the kv that was inserted first
std::atomic<SequenceNumber> first_seqno_;
// The db sequence number at the time of creation or kMaxSequenceNumber
// if not set.
std::atomic<SequenceNumber> earliest_seqno_;
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;
// rw locks for inplace updates
std::vector<port::RWMutex> locks_;
const SliceTransform* const prefix_extractor_;
std::unique_ptr<DynamicBloom> prefix_bloom_;
std::atomic<FlushStateEnum> flush_state_;
Env* env_;
// Returns a heuristic flush decision
bool ShouldFlushNow() const;
// Updates flush_state_ using ShouldFlushNow()
void UpdateFlushState();
// No copying allowed
MemTable(const MemTable&);
MemTable& operator=(const MemTable&);
};
extern const char* EncodeKey(std::string* scratch, const Slice& target);
} // namespace rocksdb
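
The flush-state handshake in `ShouldScheduleFlush()` and `MarkFlushScheduled()` relies on a compare-and-swap so that only one caller moves the state from `FLUSH_REQUESTED` to `FLUSH_SCHEDULED`. The standalone sketch below illustrates that pattern outside of RocksDB. `FlushFlag`, `Request`, and `CountWinners` are hypothetical names introduced for the example, and the enum is redeclared locally so the block compiles on its own.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

enum FlushState { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

// Hypothetical miniature of the flush_state_ member and its accessors.
struct FlushFlag {
  std::atomic<FlushState> state{FLUSH_NOT_REQUESTED};

  void Request() { state.store(FLUSH_REQUESTED, std::memory_order_relaxed); }

  // Returns true only for the single caller whose CAS moves the state
  // from FLUSH_REQUESTED to FLUSH_SCHEDULED.
  bool MarkScheduled() {
    FlushState expected = FLUSH_REQUESTED;
    return state.compare_exchange_strong(expected, FLUSH_SCHEDULED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
};

// Races num_threads callers on one requested flush and counts how many
// of them were told to schedule it.
int CountWinners(int num_threads) {
  FlushFlag flag;
  flag.Request();
  std::atomic<int> winners{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([&flag, &winners] {
      if (flag.MarkScheduled()) {
        winners.fetch_add(1, std::memory_order_relaxed);
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return winners.load();
}
```

However many threads race, `CountWinners` reports exactly one winner, which is why the caller that sees `true` can schedule the flush without further coordination.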