Compare commits

18 Commits (SHA1):

623774b6b5
d25f018223
61b95f999a
c58a32b30d
7c70ceed14
392d72739a
95168f3514
b47bbbf768
ba98c17de6
8a33ede20f
5023dc5991
bfb9496af0
d216050bd4
8c2f72a440
229640f88d
c7f8ae9f17
5d44932358
436e4f1e4d
HISTORY.md (15 lines changed)
@@ -1,5 +1,18 @@
# Rocksdb Change Log
## Unreleased
## 6.21.3 (2021-07-19)
### Bug Fixes
* Fixed confusingly duplicated output in LOG for periodic stats ("DUMPING STATS"), including "Compaction Stats" and "File Read Latency Histogram By Level". (Included because the next bug fix built upon this one.)
* Fixed block cache entry stat scans not to hold the DB mutex, which was a serious performance bug for tail latencies in TransactionDB and likely elsewhere.

## 6.21.2 (2021-06-14)
### Bug Fixes
* Fixed related but less common performance bugs in background gathering of block cache entry statistics.

## 6.21.1 (2021-06-08)
### Bug Fixes
* Fixed a performance bug in background gathering of block cache entry statistics, that could consume a lot of CPU when there are many column families with a shared block cache.

## 6.21.0 (2021-05-21)
### Bug Fixes
* Fixed a bug in handling file rename error in distributed/network file systems when the server succeeds but client returns error. The bug can cause CURRENT file to point to non-existing MANIFEST file, thus DB cannot be opened.
* Fixed a bug where ingested files were written with incorrect boundary key metadata. In rare cases this could have led to a level's files being wrongly ordered and queries for the boundary keys returning wrong results.
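The 6.21.1 and 6.21.3 fixes above both come down to how often the block cache entry-stat scan is allowed to run. A minimal, self-contained C++ sketch of that rate-limiting idea (hypothetical names, not the RocksDB API): the next scan must wait for an absolute minimum interval and also for a multiple of however long the previous scan took.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Returns true if enough time has passed that a new scan is allowed.
bool ShouldRescan(uint64_t now_micros, uint64_t last_start_micros,
                  uint64_t last_end_micros, int min_interval_seconds,
                  int min_interval_factor) {
  uint64_t max_age_micros =
      static_cast<uint64_t>(std::max(min_interval_seconds, 0)) * 1000000U;
  if (last_end_micros > last_start_micros && min_interval_factor > 0) {
    // Cap relative cost: wait at least `factor` times the last scan duration.
    max_age_micros =
        std::max(max_age_micros, static_cast<uint64_t>(min_interval_factor) *
                                     (last_end_micros - last_start_micros));
  }
  return (now_micros - last_end_micros) > max_age_micros;
}

int main() {
  // Last scan ran from t=0s to t=20s; with factor 500 the next scan must wait
  // at least 10000s, even though the absolute minimum is only 180s.
  std::cout << ShouldRescan(/*now_micros=*/600ULL * 1000000, 0,
                            /*last_end_micros=*/20ULL * 1000000, 180, 500)
            << "\n";  // prints 0: do not rescan yet
  return 0;
}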
TARGETS (12 lines changed)
@@ -10,7 +10,7 @@ load(":defs.bzl", "test_binary")

REPO_PATH = package_name() + "/"

-ROCKSDB_COMPILER_FLAGS = [
+ROCKSDB_COMPILER_FLAGS_0 = [
"-fno-builtin-memcmp",
# Needed to compile in fbcode
"-Wno-expansion-to-defined",

@@ -28,7 +28,7 @@ ROCKSDB_EXTERNAL_DEPS = [
("zstd", None, "zstd"),
]

-ROCKSDB_OS_DEPS = [
+ROCKSDB_OS_DEPS_0 = [
(
"linux",
["third-party//numa:numa", "third-party//liburing:uring", "third-party//tbb:tbb"],

@@ -39,7 +39,7 @@ ROCKSDB_OS_DEPS = [
),
]

-ROCKSDB_OS_PREPROCESSOR_FLAGS = [
+ROCKSDB_OS_PREPROCESSOR_FLAGS_0 = [
(
"linux",
[

@@ -107,18 +107,18 @@ is_opt_mode = build_mode.startswith("opt")

# -DNDEBUG is added by default in opt mode in fbcode. But adding it twice
# doesn't harm and avoid forgetting to add it.
-ROCKSDB_COMPILER_FLAGS += (["-DNDEBUG"] if is_opt_mode else [])
+ROCKSDB_COMPILER_FLAGS = ROCKSDB_COMPILER_FLAGS_0 + (["-DNDEBUG"] if is_opt_mode else [])

sanitizer = read_config("fbcode", "sanitizer")

# Do not enable jemalloc if sanitizer presents. RocksDB will further detect
# whether the binary is linked with jemalloc at runtime.
-ROCKSDB_OS_PREPROCESSOR_FLAGS += ([(
+ROCKSDB_OS_PREPROCESSOR_FLAGS = ROCKSDB_OS_PREPROCESSOR_FLAGS_0 + ([(
"linux",
["-DROCKSDB_JEMALLOC"],
)] if sanitizer == "" else [])

-ROCKSDB_OS_DEPS += ([(
+ROCKSDB_OS_DEPS = ROCKSDB_OS_DEPS_0 + ([(
"linux",
["third-party//jemalloc:headers"],
)] if sanitizer == "" else [])
@@ -17,7 +17,7 @@ load(":defs.bzl", "test_binary")

REPO_PATH = package_name() + "/"

-ROCKSDB_COMPILER_FLAGS = [
+ROCKSDB_COMPILER_FLAGS_0 = [
"-fno-builtin-memcmp",
# Needed to compile in fbcode
"-Wno-expansion-to-defined",

@@ -35,7 +35,7 @@ ROCKSDB_EXTERNAL_DEPS = [
("zstd", None, "zstd"),
]

-ROCKSDB_OS_DEPS = [
+ROCKSDB_OS_DEPS_0 = [
(
"linux",
["third-party//numa:numa", "third-party//liburing:uring", "third-party//tbb:tbb"],

@@ -46,7 +46,7 @@ ROCKSDB_OS_DEPS = [
),
]

-ROCKSDB_OS_PREPROCESSOR_FLAGS = [
+ROCKSDB_OS_PREPROCESSOR_FLAGS_0 = [
(
"linux",
[

@@ -114,18 +114,18 @@ is_opt_mode = build_mode.startswith("opt")

# -DNDEBUG is added by default in opt mode in fbcode. But adding it twice
# doesn't harm and avoid forgetting to add it.
-ROCKSDB_COMPILER_FLAGS += (["-DNDEBUG"] if is_opt_mode else [])
+ROCKSDB_COMPILER_FLAGS = ROCKSDB_COMPILER_FLAGS_0 + (["-DNDEBUG"] if is_opt_mode else [])

sanitizer = read_config("fbcode", "sanitizer")

# Do not enable jemalloc if sanitizer presents. RocksDB will further detect
# whether the binary is linked with jemalloc at runtime.
-ROCKSDB_OS_PREPROCESSOR_FLAGS += ([(
+ROCKSDB_OS_PREPROCESSOR_FLAGS = ROCKSDB_OS_PREPROCESSOR_FLAGS_0 + ([(
"linux",
["-DROCKSDB_JEMALLOC"],
)] if sanitizer == "" else [])

-ROCKSDB_OS_DEPS += ([(
+ROCKSDB_OS_DEPS = ROCKSDB_OS_DEPS_0 + ([(
"linux",
["third-party//jemalloc:headers"],
)] if sanitizer == "" else [])
@@ -69,10 +69,6 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
source "$PWD/build_tools/fbcode_config_platform007.sh"
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_PLATFORM009" ]; then
source "$PWD/build_tools/fbcode_config_platform009.sh"
-elif [ -z "$USE_CLANG" ]; then
-# Still use platform007 for gcc by default for build break on
-# some hosts.
-source "$PWD/build_tools/fbcode_config_platform007.sh"
else
source "$PWD/build_tools/fbcode_config_platform009.sh"
fi

@@ -120,7 +120,7 @@ if [ -z "$USE_CLANG" ]; then
CXX="$GCC_BASE/bin/g++"
AR="$GCC_BASE/bin/gcc-ar"

-CFLAGS+=" -B$BINUTILS/gold"
+CFLAGS+=" -B$BINUTILS"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
JEMALLOC=1

@@ -133,7 +133,7 @@ else

KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"

-CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
+CFLAGS+=" -B$BINUTILS -nostdinc -nostdlib"
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/9.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/9.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $GLIBC_INCLUDE"

@@ -150,10 +150,11 @@ CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PR
CXXFLAGS+=" $CFLAGS"

EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS $LIBURING_LIBS"
-EXEC_LDFLAGS+=" -B$BINUTILS/gold"
+EXEC_LDFLAGS+=" -B$BINUTILS"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform009/lib/ld.so"
EXEC_LDFLAGS+=" $LIBUNWIND"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform009/lib"
+EXEC_LDFLAGS+=" -Wl,-rpath=$GCC_BASE/lib64"
# required by libtbb
EXEC_LDFLAGS+=" -ldl"
cache/cache_entry_stats.h (64 lines changed)
@@ -51,35 +51,56 @@ namespace ROCKSDB_NAMESPACE {
template <class Stats>
class CacheEntryStatsCollector {
public:
-// Gathers stats and saves results into `stats`
-void GetStats(Stats *stats, int maximum_age_in_seconds = 180) {
+// Gather and save stats if saved stats are too old. (Use GetStats() to
+// read saved stats.)
+//
+// Maximum allowed age for a "hit" on saved results is determined by the
+// two interval parameters. Both set to 0 forces a re-scan. For example
+// with min_interval_seconds=300 and min_interval_factor=100, if the last
+// scan took 10s, we would only rescan ("miss") if the age in seconds of
+// the saved results is > max(300, 100*10).
+// Justification: scans can vary wildly in duration, e.g. from 0.02 sec
+// to as much as 20 seconds, so we want to be able to cap the absolute
+// and relative frequency of scans.
+void CollectStats(int min_interval_seconds, int min_interval_factor) {
// Waits for any pending reader or writer (collector)
-std::lock_guard<std::mutex> lock(mutex_);
+std::lock_guard<std::mutex> lock(working_mutex_);

-// Maximum allowed age is nominally given by the parameter
uint64_t max_age_micros =
-static_cast<uint64_t>(std::min(maximum_age_in_seconds, 0)) * 1000000U;
-// But we will re-scan more frequently if it means scanning < 1%
-// of the time and no more than once per second.
-max_age_micros = std::min(
-max_age_micros,
-std::max(uint64_t{1000000},
-100U * (last_end_time_micros_ - last_start_time_micros_)));
+static_cast<uint64_t>(std::max(min_interval_seconds, 0)) * 1000000U;
+
+if (last_end_time_micros_ > last_start_time_micros_ &&
+min_interval_factor > 0) {
+max_age_micros = std::max(
+max_age_micros, min_interval_factor * (last_end_time_micros_ -
+last_start_time_micros_));
+}

uint64_t start_time_micros = clock_->NowMicros();
if ((start_time_micros - last_end_time_micros_) > max_age_micros) {
last_start_time_micros_ = start_time_micros;
-saved_stats_.BeginCollection(cache_, clock_, start_time_micros);
+working_stats_.BeginCollection(cache_, clock_, start_time_micros);

-cache_->ApplyToAllEntries(saved_stats_.GetEntryCallback(), {});
+cache_->ApplyToAllEntries(working_stats_.GetEntryCallback(), {});
TEST_SYNC_POINT_CALLBACK(
"CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries", nullptr);

uint64_t end_time_micros = clock_->NowMicros();
last_end_time_micros_ = end_time_micros;
-saved_stats_.EndCollection(cache_, clock_, end_time_micros);
+working_stats_.EndCollection(cache_, clock_, end_time_micros);
} else {
-saved_stats_.SkippedCollection();
+working_stats_.SkippedCollection();
}
-// Copy to caller
+
+// Save so that we don't need to wait for an outstanding collection in
+// order to make of copy of the last saved stats
+std::lock_guard<std::mutex> lock2(saved_mutex_);
+saved_stats_ = working_stats_;
+}
+
+// Gets saved stats, regardless of age
+void GetStats(Stats *stats) {
+std::lock_guard<std::mutex> lock(saved_mutex_);
*stats = saved_stats_;
}

@@ -113,9 +134,11 @@ class CacheEntryStatsCollector {
// usage to go flaky. Fix the problem somehow so we can use an
// accurate charge.
size_t charge = 0;
-Status s = cache->Insert(cache_key, new_ptr, charge, Deleter, &h);
+Status s = cache->Insert(cache_key, new_ptr, charge, Deleter, &h,
+Cache::Priority::HIGH);
if (!s.ok()) {
assert(h == nullptr);
delete new_ptr;
return s;
}
}

@@ -132,6 +155,7 @@ class CacheEntryStatsCollector {
private:
explicit CacheEntryStatsCollector(Cache *cache, SystemClock *clock)
: saved_stats_(),
+working_stats_(),
last_start_time_micros_(0),
last_end_time_micros_(/*pessimistic*/ 10000000),
cache_(cache),

@@ -141,10 +165,14 @@ class CacheEntryStatsCollector {
delete static_cast<CacheEntryStatsCollector *>(value);
}

-std::mutex mutex_;
+std::mutex saved_mutex_;
Stats saved_stats_;
+
+std::mutex working_mutex_;
+Stats working_stats_;
uint64_t last_start_time_micros_;
uint64_t last_end_time_micros_;

Cache *const cache_;
SystemClock *const clock_;
};
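For context on the working_mutex_/saved_mutex_ split introduced above: the expensive scan holds only the working mutex, then publishes a finished snapshot under the saved mutex, so readers of the last snapshot never block behind an in-progress collection. A minimal sketch of that pattern, assuming simplified types (not the actual RocksDB class):

#include <mutex>

struct Stats { int entries = 0; };

class Collector {
 public:
  void CollectStats() {
    std::lock_guard<std::mutex> lock(working_mutex_);
    working_.entries += 1;              // stands in for the expensive scan
    std::lock_guard<std::mutex> lock2(saved_mutex_);
    saved_ = working_;                  // publish the finished snapshot
  }
  void GetStats(Stats* out) {
    std::lock_guard<std::mutex> lock(saved_mutex_);
    *out = saved_;                      // cheap copy, regardless of age
  }

 private:
  std::mutex working_mutex_;
  Stats working_;
  std::mutex saved_mutex_;
  Stats saved_;
};

int main() {
  Collector c;
  c.CollectStats();
  Stats s;
  c.GetStats(&s);
  return s.entries == 1 ? 0 : 1;
}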
cache/lru_cache.cc (3 lines changed)
@@ -516,6 +516,9 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
e->SetSecondaryCacheCompatible(true);
e->info_.helper = helper;
} else {
+#ifdef __SANITIZE_THREAD__
+e->is_secondary_cache_compatible_for_tsan = false;
+#endif // __SANITIZE_THREAD__
e->info_.deleter = deleter;
}
e->charge = charge;
cache/lru_cache.h (35 lines changed)
@@ -12,6 +12,7 @@
#include <string>

#include "cache/sharded_cache.h"
+#include "port/lang.h"
#include "port/malloc.h"
#include "port/port.h"
#include "rocksdb/secondary_cache.h"

@@ -75,8 +76,8 @@ struct LRUHandle {
IN_HIGH_PRI_POOL = (1 << 2),
// Whether this entry has had any lookups (hits).
HAS_HIT = (1 << 3),
-// Can this be inserted into the tiered cache
-IS_TIERED_CACHE_COMPATIBLE = (1 << 4),
+// Can this be inserted into the secondary cache
+IS_SECONDARY_CACHE_COMPATIBLE = (1 << 4),
// Is the handle still being read from a lower tier
IS_PENDING = (1 << 5),
// Has the item been promoted from a lower tier

@@ -85,6 +86,14 @@ struct LRUHandle {

uint8_t flags;

+#ifdef __SANITIZE_THREAD__
+// TSAN can report a false data race on flags, where one thread is writing
+// to one of the mutable bits and another thread is reading this immutable
+// bit. So precisely suppress that TSAN warning, we separate out this bit
+// during TSAN runs.
+bool is_secondary_cache_compatible_for_tsan;
+#endif // __SANITIZE_THREAD__
+
// Beginning of the key (MUST BE THE LAST FIELD IN THIS STRUCT!)
char key_data[1];

@@ -108,7 +117,11 @@ struct LRUHandle {
bool InHighPriPool() const { return flags & IN_HIGH_PRI_POOL; }
bool HasHit() const { return flags & HAS_HIT; }
bool IsSecondaryCacheCompatible() const {
-return flags & IS_TIERED_CACHE_COMPATIBLE;
+#ifdef __SANITIZE_THREAD__
+return is_secondary_cache_compatible_for_tsan;
+#else
+return flags & IS_SECONDARY_CACHE_COMPATIBLE;
+#endif // __SANITIZE_THREAD__
}
bool IsPending() const { return flags & IS_PENDING; }
bool IsPromoted() const { return flags & IS_PROMOTED; }

@@ -139,12 +152,15 @@ struct LRUHandle {

void SetHit() { flags |= HAS_HIT; }

-void SetSecondaryCacheCompatible(bool tiered) {
-if (tiered) {
-flags |= IS_TIERED_CACHE_COMPATIBLE;
+void SetSecondaryCacheCompatible(bool compat) {
+if (compat) {
+flags |= IS_SECONDARY_CACHE_COMPATIBLE;
} else {
-flags &= ~IS_TIERED_CACHE_COMPATIBLE;
+flags &= ~IS_SECONDARY_CACHE_COMPATIBLE;
}
+#ifdef __SANITIZE_THREAD__
+is_secondary_cache_compatible_for_tsan = compat;
+#endif // __SANITIZE_THREAD__
}

void SetIncomplete(bool incomp) {

@@ -165,6 +181,11 @@ struct LRUHandle {

void Free() {
assert(refs == 0);
+#ifdef __SANITIZE_THREAD__
+// Here we can safely assert they are the same without a data race reported
+assert(((flags & IS_SECONDARY_CACHE_COMPATIBLE) != 0) ==
+is_secondary_cache_compatible_for_tsan);
+#endif // __SANITIZE_THREAD__
if (!IsSecondaryCacheCompatible() && info_.deleter) {
(*info_.deleter)(key(), value);
} else if (IsSecondaryCacheCompatible()) {
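The is_secondary_cache_compatible_for_tsan changes above follow a common pattern for silencing a TSAN false positive on bitfield-style flags: an immutable logical bit shares a byte with mutable bits, so under TSAN the immutable bit is mirrored into its own bool. A small sketch under that assumption (simplified, not the LRUHandle layout):

#include <cstdint>

struct Handle {
  uint8_t flags = 0;
  static constexpr uint8_t kCompatBit = 1 << 4;
#ifdef __SANITIZE_THREAD__
  bool compat_for_tsan = false;  // mirrors kCompatBit, set once at insert time
#endif

  void SetCompatible(bool compat) {
    if (compat) {
      flags |= kCompatBit;
    } else {
      flags &= static_cast<uint8_t>(~kCompatBit);
    }
#ifdef __SANITIZE_THREAD__
    compat_for_tsan = compat;
#endif
  }

  bool IsCompatible() const {
#ifdef __SANITIZE_THREAD__
    return compat_for_tsan;  // avoids reading the byte other threads mutate
#else
    return (flags & kCompatBit) != 0;
#endif
  }
};

int main() {
  Handle h;
  h.SetCompatible(true);
  return h.IsCompatible() ? 0 : 1;
}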
cache/lru_cache_test.cc (4 lines changed)
@@ -659,14 +659,14 @@ TEST_F(DBSecondaryCacheTest, TestSecondaryCacheCorrectness1) {
Destroy(options);
}

-// In this test, the block cache size is set to 5100, after insert 6 KV-pairs
+// In this test, the block cache size is set to 6100, after insert 6 KV-pairs
// and flush, there are 5 blocks in this SST file, 2 data blocks and 3 meta
// blocks. block_1 size is 4096 and block_2 size is 2056. The total size
// of the meta blocks are about 900 to 1000. Therefore, we can successfully
// insert and cache block_1 in the block cache (this is the different place
// from TestSecondaryCacheCorrectness1)
TEST_F(DBSecondaryCacheTest, TestSecondaryCacheCorrectness2) {
-LRUCacheOptions opts(5100, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
+LRUCacheOptions opts(6100, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
@@ -30,7 +30,9 @@ class SequenceIterWrapper : public InternalIterator {
public:
SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp,
bool need_count_entries)
-: cmp_(cmp), inner_iter_(iter), need_count_entries_(need_count_entries) {}
+: icmp_(cmp, /*named=*/false),
+inner_iter_(iter),
+need_count_entries_(need_count_entries) {}
bool Valid() const override { return inner_iter_->Valid(); }
Status status() const override { return inner_iter_->status(); }
void Next() override {

@@ -44,7 +46,7 @@ class SequenceIterWrapper : public InternalIterator {
// For flush cases, we need to count total number of entries, so we
// do Next() rather than Seek().
while (inner_iter_->Valid() &&
-cmp_->Compare(inner_iter_->key(), target) < 0) {
+icmp_.Compare(inner_iter_->key(), target) < 0) {
Next();
}
}

@@ -61,7 +63,7 @@ class SequenceIterWrapper : public InternalIterator {
uint64_t num_itered() const { return num_itered_; }

private:
-const Comparator* cmp_; // not owned
+InternalKeyComparator icmp_;
InternalIterator* inner_iter_; // not owned
uint64_t num_itered_ = 0;
bool need_count_entries_;
@@ -11,6 +11,7 @@

#include "cache/cache_entry_roles.h"
+#include "cache/lru_cache.h"
#include "db/column_family.h"
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/table.h"

@@ -152,12 +153,15 @@ class DBBlockCacheTest : public DBTestBase {
}

#ifndef ROCKSDB_LITE
-const std::array<size_t, kNumCacheEntryRoles>& GetCacheEntryRoleCounts() {
+const std::array<size_t, kNumCacheEntryRoles> GetCacheEntryRoleCountsBg() {
// Verify in cache entry role stats
ColumnFamilyHandleImpl* cfh =
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
InternalStats* internal_stats_ptr = cfh->cfd()->internal_stats();
-return internal_stats_ptr->TEST_GetCacheEntryRoleStats().entry_counts;
+InternalStats::CacheEntryRoleStats stats;
+internal_stats_ptr->TEST_GetCacheEntryRoleStats(&stats,
+/*foreground=*/false);
+return stats.entry_counts;
}
#endif // ROCKSDB_LITE
};

@@ -169,7 +173,13 @@ TEST_F(DBBlockCacheTest, IteratorBlockCacheUsage) {
auto options = GetOptions(table_options);
InitTable(options);

-std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
+LRUCacheOptions co;
+co.capacity = 0;
+co.num_shard_bits = 0;
+co.strict_capacity_limit = false;
+// Needed not to count entry stats collector
+co.metadata_charge_policy = kDontChargeCacheMetadata;
+std::shared_ptr<Cache> cache = NewLRUCache(co);
table_options.block_cache = cache;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);

@@ -193,7 +203,13 @@ TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) {
auto options = GetOptions(table_options);
InitTable(options);

-std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
+LRUCacheOptions co;
+co.capacity = 0;
+co.num_shard_bits = 0;
+co.strict_capacity_limit = false;
+// Needed not to count entry stats collector
+co.metadata_charge_policy = kDontChargeCacheMetadata;
+std::shared_ptr<Cache> cache = NewLRUCache(co);
table_options.block_cache = cache;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);

@@ -247,8 +263,14 @@ TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
options.compression = CompressionType::kSnappyCompression;
InitTable(options);

-std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
std::shared_ptr<Cache> compressed_cache = NewLRUCache(1 << 25, 0, false);
+LRUCacheOptions co;
+co.capacity = 0;
+co.num_shard_bits = 0;
+co.strict_capacity_limit = false;
+// Needed not to count entry stats collector
+co.metadata_charge_policy = kDontChargeCacheMetadata;
+std::shared_ptr<Cache> cache = NewLRUCache(co);
table_options.block_cache = cache;
table_options.block_cache_compressed = compressed_cache;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));

@@ -905,10 +927,15 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) {
}

static void ClearCache(Cache* cache) {
+auto roles = CopyCacheDeleterRoleMap();
std::deque<std::string> keys;
Cache::ApplyToAllEntriesOptions opts;
auto callback = [&](const Slice& key, void* /*value*/, size_t /*charge*/,
-Cache::DeleterFn /*deleter*/) {
+Cache::DeleterFn deleter) {
+if (roles.find(deleter) == roles.end()) {
+// Keep the stats collector
+return;
+}
keys.push_back(key.ToString());
};
cache->ApplyToAllEntries(callback, opts);

@@ -930,11 +957,13 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
++iterations_tested;

Options options = CurrentOptions();
+SetTimeElapseOnlySleepOnReopen(&options);
options.create_if_missing = true;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
-options.stats_dump_period_sec = 0;
options.max_open_files = 13;
options.table_cache_numshardbits = 0;
+// If this wakes up, it could interfere with test
+options.stats_dump_period_sec = 0;

BlockBasedTableOptions table_options;
table_options.block_cache = cache;

@@ -970,7 +999,9 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
std::array<size_t, kNumCacheEntryRoles> expected{};
// For CacheEntryStatsCollector
expected[static_cast<size_t>(CacheEntryRole::kMisc)] = 1;
-EXPECT_EQ(expected, GetCacheEntryRoleCounts());
+EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());

+std::array<size_t, kNumCacheEntryRoles> prev_expected = expected;
+
// First access only filters
ASSERT_EQ("NOT_FOUND", Get("different from any key added"));

@@ -978,7 +1009,14 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
if (partition) {
expected[static_cast<size_t>(CacheEntryRole::kFilterMetaBlock)] += 2;
}
-EXPECT_EQ(expected, GetCacheEntryRoleCounts());
+// Within some time window, we will get cached entry stats
+EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
+// Not enough to force a miss
+env_->MockSleepForSeconds(45);
+EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
+// Enough to force a miss
+env_->MockSleepForSeconds(601);
+EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());

// Now access index and data block
ASSERT_EQ("value", Get("foo"));

@@ -988,7 +1026,22 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
expected[static_cast<size_t>(CacheEntryRole::kIndexBlock)]++;
}
expected[static_cast<size_t>(CacheEntryRole::kDataBlock)]++;
-EXPECT_EQ(expected, GetCacheEntryRoleCounts());
+// Enough to force a miss
+env_->MockSleepForSeconds(601);
+// But inject a simulated long scan so that we need a longer
+// interval to force a miss next time.
+SyncPoint::GetInstance()->SetCallBack(
+"CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries",
+[this](void*) {
+// To spend no more than 0.2% of time scanning, we would need
+// interval of at least 10000s
+env_->MockSleepForSeconds(20);
+});
+SyncPoint::GetInstance()->EnableProcessing();
+EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
+prev_expected = expected;
+SyncPoint::GetInstance()->DisableProcessing();
+SyncPoint::GetInstance()->ClearAllCallBacks();

// The same for other file
ASSERT_EQ("value", Get("zfoo"));

@@ -998,7 +1051,14 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
expected[static_cast<size_t>(CacheEntryRole::kIndexBlock)]++;
}
expected[static_cast<size_t>(CacheEntryRole::kDataBlock)]++;
-EXPECT_EQ(expected, GetCacheEntryRoleCounts());
+// Because of the simulated long scan, this is not enough to force
+// a miss
+env_->MockSleepForSeconds(601);
+EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
+// But this is enough
+env_->MockSleepForSeconds(10000);
+EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
+prev_expected = expected;

// Also check the GetProperty interface
std::map<std::string, std::string> values;

@@ -1014,8 +1074,99 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
EXPECT_EQ(
ToString(expected[static_cast<size_t>(CacheEntryRole::kFilterBlock)]),
values["count.filter-block"]);
+EXPECT_EQ(
+ToString(
+prev_expected[static_cast<size_t>(CacheEntryRole::kWriteBuffer)]),
+values["count.write-buffer"]);
EXPECT_EQ(ToString(expected[static_cast<size_t>(CacheEntryRole::kMisc)]),
values["count.misc"]);

+// Add one for kWriteBuffer
+{
+WriteBufferManager wbm(size_t{1} << 20, cache);
+wbm.ReserveMem(1024);
+expected[static_cast<size_t>(CacheEntryRole::kWriteBuffer)]++;
+// Now we check that the GetProperty interface is more agressive about
+// re-scanning stats, but not totally aggressive.
+// Within some time window, we will get cached entry stats
+env_->MockSleepForSeconds(1);
+EXPECT_EQ(ToString(prev_expected[static_cast<size_t>(
+CacheEntryRole::kWriteBuffer)]),
+values["count.write-buffer"]);
+// Not enough for a "background" miss but enough for a "foreground" miss
+env_->MockSleepForSeconds(45);

+ASSERT_TRUE(db_->GetMapProperty(DB::Properties::kBlockCacheEntryStats,
+&values));
+EXPECT_EQ(
+ToString(
+expected[static_cast<size_t>(CacheEntryRole::kWriteBuffer)]),
+values["count.write-buffer"]);
+}
+prev_expected = expected;

+// With collector pinned in cache, we should be able to hit
+// even if the cache is full
+ClearCache(cache.get());
+Cache::Handle* h = nullptr;
+ASSERT_OK(cache->Insert("Fill-it-up", nullptr, capacity + 1,
+GetNoopDeleterForRole<CacheEntryRole::kMisc>(),
+&h, Cache::Priority::HIGH));
+ASSERT_GT(cache->GetUsage(), cache->GetCapacity());
+expected = {};
+// For CacheEntryStatsCollector
+expected[static_cast<size_t>(CacheEntryRole::kMisc)] = 1;
+// For Fill-it-up
+expected[static_cast<size_t>(CacheEntryRole::kMisc)]++;
+// Still able to hit on saved stats
+EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
+// Enough to force a miss
+env_->MockSleepForSeconds(1000);
+EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());

+cache->Release(h);

+// Now we test that the DB mutex is not held during scans, for the ways
+// we know how to (possibly) trigger them. Without a better good way to
+// check this, we simply inject an acquire & release of the DB mutex
+// deep in the stat collection code. If we were already holding the
+// mutex, that is UB that would at least be found by TSAN.
+int scan_count = 0;
+SyncPoint::GetInstance()->SetCallBack(
+"CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries",
+[this, &scan_count](void*) {
+dbfull()->TEST_LockMutex();
+dbfull()->TEST_UnlockMutex();
+++scan_count;
+});
+SyncPoint::GetInstance()->EnableProcessing();

+// Different things that might trigger a scan, with mock sleeps to
+// force a miss.
+env_->MockSleepForSeconds(10000);
+dbfull()->DumpStats();
+ASSERT_EQ(scan_count, 1);

+env_->MockSleepForSeconds(10000);
+ASSERT_TRUE(
+db_->GetMapProperty(DB::Properties::kBlockCacheEntryStats, &values));
+ASSERT_EQ(scan_count, 2);

+env_->MockSleepForSeconds(10000);
+std::string value_str;
+ASSERT_TRUE(
+db_->GetProperty(DB::Properties::kBlockCacheEntryStats, &value_str));
+ASSERT_EQ(scan_count, 3);

+env_->MockSleepForSeconds(10000);
+ASSERT_TRUE(db_->GetProperty(DB::Properties::kCFStats, &value_str));
+// To match historical speed, querying this property no longer triggers
+// a scan, even if results are old. But periodic dump stats should keep
+// things reasonably updated.
+ASSERT_EQ(scan_count, /*unchanged*/ 3);

+SyncPoint::GetInstance()->DisableProcessing();
+SyncPoint::GetInstance()->ClearAllCallBacks();
}
EXPECT_GE(iterations_tested, 1);
}
@@ -63,6 +63,7 @@
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/in_memory_stats_history.h"
+#include "monitoring/instrumented_mutex.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"

@@ -906,32 +907,50 @@ Status DBImpl::GetStatsHistory(
void DBImpl::DumpStats() {
TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
-const DBPropertyInfo* cf_property_info =
-GetPropertyInfo(DB::Properties::kCFStats);
-assert(cf_property_info != nullptr);
-const DBPropertyInfo* db_property_info =
-GetPropertyInfo(DB::Properties::kDBStats);
-assert(db_property_info != nullptr);

std::string stats;
if (shutdown_initiated_) {
return;
}

TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
{
InstrumentedMutexLock l(&mutex_);
-default_cf_internal_stats_->GetStringProperty(
-*db_property_info, DB::Properties::kDBStats, &stats);
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->initialized()) {
-cfd->internal_stats()->GetStringProperty(
-*cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
+// Release DB mutex for gathering cache entry stats. Pass over all
+// column families for this first so that other stats are dumped
+// near-atomically.
+InstrumentedMutexUnlock u(&mutex_);
+cfd->internal_stats()->CollectCacheEntryStats(/*foreground=*/false);
}
}

+const std::string* property = &DB::Properties::kDBStats;
+const DBPropertyInfo* property_info = GetPropertyInfo(*property);
+assert(property_info != nullptr);
+assert(!property_info->need_out_of_mutex);
+default_cf_internal_stats_->GetStringProperty(*property_info, *property,
+&stats);

+property = &DB::Properties::kCFStatsNoFileHistogram;
+property_info = GetPropertyInfo(*property);
+assert(property_info != nullptr);
+assert(!property_info->need_out_of_mutex);
+for (auto cfd : *versions_->GetColumnFamilySet()) {
+if (cfd->initialized()) {
-cfd->internal_stats()->GetStringProperty(
-*cf_property_info, DB::Properties::kCFFileHistogram, &stats);
+cfd->internal_stats()->GetStringProperty(*property_info, *property,
+&stats);
}
}

+property = &DB::Properties::kCFFileHistogram;
+property_info = GetPropertyInfo(*property);
+assert(property_info != nullptr);
+assert(!property_info->need_out_of_mutex);
+for (auto cfd : *versions_->GetColumnFamilySet()) {
+if (cfd->initialized()) {
+cfd->internal_stats()->GetStringProperty(*property_info, *property,
+&stats);
+}
+}
}

@@ -3197,16 +3216,21 @@ bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
}
return ret_value;
} else if (property_info->handle_string) {
-InstrumentedMutexLock l(&mutex_);
-return cfd->internal_stats()->GetStringProperty(*property_info, property,
-value);
-} else if (property_info->handle_string_dbimpl) {
-std::string tmp_value;
-bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
-if (ret_value) {
-*value = tmp_value;
+if (property_info->need_out_of_mutex) {
+return cfd->internal_stats()->GetStringProperty(*property_info, property,
+value);
+} else {
+InstrumentedMutexLock l(&mutex_);
+return cfd->internal_stats()->GetStringProperty(*property_info, property,
+value);
+}
+} else if (property_info->handle_string_dbimpl) {
+if (property_info->need_out_of_mutex) {
+return (this->*(property_info->handle_string_dbimpl))(value);
+} else {
+InstrumentedMutexLock l(&mutex_);
+return (this->*(property_info->handle_string_dbimpl))(value);
+}
}
-return ret_value;
}
// Shouldn't reach here since exactly one of handle_string and handle_int
// should be non-nullptr.

@@ -3224,9 +3248,14 @@ bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
if (property_info == nullptr) {
return false;
} else if (property_info->handle_map) {
-InstrumentedMutexLock l(&mutex_);
-return cfd->internal_stats()->GetMapProperty(*property_info, property,
-value);
+if (property_info->need_out_of_mutex) {
+return cfd->internal_stats()->GetMapProperty(*property_info, property,
+value);
+} else {
+InstrumentedMutexLock l(&mutex_);
+return cfd->internal_stats()->GetMapProperty(*property_info, property,
+value);
+}
}
// If we reach this point it means that handle_map is not provided for the
// requested property
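The GetProperty/GetMapProperty changes above route each property through the need_out_of_mutex flag in DBPropertyInfo: handlers marked out-of-mutex (such as the block cache entry stats scan) run without taking the DB mutex, while everything else keeps the old locked path. A rough sketch of that dispatch shape, with made-up minimal types rather than the real DBImpl:

#include <functional>
#include <mutex>
#include <string>

struct PropertyInfo {
  bool need_out_of_mutex;
  std::function<bool(std::string*)> handler;
};

class MiniDB {
 public:
  bool GetProperty(const PropertyInfo& info, std::string* value) {
    if (info.need_out_of_mutex) {
      return info.handler(value);           // e.g. a long cache scan
    }
    std::lock_guard<std::mutex> l(mutex_);  // short, mutex-protected handlers
    return info.handler(value);
  }

 private:
  std::mutex mutex_;
};

int main() {
  MiniDB db;
  PropertyInfo slow{true, [](std::string* v) { *v = "scanned"; return true; }};
  std::string out;
  return db.GetProperty(slow, &out) && out == "scanned" ? 0 : 1;
}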
@@ -394,7 +394,7 @@ const std::unordered_map<std::string, DBPropertyInfo>
{DB::Properties::kDBStats,
{false, &InternalStats::HandleDBStats, nullptr, nullptr, nullptr}},
{DB::Properties::kBlockCacheEntryStats,
-{false, &InternalStats::HandleBlockCacheEntryStats, nullptr,
+{true, &InternalStats::HandleBlockCacheEntryStats, nullptr,
&InternalStats::HandleBlockCacheEntryStatsMap, nullptr}},
{DB::Properties::kSSTables,
{false, &InternalStats::HandleSsTables, nullptr, nullptr, nullptr}},

@@ -510,7 +510,7 @@ const std::unordered_map<std::string, DBPropertyInfo>
{false, nullptr, &InternalStats::HandleBlockCachePinnedUsage, nullptr,
nullptr}},
{DB::Properties::kOptionsStatistics,
-{false, nullptr, nullptr, nullptr,
+{true, nullptr, nullptr, nullptr,
&DBImpl::GetPropertyHandleOptionsStatistics}},
};

@@ -526,28 +526,52 @@ InternalStats::InternalStats(int num_levels, SystemClock* clock,
number_levels_(num_levels),
clock_(clock),
cfd_(cfd),
-started_at_(clock->NowMicros()) {}

-Status InternalStats::CollectCacheEntryStats() {
-using Collector = CacheEntryStatsCollector<CacheEntryRoleStats>;
-Cache* block_cache;
-bool ok = HandleBlockCacheStat(&block_cache);
+started_at_(clock->NowMicros()) {
+Cache* block_cache = nullptr;
+bool ok = GetBlockCacheForStats(&block_cache);
if (ok) {
-// Extract or create stats collector.
-std::shared_ptr<Collector> collector;
-Status s = Collector::GetShared(block_cache, clock_, &collector);
+assert(block_cache);
+// Extract or create stats collector. Could fail in rare cases.
+Status s = CacheEntryStatsCollector<CacheEntryRoleStats>::GetShared(
+block_cache, clock_, &cache_entry_stats_collector_);
if (s.ok()) {
-collector->GetStats(&cache_entry_stats);
+assert(cache_entry_stats_collector_);
+} else {
+// Block cache likely under pressure. Scanning could make it worse,
+// so skip.
+assert(!cache_entry_stats_collector_);
}
-return s;
} else {
-return Status::NotFound("block cache not configured");
+assert(!block_cache);
}
}

+void InternalStats::TEST_GetCacheEntryRoleStats(CacheEntryRoleStats* stats,
+bool foreground) {
+CollectCacheEntryStats(foreground);
+if (cache_entry_stats_collector_) {
+cache_entry_stats_collector_->GetStats(stats);
+}
+}

+void InternalStats::CollectCacheEntryStats(bool foreground) {
+// This function is safe to call from any thread because
+// cache_entry_stats_collector_ field is const after constructor
+// and ->GetStats does its own synchronization, which also suffices for
+// cache_entry_stats_.
+
+if (!cache_entry_stats_collector_) {
+return; // nothing to do (e.g. no block cache)
+}
+
+// For "background" collections, strictly cap the collection time by
+// expanding effective cache TTL. For foreground, be more aggressive about
+// getting latest data.
+int min_interval_seconds = foreground ? 10 : 180;
+// 1/500 = max of 0.2% of one CPU thread
+int min_interval_factor = foreground ? 10 : 500;
+cache_entry_stats_collector_->CollectStats(min_interval_seconds,
+min_interval_factor);
+}

std::function<void(const Slice&, void*, size_t, Cache::DeleterFn)>
InternalStats::CacheEntryRoleStats::GetEntryCallback() {
return [&](const Slice& /*key*/, void* /*value*/, size_t charge,

@@ -636,21 +660,25 @@ void InternalStats::CacheEntryRoleStats::ToMap(

bool InternalStats::HandleBlockCacheEntryStats(std::string* value,
Slice /*suffix*/) {
-Status s = CollectCacheEntryStats();
-if (!s.ok()) {
+if (!cache_entry_stats_collector_) {
return false;
}
-*value = cache_entry_stats.ToString(clock_);
+CollectCacheEntryStats(/*foreground*/ true);
+CacheEntryRoleStats stats;
+cache_entry_stats_collector_->GetStats(&stats);
+*value = stats.ToString(clock_);
return true;
}

bool InternalStats::HandleBlockCacheEntryStatsMap(
std::map<std::string, std::string>* values, Slice /*suffix*/) {
-Status s = CollectCacheEntryStats();
-if (!s.ok()) {
+if (!cache_entry_stats_collector_) {
return false;
}
-cache_entry_stats.ToMap(values, clock_);
+CollectCacheEntryStats(/*foreground*/ true);
+CacheEntryRoleStats stats;
+cache_entry_stats_collector_->GetStats(&stats);
+stats.ToMap(values, clock_);
return true;
}

@@ -1110,7 +1138,7 @@ bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/,
return *value > 0 && *value < std::numeric_limits<uint64_t>::max();
}

-bool InternalStats::HandleBlockCacheStat(Cache** block_cache) {
+bool InternalStats::GetBlockCacheForStats(Cache** block_cache) {
assert(block_cache != nullptr);
auto* table_factory = cfd_->ioptions()->table_factory.get();
assert(table_factory != nullptr);

@@ -1122,7 +1150,7 @@ bool InternalStats::HandleBlockCacheStat(Cache** block_cache) {
bool InternalStats::HandleBlockCacheCapacity(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
Cache* block_cache;
-bool ok = HandleBlockCacheStat(&block_cache);
+bool ok = GetBlockCacheForStats(&block_cache);
if (!ok) {
return false;
}

@@ -1133,7 +1161,7 @@ bool InternalStats::HandleBlockCacheCapacity(uint64_t* value, DBImpl* /*db*/,
bool InternalStats::HandleBlockCacheUsage(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
Cache* block_cache;
-bool ok = HandleBlockCacheStat(&block_cache);
+bool ok = GetBlockCacheForStats(&block_cache);
if (!ok) {
return false;
}

@@ -1144,7 +1172,7 @@ bool InternalStats::HandleBlockCacheUsage(uint64_t* value, DBImpl* /*db*/,
bool InternalStats::HandleBlockCachePinnedUsage(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
Cache* block_cache;
-bool ok = HandleBlockCacheStat(&block_cache);
+bool ok = GetBlockCacheForStats(&block_cache);
if (!ok) {
return false;
}

@@ -1491,7 +1519,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
vstorage->GetTotalBlobFileSize() / kGB);
value->append(buf);

-double seconds_up = (clock_->NowMicros() - started_at_ + 1) / kMicrosInSec;
+uint64_t now_micros = clock_->NowMicros();
+double seconds_up = (now_micros - started_at_ + 1) / kMicrosInSec;
double interval_seconds_up = seconds_up - cf_stats_snapshot_.seconds_up;
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
seconds_up, interval_seconds_up);

@@ -1606,13 +1635,20 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
cf_stats_snapshot_.comp_stats = compaction_stats_sum;
cf_stats_snapshot_.stall_count = total_stall_count;

-Status s = CollectCacheEntryStats();
-if (s.ok()) {
-value->append(cache_entry_stats.ToString(clock_));
-} else {
-value->append("Block cache: ");
-value->append(s.ToString());
-value->append("\n");
+// Do not gather cache entry stats during CFStats because DB
+// mutex is held. Only dump last cached collection (rely on DB
+// periodic stats dump to update)
+if (cache_entry_stats_collector_) {
+CacheEntryRoleStats stats;
+// thread safe
+cache_entry_stats_collector_->GetStats(&stats);
+
+constexpr uint64_t kDayInMicros = uint64_t{86400} * 1000000U;
+
+// Skip if stats are extremely old (> 1 day, incl not yet populated)
+if (now_micros - stats.last_end_time_micros_ < kDayInMicros) {
+value->append(stats.ToString(clock_));
+}
}
}
@@ -23,6 +23,8 @@ class ColumnFamilyData;

namespace ROCKSDB_NAMESPACE {

+template <class Stats>
+class CacheEntryStatsCollector;
class DBImpl;
class MemTableList;

@@ -390,7 +392,6 @@ class InternalStats {
cf_stats_count_[i] = 0;
cf_stats_value_[i] = 0;
}
-cache_entry_stats.Clear();
for (auto& comp_stat : comp_stats_) {
comp_stat.Clear();
}

@@ -457,20 +458,20 @@ class InternalStats {
bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info,
Version* version, uint64_t* value);

+// Unless there is a recent enough collection of the stats, collect and
+// saved new cache entry stats. If `foreground`, require data to be more
+// recent to skip re-collection.
+//
+// This should only be called while NOT holding the DB mutex.
+void CollectCacheEntryStats(bool foreground);
+
const uint64_t* TEST_GetCFStatsValue() const { return cf_stats_value_; }

const std::vector<CompactionStats>& TEST_GetCompactionStats() const {
return comp_stats_;
}

-const CacheEntryRoleStats& TEST_GetCacheEntryRoleStats() {
-Status s = CollectCacheEntryStats();
-if (!s.ok()) {
-assert(false);
-cache_entry_stats.Clear();
-}
-return cache_entry_stats;
-}
+void TEST_GetCacheEntryRoleStats(CacheEntryRoleStats* stats, bool foreground);

// Store a mapping from the user-facing DB::Properties string to our
// DBPropertyInfo struct used internally for retrieving properties.

@@ -490,16 +491,20 @@ class InternalStats {
void DumpCFStatsNoFileHistogram(std::string* value);
void DumpCFFileHistogram(std::string* value);

-bool HandleBlockCacheStat(Cache** block_cache);

-Status CollectCacheEntryStats();
+bool GetBlockCacheForStats(Cache** block_cache);

// Per-DB stats
std::atomic<uint64_t> db_stats_[kIntStatsNumMax];
// Per-ColumnFamily stats
uint64_t cf_stats_value_[INTERNAL_CF_STATS_ENUM_MAX];
uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX];
-CacheEntryRoleStats cache_entry_stats;
+// Initialize/reference the collector in constructor so that we don't need
+// additional synchronization in InternalStats, relying on synchronization
+// in CacheEntryStatsCollector::GetStats. This collector is pinned in cache
+// (through a shared_ptr) so that it does not get immediately ejected from
+// a full cache, which would force a re-scan on the next GetStats.
+std::shared_ptr<CacheEntryStatsCollector<CacheEntryRoleStats>>
+cache_entry_stats_collector_;
// Per-ColumnFamily/level compaction stats
std::vector<CompactionStats> comp_stats_;
std::vector<CompactionStats> comp_stats_by_pri_;
@@ -572,7 +572,7 @@ class Statistics {
// Resets all ticker and histogram stats
virtual Status Reset() { return Status::NotSupported("Not implemented"); }

-// String representation of the statistic object.
+// String representation of the statistic object. Must be thread-safe.
virtual std::string ToString() const {
// Do nothing by default
return std::string("ToString(): not implemented");
@@ -10,8 +10,8 @@
#include "rocksdb/rocksdb_namespace.h"

#define ROCKSDB_MAJOR 6
-#define ROCKSDB_MINOR 20
-#define ROCKSDB_PATCH 0
+#define ROCKSDB_MINOR 21
+#define ROCKSDB_PATCH 3

// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these
@@ -58,7 +58,8 @@ public class MemoryUtilTest {
db.getAggregatedLongProperty(UNFLUSHED_MEMTABLE_SIZE));
assertThat(usage.get(MemoryUsageType.kTableReadersTotal)).isEqualTo(
db.getAggregatedLongProperty(TABLE_READERS));
-assertThat(usage.get(MemoryUsageType.kCacheTotal)).isEqualTo(0);
+// TODO(peterd): disable block cache entry stats and check for 0
+assertThat(usage.get(MemoryUsageType.kCacheTotal)).isLessThan(1024);

db.put(key, value);
db.flush(flushOptions);
@@ -51,8 +51,7 @@ class InstrumentedMutex {
int stats_code_;
};

-// A wrapper class for port::Mutex that provides additional layer
-// for collecting stats and instrumentation.
+// RAII wrapper for InstrumentedMutex
class InstrumentedMutexLock {
public:
explicit InstrumentedMutexLock(InstrumentedMutex* mutex) : mutex_(mutex) {

@@ -69,6 +68,22 @@ class InstrumentedMutexLock {
void operator=(const InstrumentedMutexLock&) = delete;
};

+// RAII wrapper for temporary releasing InstrumentedMutex inside
+// InstrumentedMutexLock
+class InstrumentedMutexUnlock {
+public:
+explicit InstrumentedMutexUnlock(InstrumentedMutex* mutex) : mutex_(mutex) {
+mutex_->Unlock();
+}
+
+~InstrumentedMutexUnlock() { mutex_->Lock(); }
+
+private:
+InstrumentedMutex* const mutex_;
+InstrumentedMutexUnlock(const InstrumentedMutexUnlock&) = delete;
+void operator=(const InstrumentedMutexUnlock&) = delete;
+};
+
class InstrumentedCondVar {
public:
explicit InstrumentedCondVar(InstrumentedMutex* instrumented_mutex)
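InstrumentedMutexUnlock, added above, is the scoped inverse of InstrumentedMutexLock: it releases an already-held mutex and reacquires it when the scope ends, which is how DumpStats drops the DB mutex around the cache entry stat scan. A standalone sketch of the same pattern using std::mutex stand-ins (assumption: these are not the RocksDB types):

#include <mutex>

class ScopedLock {
 public:
  explicit ScopedLock(std::mutex* m) : m_(m) { m_->lock(); }
  ~ScopedLock() { m_->unlock(); }

 private:
  std::mutex* const m_;
};

class ScopedUnlock {
 public:
  explicit ScopedUnlock(std::mutex* m) : m_(m) { m_->unlock(); }
  ~ScopedUnlock() { m_->lock(); }

 private:
  std::mutex* const m_;
};

std::mutex db_mutex;

void DumpStatsLike() {
  ScopedLock l(&db_mutex);  // analogous to InstrumentedMutexLock
  // ... work that needs the mutex ...
  {
    ScopedUnlock u(&db_mutex);  // analogous to InstrumentedMutexUnlock
    // ... slow work (e.g. a cache scan) done without the mutex ...
  }
  // mutex is held again here
}

int main() { DumpStatsLike(); }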
port/lang.h (17 lines changed)
@@ -15,6 +15,8 @@
#endif
#endif

+// ASAN (Address sanitizer)
+
#if defined(__clang__)
#if defined(__has_feature)
#if __has_feature(address_sanitizer)

@@ -39,3 +41,18 @@
#else
#define STATIC_AVOID_DESTRUCTION(Type, name) static Type& name = *new Type
#endif
+
+// TSAN (Thread sanitizer)
+
+// For simplicity, standardize on the GCC define
+#if defined(__clang__)
+#if defined(__has_feature) && __has_feature(thread_sanitizer)
+#define __SANITIZE_THREAD__ 1
+#endif // __has_feature(thread_sanitizer)
+#endif // __clang__
+
+#ifdef __SANITIZE_THREAD__
+#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread")))
+#else
+#define TSAN_SUPPRESSION
+#endif // TSAN_SUPPRESSION
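With port/lang.h standardizing on the GCC-style __SANITIZE_THREAD__ define, TSAN_SUPPRESSION can be applied the same way under GCC and Clang and expands to nothing when TSAN is off. A hedged usage sketch (the flag and function below are illustrative, not RocksDB code; the macro definitions mirror the hunk above):

#if defined(__clang__)
#if defined(__has_feature) && __has_feature(thread_sanitizer)
#define __SANITIZE_THREAD__ 1
#endif
#endif

#ifdef __SANITIZE_THREAD__
#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread")))
#else
#define TSAN_SUPPRESSION
#endif

static bool tracing_enabled = false;  // written under a mutex elsewhere

// Reading the flag without the mutex is treated as a benign race here, so the
// annotation keeps TSAN from reporting it.
TSAN_SUPPRESSION bool IsTracingEnabledFast() { return tracing_enabled; }

int main() { return IsTracingEnabledFast() ? 1 : 0; }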
@@ -36,6 +36,8 @@ void* SaveStack(int* /*num_frames*/, int /*first_frames_to_skip*/) {
#include <sys/sysctl.h>
#endif

+#include "port/lang.h"
+
namespace ROCKSDB_NAMESPACE {
namespace port {

@@ -163,8 +165,7 @@ static void StackTraceHandler(int sig) {

// Efforts to fix or suppress TSAN warnings "signal-unsafe call inside of
// a signal" have failed, so just warn the user about them.
-#if defined(__clang__) && defined(__has_feature)
-#if __has_feature(thread_sanitizer)
+#ifdef __SANITIZE_THREAD__
fprintf(stderr,
"==> NOTE: any above warnings about \"signal-unsafe call\" are\n"
"==> ignorable, as they are expected when generating a stack\n"

@@ -173,7 +174,6 @@ static void StackTraceHandler(int sig) {
"==> in the TSAN warning can be useful for that. (The stack\n"
"==> trace printed by the signal handler is likely obscured\n"
"==> by TSAN output.)\n");
-#endif
#endif

// re-signal to default handler (so we still get core dump if needed...)
@@ -1124,9 +1124,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
Statistics* statistics = rep_->ioptions.statistics.get();
bool using_zstd = rep_->blocks_definitely_zstd_compressed;
const FilterPolicy* filter_policy = rep_->filter_policy;
-Cache::CreateCallback create_cb =
-GetCreateCallback(read_amp_bytes_per_bit, statistics, using_zstd,
-filter_policy, *block->GetValue());
+Cache::CreateCallback create_cb = GetCreateCallback<TBlocklike>(
+read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);

// Lookup uncompressed cache first
if (block_cache != nullptr) {

@@ -1151,8 +1150,8 @@ Status BlockBasedTable::GetDataBlockFromCache(

assert(!compressed_block_cache_key.empty());
BlockContents contents;
-Cache::CreateCallback create_cb_special = GetCreateCallback(
-read_amp_bytes_per_bit, statistics, using_zstd, filter_policy, contents);
+Cache::CreateCallback create_cb_special = GetCreateCallback<BlockContents>(
+read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);
block_cache_compressed_handle = block_cache_compressed->Lookup(
compressed_block_cache_key,
BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
@@ -23,8 +23,7 @@ Cache::CacheItemHelper* GetCacheItemHelperForRole();
template <typename TBlocklike>
Cache::CreateCallback GetCreateCallback(size_t read_amp_bytes_per_bit,
Statistics* statistics, bool using_zstd,
-const FilterPolicy* filter_policy,
-const TBlocklike& /*block*/) {
+const FilterPolicy* filter_policy) {
return [read_amp_bytes_per_bit, statistics, using_zstd, filter_policy](
void* buf, size_t size, void** out_obj, size_t* charge) -> Status {
assert(buf != nullptr);
@@ -9,6 +9,7 @@
#include <fstream>

#include "monitoring/instrumented_mutex.h"
+#include "port/lang.h"
#include "rocksdb/file_system.h"
#include "rocksdb/options.h"
#include "trace_replay/trace_replay.h"

@@ -157,20 +158,6 @@ class IOTracer {
// mutex and ignore the operation if writer_is null. So its ok if
// tracing_enabled shows non updated value.

-#if defined(__clang__)
-#if defined(__has_feature) && __has_feature(thread_sanitizer)
-#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread")))
-#endif // __has_feature(thread_sanitizer)
-#else // __clang__
-#ifdef __SANITIZE_THREAD__
-#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread")))
-#endif // __SANITIZE_THREAD__
-#endif // __clang__
-
-#ifndef TSAN_SUPPRESSION
-#define TSAN_SUPPRESSION
-#endif // TSAN_SUPPRESSION
-
// Start writing IO operations to the trace_writer.
TSAN_SUPPRESSION Status
StartIOTrace(SystemClock* clock, const TraceOptions& trace_options,
@@ -4,7 +4,9 @@
// (found in the LICENSE.Apache file in the root directory).

#include "rocksdb/utilities/sim_cache.h"

+#include <cstdlib>
+
#include "db/db_test_util.h"
#include "port/stack_trace.h"

@@ -87,6 +89,8 @@ TEST_F(SimCacheTest, SimCache) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
RecordCacheCounters(options);
+// due to cache entry stats collector
+uint64_t base_misses = simCache->get_miss_counter();

std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks);
Iterator* iter = nullptr;

@@ -99,8 +103,8 @@ TEST_F(SimCacheTest, SimCache) {
CheckCacheCounters(options, 1, 0, 1, 0);
iterators[i].reset(iter);
}
-ASSERT_EQ(kNumBlocks,
-simCache->get_hit_counter() + simCache->get_miss_counter());
+ASSERT_EQ(kNumBlocks, simCache->get_hit_counter() +
+simCache->get_miss_counter() - base_misses);
ASSERT_EQ(0, simCache->get_hit_counter());
size_t usage = simCache->GetUsage();
ASSERT_LT(0, usage);

@@ -137,8 +141,8 @@ TEST_F(SimCacheTest, SimCache) {
CheckCacheCounters(options, 1, 0, 1, 0);
}
ASSERT_EQ(0, simCache->GetPinnedUsage());
-ASSERT_EQ(3 * kNumBlocks + 1,
-simCache->get_hit_counter() + simCache->get_miss_counter());
+ASSERT_EQ(3 * kNumBlocks + 1, simCache->get_hit_counter() +
+simCache->get_miss_counter() - base_misses);
ASSERT_EQ(6, simCache->get_hit_counter());
}