rocksdb/db/internal_stats.h
Nathan Bronson 7d87f02799 support for concurrent adds to memtable
Summary:
This diff adds support for concurrent adds to the skiplist memtable
implementations.  Memory allocation is made thread-safe by the addition of
a spinlock, with small per-core buffers to avoid contention.  Concurrent
memtable writes are made via an additional method and don't impose a
performance overhead on the non-concurrent case, so parallelism can be
selected on a per-batch basis.
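
As a rough illustration of the spinlock half of that allocation scheme, here is a minimal sketch of a bump allocator guarded by a spinlock. `SpinLock` and `LockedBumpArena` are hypothetical names invented for this example, not classes from this diff, and the per-core buffers that reduce contention are deliberately left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical spinlock built on std::atomic_flag; usable with std::lock_guard.
class SpinLock {
 public:
  void lock() {
    while (flag_.test_and_set(std::memory_order_acquire)) {
      // Busy-wait until the current holder calls unlock().
    }
  }
  void unlock() { flag_.clear(std::memory_order_release); }

 private:
  std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

// Hypothetical fixed-capacity bump allocator; every allocation takes the lock.
class LockedBumpArena {
 public:
  explicit LockedBumpArena(std::size_t capacity) : buf_(capacity), used_(0) {}

  // Returns a pointer to n fresh bytes, or nullptr if the arena is exhausted.
  char* Allocate(std::size_t n) {
    std::lock_guard<SpinLock> guard(lock_);
    assert(used_ <= buf_.size());
    if (n > buf_.size() - used_) {
      return nullptr;
    }
    char* result = buf_.data() + used_;
    used_ += n;
    return result;
  }

 private:
  SpinLock lock_;
  std::vector<char> buf_;
  std::size_t used_;
};
```

In this sketch every allocation contends on one lock. Per-core buffers, as described above, would let most small allocations skip the shared lock entirely.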

Write thread synchronization is an increasing bottleneck for higher levels
of concurrency, so this diff adds --enable_write_thread_adaptive_yield
(default off).  This feature causes threads joining a write batch
group to spin for a short time (default 100 usec) using sched_yield,
rather than going to sleep on a mutex.  If the timing of the yield calls
indicates that another thread has actually run during the yield then
spinning is avoided.  This option improves performance for concurrent
situations even without parallel adds, although it has the potential to
increase CPU usage (and the heuristic adaptation is not yet mature).
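
The spin-then-give-up idea can be sketched as follows. This is one possible reading of the description above, not the code in this diff: `SpinYieldUntil`, `max_spin`, and `slow_yield` are hypothetical names, and treating a single slow yield as evidence that another thread ran is an assumption of the sketch.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical helper: spin with yields until `done` is set, the spin budget
// `max_spin` runs out, or one yield takes at least `slow_yield`.
// Returns true only if `done` was observed set; on false the caller would
// fall back to blocking (for example on a mutex and condition variable).
inline bool SpinYieldUntil(const std::atomic<bool>& done,
                           std::chrono::microseconds max_spin,
                           std::chrono::microseconds slow_yield) {
  assert(max_spin.count() >= 0);
  using Clock = std::chrono::steady_clock;
  const auto deadline = Clock::now() + max_spin;
  while (!done.load(std::memory_order_acquire)) {
    const auto before = Clock::now();
    if (before >= deadline) {
      return false;  // Spin budget exhausted.
    }
    std::this_thread::yield();
    if (Clock::now() - before >= slow_yield) {
      return false;  // The yield was slow, so another thread probably ran.
    }
  }
  return true;
}
```

A caller might pass a `max_spin` of 100 microseconds to match the default mentioned above. The `slow_yield` threshold has no counterpart in this description and would need tuning.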

Parallel writes are not currently compatible with
in-place updates, update callbacks, or delete filtering.
Enable them with --allow_concurrent_memtable_write (and
--enable_write_thread_adaptive_yield).  Parallel memtable writes
are performance neutral when there is no actual parallelism, and in
my experiments (SSD server-class Linux and varying contention and key
sizes for fillrandom) they are always a performance win when there is
more than one thread.

Statistics are updated earlier in the write path, dropping the number
of DB mutex acquisitions from 2 to 1 for almost all cases.
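
The DB-level counters in internal_stats.h are std::atomic<uint64_t> values that AddDBStats updates with a relaxed load followed by a relaxed store. The sketch below contrasts that pattern with an atomic fetch_add. `RelaxedCounter` and its method names are hypothetical, and the claim that the load-then-store form needs a single writer at a time is a general property of the pattern, not something this diff states.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical counter showing two ways to add to a std::atomic<uint64_t>.
class RelaxedCounter {
 public:
  // Load then store: cheap, but two concurrent callers can overwrite each
  // other's update. Only safe when updates are serialized by the caller.
  void AddSingleWriter(uint64_t value) {
    v_.store(v_.load(std::memory_order_relaxed) + value,
             std::memory_order_relaxed);
  }

  // Atomic read-modify-write: safe with any number of concurrent callers.
  void AddAnyWriter(uint64_t value) {
    v_.fetch_add(value, std::memory_order_relaxed);
  }

  uint64_t Get() const { return v_.load(std::memory_order_relaxed); }

 private:
  std::atomic<uint64_t> v_{0};
};
```

Readers on other threads see a valid value in both cases. The difference is only whether concurrent writers can lose increments.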

This diff was motivated and inspired by Yahoo's cLSM work.  It is more
conservative than cLSM: RocksDB's write batch group leader role is
preserved (along with all of the existing flush and write throttling
logic) and concurrent writers are blocked until all memtable insertions
have completed and the sequence number has been advanced, to preserve
linearizability.

My test config is "db_bench -benchmarks=fillrandom -threads=$T
-batch_size=1 -memtablerep=skip_list -value_size=100 --num=1000000/$T
-level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999
-disable_auto_compactions --max_write_buffer_number=8
-max_background_flushes=8 --disable_wal --write_buffer_size=160000000
--block_size=16384 --allow_concurrent_memtable_write" on a two-socket
Xeon E5-2660 @ 2.2GHz with lots of memory and an SSD.  With 1
thread I get ~440Kops/sec.  Peak performance for 1 socket (numactl
-N1) is slightly more than 1Mops/sec, at 16 threads.  Peak performance
across both sockets happens at 30 threads, and is ~900Kops/sec, although
with fewer threads there is less performance loss when the system has
background work.

Test Plan:
1. concurrent stress tests for InlineSkipList and DynamicBloom
2. make clean; make check
3. make clean; DISABLE_JEMALLOC=1 make valgrind_check; valgrind db_bench
4. make clean; COMPILE_WITH_TSAN=1 make all check; db_bench
5. make clean; COMPILE_WITH_ASAN=1 make all check; db_bench
6. make clean; OPT=-DROCKSDB_LITE make check
7. verify no perf regressions when disabled

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: MarkCallaghan, IslamAbdelRahman, anthony, yhchiang, rven, sdong, guyg8, kradhakrishnan, dhruba

Differential Revision: https://reviews.facebook.net/D50589
2015-12-25 11:03:40 -08:00

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#pragma once
#include "db/version_set.h"
#include <atomic>
#include <string>
#include <vector>
class ColumnFamilyData;
namespace rocksdb {
class MemTableList;
class DBImpl;
// IMPORTANT: If you add a new property here, also add it to the list in
// include/rocksdb/db.h
enum DBPropertyType : uint32_t {
kUnknown,
kNumFilesAtLevel, // Number of files at a specific level
kLevelStats, // Return number of files and total sizes of each level
kCFStats, // Return general statistics of CF
kDBStats, // Return general statistics of DB
kStats, // Return general statistics of both DB and CF
kSsTables, // Return a human readable string of current SST files
kStartIntTypes, // ---- Dummy value to indicate the start of integer values
kNumImmutableMemTable, // Return number of immutable mem tables that
// have not been flushed.
kNumImmutableMemTableFlushed, // Return number of immutable mem tables
// in memory that have already been flushed
kMemtableFlushPending, // Return 1 if mem table flushing is pending,
// otherwise 0.
kNumRunningFlushes, // Return the number of currently running flushes.
kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0.
kNumRunningCompactions, // Return the number of currently running
// compactions.
kBackgroundErrors, // Return accumulated background errors encountered.
kCurSizeActiveMemTable, // Return current size of the active memtable
kCurSizeAllMemTables, // Return current size of unflushed
// (active + immutable) memtables
kSizeAllMemTables, // Return current size of all (active + immutable
// + pinned) memtables
kNumEntriesInMutableMemtable, // Return number of entries in the mutable
// memtable.
kNumEntriesInImmutableMemtable, // Return sum of number of entries in all
// the immutable mem tables.
kNumDeletesInMutableMemtable, // Return number of deletion entries in the
// mutable memtable.
kNumDeletesInImmutableMemtable, // Return the total number of deletion
// entries in all the immutable mem tables.
kEstimatedNumKeys, // Estimated total number of keys in the database.
kEstimatedUsageByTableReaders, // Estimated memory by table readers.
kIsFileDeletionEnabled, // Equals disable_delete_obsolete_files_,
// 0 means file deletions enabled
kNumSnapshots, // Number of snapshots in the system
kOldestSnapshotTime, // Unix timestamp of the first snapshot
kNumLiveVersions,
kEstimateLiveDataSize, // Estimated amount of live data in bytes
kTotalSstFilesSize, // Total size of all sst files.
kBaseLevel, // The level that L0 data is compacted to
kEstimatePendingCompactionBytes, // Estimated bytes pending compaction
kAggregatedTableProperties, // Return a string that contains the aggregated
// table properties.
kAggregatedTablePropertiesAtLevel, // Return a string that contains the
// aggregated table properties at the specified level.
};
extern DBPropertyType GetPropertyType(const Slice& property,
bool* is_int_property,
bool* need_out_of_mutex);
#ifndef ROCKSDB_LITE
class InternalStats {
public:
enum InternalCFStatsType {
LEVEL0_SLOWDOWN_TOTAL,
LEVEL0_SLOWDOWN_WITH_COMPACTION,
MEMTABLE_COMPACTION,
MEMTABLE_SLOWDOWN,
LEVEL0_NUM_FILES_TOTAL,
LEVEL0_NUM_FILES_WITH_COMPACTION,
SOFT_PENDING_COMPACTION_BYTES_LIMIT,
HARD_PENDING_COMPACTION_BYTES_LIMIT,
WRITE_STALLS_ENUM_MAX,
BYTES_FLUSHED,
INTERNAL_CF_STATS_ENUM_MAX,
};
enum InternalDBStatsType {
WAL_FILE_BYTES,
WAL_FILE_SYNCED,
BYTES_WRITTEN,
NUMBER_KEYS_WRITTEN,
WRITE_DONE_BY_OTHER,
WRITE_DONE_BY_SELF,
WRITE_WITH_WAL,
WRITE_STALL_MICROS,
INTERNAL_DB_STATS_ENUM_MAX,
};
InternalStats(int num_levels, Env* env, ColumnFamilyData* cfd)
: db_stats_{},
cf_stats_value_{},
cf_stats_count_{},
comp_stats_(num_levels),
file_read_latency_(num_levels),
bg_error_count_(0),
number_levels_(num_levels),
env_(env),
cfd_(cfd),
started_at_(env->NowMicros()) {}
// Per level compaction stats. comp_stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {
uint64_t micros;
// The number of bytes read from all non-output levels
uint64_t bytes_read_non_output_levels;
// The number of bytes read from the compaction output level.
uint64_t bytes_read_output_level;
// Total number of bytes written during compaction
uint64_t bytes_written;
// Total number of bytes moved to the output level
uint64_t bytes_moved;
// The number of compaction input files in all non-output levels.
int num_input_files_in_non_output_levels;
// The number of compaction input files in the output level.
int num_input_files_in_output_level;
// The number of compaction output files.
int num_output_files;
// Total incoming entries during compaction between levels N and N+1
uint64_t num_input_records;
// Accumulated diff number of entries
// (num input entries - num output entries) for compaction levels N and N+1
uint64_t num_dropped_records;
// Number of compactions done
int count;
explicit CompactionStats(int _count = 0)
: micros(0),
bytes_read_non_output_levels(0),
bytes_read_output_level(0),
bytes_written(0),
bytes_moved(0),
num_input_files_in_non_output_levels(0),
num_input_files_in_output_level(0),
num_output_files(0),
num_input_records(0),
num_dropped_records(0),
count(_count) {}
explicit CompactionStats(const CompactionStats& c)
: micros(c.micros),
bytes_read_non_output_levels(c.bytes_read_non_output_levels),
bytes_read_output_level(c.bytes_read_output_level),
bytes_written(c.bytes_written),
bytes_moved(c.bytes_moved),
num_input_files_in_non_output_levels(
c.num_input_files_in_non_output_levels),
num_input_files_in_output_level(
c.num_input_files_in_output_level),
num_output_files(c.num_output_files),
num_input_records(c.num_input_records),
num_dropped_records(c.num_dropped_records),
count(c.count) {}
void Add(const CompactionStats& c) {
this->micros += c.micros;
this->bytes_read_non_output_levels += c.bytes_read_non_output_levels;
this->bytes_read_output_level += c.bytes_read_output_level;
this->bytes_written += c.bytes_written;
this->bytes_moved += c.bytes_moved;
this->num_input_files_in_non_output_levels +=
c.num_input_files_in_non_output_levels;
this->num_input_files_in_output_level +=
c.num_input_files_in_output_level;
this->num_output_files += c.num_output_files;
this->num_input_records += c.num_input_records;
this->num_dropped_records += c.num_dropped_records;
this->count += c.count;
}
void Subtract(const CompactionStats& c) {
this->micros -= c.micros;
this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels;
this->bytes_read_output_level -= c.bytes_read_output_level;
this->bytes_written -= c.bytes_written;
this->bytes_moved -= c.bytes_moved;
this->num_input_files_in_non_output_levels -=
c.num_input_files_in_non_output_levels;
this->num_input_files_in_output_level -=
c.num_input_files_in_output_level;
this->num_output_files -= c.num_output_files;
this->num_input_records -= c.num_input_records;
this->num_dropped_records -= c.num_dropped_records;
this->count -= c.count;
}
};
void AddCompactionStats(int level, const CompactionStats& stats) {
comp_stats_[level].Add(stats);
}
void IncBytesMoved(int level, uint64_t amount) {
comp_stats_[level].bytes_moved += amount;
}
void AddCFStats(InternalCFStatsType type, uint64_t value) {
cf_stats_value_[type] += value;
++cf_stats_count_[type];
}
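void AddDBStats(InternalDBStatsType type, uint64_t value) {
// Relaxed load followed by a relaxed store, not an atomic read-modify-write.
// Concurrent callers could lose updates, so this presumably relies on
// updates to db_stats_ being serialized by the caller.
auto& v = db_stats_[type];
v.store(v.load(std::memory_order_relaxed) + value,
std::memory_order_relaxed);
}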
uint64_t GetDBStats(InternalDBStatsType type) {
return db_stats_[type].load(std::memory_order_relaxed);
}
HistogramImpl* GetFileReadHist(int level) {
return &file_read_latency_[level];
}
uint64_t GetBackgroundErrorCount() const { return bg_error_count_; }
uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; }
bool GetStringProperty(DBPropertyType property_type, const Slice& property,
std::string* value);
bool GetIntProperty(DBPropertyType property_type, uint64_t* value,
DBImpl* db) const;
bool GetIntPropertyOutOfMutex(DBPropertyType property_type, Version* version,
uint64_t* value) const;
private:
void DumpDBStats(std::string* value);
void DumpCFStats(std::string* value);
// Per-DB stats
std::atomic<uint64_t> db_stats_[INTERNAL_DB_STATS_ENUM_MAX];
// Per-ColumnFamily stats
uint64_t cf_stats_value_[INTERNAL_CF_STATS_ENUM_MAX];
uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX];
// Per-ColumnFamily/level compaction stats
std::vector<CompactionStats> comp_stats_;
std::vector<HistogramImpl> file_read_latency_;
// Used to compute per-interval statistics
struct CFStatsSnapshot {
// ColumnFamily-level stats
CompactionStats comp_stats;
uint64_t ingest_bytes; // Bytes written to L0
uint64_t stall_count; // Stall count
CFStatsSnapshot()
: comp_stats(0),
ingest_bytes(0),
stall_count(0) {}
} cf_stats_snapshot_;
struct DBStatsSnapshot {
// DB-level stats
uint64_t ingest_bytes; // Bytes written by user
uint64_t wal_bytes; // Bytes written to WAL
uint64_t wal_synced; // Number of times WAL is synced
uint64_t write_with_wal; // Number of writes that request WAL
// These count the number of writes processed by the calling thread or
// another thread.
uint64_t write_other;
uint64_t write_self;
// Stats from compaction jobs - bytes written, bytes read, duration.
uint64_t compact_bytes_write;
uint64_t compact_bytes_read;
uint64_t compact_micros;
// Total number of keys written. write_self and write_other count write
// requests; each write request can contain updates to multiple keys.
// num_keys_written is the total number of keys updated by all those
// writes.
uint64_t num_keys_written;
// Total time writes were delayed by stalls.
uint64_t write_stall_micros;
double seconds_up;
DBStatsSnapshot()
: ingest_bytes(0),
wal_bytes(0),
wal_synced(0),
write_with_wal(0),
write_other(0),
write_self(0),
compact_bytes_write(0),
compact_bytes_read(0),
compact_micros(0),
num_keys_written(0),
write_stall_micros(0),
seconds_up(0) {}
} db_stats_snapshot_;
// Total number of background errors encountered. Every time a flush task
// or compaction task fails, this counter is incremented. The failure can
// be caused by any possible reason, including file system errors, out of
// resources, or input file corruption. Failing when retrying the same flush
// or compaction will cause the counter to increase too.
uint64_t bg_error_count_;
const int number_levels_;
Env* env_;
ColumnFamilyData* cfd_;
const uint64_t started_at_;
};
#else
class InternalStats {
public:
enum InternalCFStatsType {
LEVEL0_SLOWDOWN_TOTAL,
LEVEL0_SLOWDOWN_WITH_COMPACTION,
MEMTABLE_COMPACTION,
MEMTABLE_SLOWDOWN,
LEVEL0_NUM_FILES_TOTAL,
LEVEL0_NUM_FILES_WITH_COMPACTION,
SOFT_PENDING_COMPACTION_BYTES_LIMIT,
HARD_PENDING_COMPACTION_BYTES_LIMIT,
WRITE_STALLS_ENUM_MAX,
BYTES_FLUSHED,
INTERNAL_CF_STATS_ENUM_MAX,
};
enum InternalDBStatsType {
WAL_FILE_BYTES,
WAL_FILE_SYNCED,
BYTES_WRITTEN,
NUMBER_KEYS_WRITTEN,
WRITE_DONE_BY_OTHER,
WRITE_DONE_BY_SELF,
WRITE_WITH_WAL,
WRITE_STALL_MICROS,
INTERNAL_DB_STATS_ENUM_MAX,
};
InternalStats(int num_levels, Env* env, ColumnFamilyData* cfd) {}
struct CompactionStats {
uint64_t micros;
uint64_t bytes_read_non_output_levels;
uint64_t bytes_read_output_level;
uint64_t bytes_written;
uint64_t bytes_moved;
int num_input_files_in_non_output_levels;
int num_input_files_in_output_level;
int num_output_files;
uint64_t num_input_records;
uint64_t num_dropped_records;
int count;
explicit CompactionStats(int _count = 0) {}
explicit CompactionStats(const CompactionStats& c) {}
void Add(const CompactionStats& c) {}
void Subtract(const CompactionStats& c) {}
};
void AddCompactionStats(int level, const CompactionStats& stats) {}
void IncBytesMoved(int level, uint64_t amount) {}
void AddCFStats(InternalCFStatsType type, uint64_t value) {}
void AddDBStats(InternalDBStatsType type, uint64_t value) {}
HistogramImpl* GetFileReadHist(int level) { return nullptr; }
uint64_t GetBackgroundErrorCount() const { return 0; }
uint64_t BumpAndGetBackgroundErrorCount() { return 0; }
bool GetStringProperty(DBPropertyType property_type, const Slice& property,
std::string* value) { return false; }
bool GetIntProperty(DBPropertyType property_type, uint64_t* value,
DBImpl* db) const { return false; }
bool GetIntPropertyOutOfMutex(DBPropertyType property_type, Version* version,
uint64_t* value) const { return false; }
};
#endif // !ROCKSDB_LITE
} // namespace rocksdb