Compare commits
14 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
b026bfdf4a | ||
|
ce7b6bced6 | ||
|
2f3fac5a1d | ||
|
b2ef1cb1ae | ||
|
05ab235fe7 | ||
|
36a76d543e | ||
|
21b393c7f3 | ||
|
8d7926a766 | ||
|
9ed80413e0 | ||
|
15ec7d4f95 | ||
|
0612f472b2 | ||
|
187cb23caa | ||
|
848cf937d9 | ||
|
c5cfed2e4a |
@ -338,6 +338,7 @@ set(TESTS
|
||||
db/db_tailing_iter_test.cc
|
||||
db/db_test.cc
|
||||
db/db_test2.cc
|
||||
db/db_block_cache_test.cc
|
||||
db/db_universal_compaction_test.cc
|
||||
db/db_wal_test.cc
|
||||
db/dbformat_test.cc
|
||||
|
@ -1,7 +1,10 @@
|
||||
# Rocksdb Change Log
|
||||
## Unreleased
|
||||
## 4.6.0 (3/10/2016)
|
||||
### Public API Changes
|
||||
* Change default of BlockBasedTableOptions.format_version to 2. It means default DB created by 4.6 or up cannot be opened by RocksDB version 3.9 or earlier.
|
||||
* Added strict_capacity_limit option to NewLRUCache. If the flag is set to true, insert to cache will fail if no enough capacity can be free. Signiture of Cache::Insert() is updated accordingly.
|
||||
* Tickers [NUMBER_DB_NEXT, NUMBER_DB_PREV, NUMBER_DB_NEXT_FOUND, NUMBER_DB_PREV_FOUND, ITER_BYTES_READ] are not updated immediately. The are updated when the Iterator is deleted.
|
||||
* Add monotonically increasing counter (DB property "rocksdb.current-super-version-number") that increments upon any change to the LSM tree.
|
||||
### New Features
|
||||
* Add CompactionPri::kMinOverlappingRatio, a compaction picking mode friendly to write amplification.
|
||||
* Deprecate Iterator::IsKeyPinned() and replace it with Iterator::GetProperty() with prop_name="rocksdb.iterator.is.key.pinned"
|
||||
|
8
Makefile
8
Makefile
@ -191,10 +191,6 @@ default: all
|
||||
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
|
||||
-Wno-unused-parameter
|
||||
|
||||
ifndef DISABLE_WARNING_AS_ERROR
|
||||
WARNING_FLAGS += -Werror
|
||||
endif
|
||||
|
||||
CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
|
||||
CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
|
||||
|
||||
@ -246,6 +242,7 @@ BENCHTOOLOBJECTS = $(BENCH_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL)
|
||||
TESTS = \
|
||||
db_test \
|
||||
db_test2 \
|
||||
db_block_cache_test \
|
||||
db_iter_test \
|
||||
db_log_iter_test \
|
||||
db_compaction_filter_test \
|
||||
@ -794,6 +791,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
db_log_iter_test: db/db_log_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
|
@ -52,12 +52,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
|
||||
FBCODE_BUILD="true"
|
||||
# If we're compiling with TSAN we need pic build
|
||||
PIC_BUILD=$COMPILE_WITH_TSAN
|
||||
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
else
|
||||
# we need this to build with MySQL. Don't use for other purposes.
|
||||
source "$PWD/build_tools/fbcode_config4.8.1.sh"
|
||||
fi
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
fi
|
||||
|
||||
# Delete existing output, if it exists
|
||||
|
@ -1,16 +1,19 @@
|
||||
GCC_BASE=/mnt/vol/engshare/fbcode/third-party2/gcc/4.9.x/centos6-native/1317bc4/
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/clang/fc904e50a9266b9d7b98cae1993afa0c5aae1440/3.7.1/centos6-native/9d9ecb9/
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/ea2fd1278810d3af2ea52218d2767e09d786dbd0/4.9.x/gcc-4.9-glibc-2.20/024dbc3
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/f5484f168c0e4d19823d41df052c5870c6e575a4/2.20/gcc-4.9-glibc-2.20/500e281
|
||||
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/cbf6f1f209e5bd160bdc5d971744e039f36b1566/1.1.3/gcc-4.9-glibc-2.20/e9936bf
|
||||
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/6d39cb54708049f527e713ad19f2aadb9d3667e8/1.2.8/gcc-4.9-glibc-2.20/e9936bf
|
||||
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/2ddd45f0853bfc8bb1c27f0f447236a1a26c338a/1.0.6/gcc-4.9-glibc-2.20/e9936bf
|
||||
LZ4_BASE=/mnt/gvfs/third-party2/lz4/6858fac689e0f92e584224d91bdb0e39f6c8320d/r131/gcc-4.9-glibc-2.20/e9936bf
|
||||
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/cb6c4880fcb4fee471574ba6af63a3882155a16a/0.5.1/gcc-4.9-glibc-2.20/e9936bf
|
||||
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/c7275a4ceae0aca0929e56964a31dafc53c1ee96/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
|
||||
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/40791a3fef9206a77f2c4bc51f8169e5bf10d68e/master/gcc-4.9-glibc-2.20/a6c5e1e
|
||||
NUMA_BASE=/mnt/gvfs/third-party2/numa/ae54a5ed22cdabb1c6446dce4e8ffae5b4446d73/2.0.8/gcc-4.9-glibc-2.20/e9936bf
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/303048f72efc92ae079e62dfc84823401aecfd94/trunk/gcc-4.9-glibc-2.20/12266b1
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/1a48835975c66d30e47770ec419758ed3b9ba010/3.10.62-62_fbk17_03959_ge29cc63/gcc-4.9-glibc-2.20/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/a5b8152b2a15ce8a98808cf954fbccec825a97bc/2.25/centos6-native/da39a3e
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/af85c56f424cd5edfc2c97588299b44ecdec96bb/3.10.0/gcc-4.9-glibc-2.20/e9936bf
|
||||
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
|
||||
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
|
||||
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
|
||||
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
|
||||
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
|
||||
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
|
||||
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
|
||||
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
|
||||
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
|
||||
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
|
||||
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
|
||||
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832
|
||||
|
@ -1,4 +1,4 @@
|
||||
GCC_BASE=/mnt/vol/engshare/fbcode/third-party2/gcc/4.8.1/centos6-native/cc6c9dc/
|
||||
GCC_BASE=/mnt/gvfs/third-party2/gcc/ebc96bc2fb751b5a0300b8d91a95bdf24ac1d88b/4.8.1/centos6-native/cc6c9dc
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/clang/fc904e50a9266b9d7b98cae1993afa0c5aae1440/3.7.1/centos6-native/9d9ecb9/
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/ea2fd1278810d3af2ea52218d2767e09d786dbd0/4.8.1/gcc-4.8.1-glibc-2.17/8aac7fc
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/f5484f168c0e4d19823d41df052c5870c6e575a4/2.17/gcc-4.8.1-glibc-2.17/99df8fc
|
||||
|
@ -13,12 +13,12 @@ source "$BASEDIR/dependencies.sh"
|
||||
CFLAGS=""
|
||||
|
||||
# libgcc
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
|
||||
LIBGCC_LIBS=" -L $LIBGCC_BASE/libs"
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
|
||||
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
|
||||
|
||||
# glibc
|
||||
GLIBC_INCLUDE="$GLIBC_BASE/include"
|
||||
GLIBC_LIBS=" -L $GLIB_BASE/lib"
|
||||
GLIBC_LIBS=" -L $GLIBC_BASE/lib"
|
||||
|
||||
# snappy
|
||||
SNAPPY_INCLUDE=" -I $SNAPPY_BASE/include/"
|
||||
@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
|
||||
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
|
||||
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
|
||||
CFLAGS+=" -DLZ4"
|
||||
|
||||
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
|
||||
CFLAGS+=" -DZSTD"
|
||||
fi
|
||||
|
||||
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
|
||||
else
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DZSTD"
|
||||
|
||||
# location of gflags headers and libraries
|
||||
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
|
||||
else
|
||||
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DGFLAGS=google"
|
||||
CFLAGS+=" -DGFLAGS=gflags"
|
||||
|
||||
# location of jemalloc
|
||||
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
|
||||
@ -72,13 +76,22 @@ if test -z $PIC_BUILD; then
|
||||
LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
|
||||
fi
|
||||
|
||||
# location of TBB
|
||||
TBB_INCLUDE=" -isystem $TBB_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
TBB_LIBS="$TBB_BASE/lib/libtbb.a"
|
||||
else
|
||||
TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DTBB"
|
||||
|
||||
# use Intel SSE support for checksum calculations
|
||||
export USE_SSE=1
|
||||
|
||||
BINUTILS="$BINUTILS_BASE/bin"
|
||||
AR="$BINUTILS/ar"
|
||||
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE"
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
|
||||
|
||||
STDLIBS="-L $GCC_BASE/lib64"
|
||||
|
||||
@ -87,7 +100,7 @@ CLANG_LIB="$CLANG_BASE/lib"
|
||||
CLANG_SRC="$CLANG_BASE/../../src"
|
||||
|
||||
CLANG_ANALYZER="$CLANG_BIN/clang++"
|
||||
CLANG_SCAN_BUILD="$CLANG_SRC/clang/tools/scan-build/scan-build"
|
||||
CLANG_SCAN_BUILD="$CLANG_SRC/llvm/tools/clang/tools/scan-build/bin/scan-build"
|
||||
|
||||
if [ -z "$USE_CLANG" ]; then
|
||||
# gcc
|
||||
@ -95,39 +108,44 @@ if [ -z "$USE_CLANG" ]; then
|
||||
CXX="$GCC_BASE/bin/g++"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
JEMALLOC=1
|
||||
else
|
||||
# clang
|
||||
CLANG_INCLUDE="$CLANG_LIB/clang/*/include"
|
||||
CLANG_INCLUDE="$CLANG_LIB/clang/stable/include"
|
||||
CC="$CLANG_BIN/clang"
|
||||
CXX="$CLANG_BIN/clang++"
|
||||
|
||||
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $CLANG_INCLUDE"
|
||||
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE/linux "
|
||||
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE "
|
||||
CFLAGS+=" -Wno-expansion-to-defined "
|
||||
CXXFLAGS="-nostdinc++"
|
||||
fi
|
||||
|
||||
CFLAGS+=" $DEPS_INCLUDE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
|
||||
CXXFLAGS+=" $CFLAGS"
|
||||
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
|
||||
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
|
||||
EXEC_LDFLAGS+=" $LIBUNWIND"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
|
||||
# required by libtbb
|
||||
EXEC_LDFLAGS+=" -ldl"
|
||||
|
||||
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
|
||||
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS"
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
|
||||
|
||||
VALGRIND_VER="$VALGRIND_BASE/bin/"
|
||||
|
||||
|
@ -64,7 +64,7 @@ touch "$OUTPUT"
|
||||
echo "Writing dependencies to $OUTPUT"
|
||||
|
||||
# Compilers locations
|
||||
GCC_BASE=`ls -d1 $TP2_LATEST/gcc/4.9.x/centos6-native/*/ | head -n1`
|
||||
GCC_BASE=`readlink -f $TP2_LATEST/gcc/4.9.x/centos6-native/*/`
|
||||
CLANG_BASE=`ls -d1 /mnt/gvfs/third-party2/clang/fc904e50a9266b9d7b98cae1993afa0c5aae1440/3.7.1/centos6-native/*/ | head -n1`
|
||||
|
||||
log_variable GCC_BASE
|
||||
@ -101,7 +101,7 @@ touch "$OUTPUT"
|
||||
echo "Writing 4.8.1 dependencies to $OUTPUT"
|
||||
|
||||
# Compilers locations
|
||||
GCC_BASE=`ls -d1 $TP2_LATEST/gcc/4.8.1/centos6-native/*/ | head -n1`
|
||||
GCC_BASE=`readlink -f $TP2_LATEST/gcc/4.8.1/centos6-native/*/`
|
||||
CLANG_BASE=`ls -d1 /mnt/gvfs/third-party2/clang/fc904e50a9266b9d7b98cae1993afa0c5aae1440/3.7.1/centos6-native/*/ | head -n1`
|
||||
|
||||
log_variable GCC_BASE
|
||||
|
237
db/db_block_cache_test.cc
Normal file
237
db/db_block_cache_test.cc
Normal file
@ -0,0 +1,237 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#include <cstdlib>
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
static uint64_t TestGetTickerCount(const Options& options,
|
||||
Tickers ticker_type) {
|
||||
return options.statistics->getTickerCount(ticker_type);
|
||||
}
|
||||
|
||||
class DBBlockCacheTest : public DBTestBase {
|
||||
private:
|
||||
size_t miss_count_ = 0;
|
||||
size_t hit_count_ = 0;
|
||||
size_t insert_count_ = 0;
|
||||
size_t failure_count_ = 0;
|
||||
size_t compressed_miss_count_ = 0;
|
||||
size_t compressed_hit_count_ = 0;
|
||||
size_t compressed_insert_count_ = 0;
|
||||
size_t compressed_failure_count_ = 0;
|
||||
|
||||
public:
|
||||
const size_t kNumBlocks = 10;
|
||||
const size_t kValueSize = 100;
|
||||
|
||||
DBBlockCacheTest() : DBTestBase("/db_block_cache_test") {}
|
||||
|
||||
BlockBasedTableOptions GetTableOptions() {
|
||||
BlockBasedTableOptions table_options;
|
||||
// Set a small enough block size so that each key-value get its own block.
|
||||
table_options.block_size = 1;
|
||||
return table_options;
|
||||
}
|
||||
|
||||
Options GetOptions(const BlockBasedTableOptions& table_options) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
// options.compression = kNoCompression;
|
||||
options.statistics = rocksdb::CreateDBStatistics();
|
||||
options.table_factory.reset(new BlockBasedTableFactory(table_options));
|
||||
return options;
|
||||
}
|
||||
|
||||
void InitTable(const Options& options) {
|
||||
std::string value(kValueSize, 'a');
|
||||
for (size_t i = 0; i < kNumBlocks; i++) {
|
||||
ASSERT_OK(Put(ToString(i), value.c_str()));
|
||||
}
|
||||
}
|
||||
|
||||
void RecordCacheCounters(const Options& options) {
|
||||
miss_count_ = TestGetTickerCount(options, BLOCK_CACHE_MISS);
|
||||
hit_count_ = TestGetTickerCount(options, BLOCK_CACHE_HIT);
|
||||
insert_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD);
|
||||
failure_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES);
|
||||
compressed_miss_count_ =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
|
||||
compressed_hit_count_ =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
|
||||
compressed_insert_count_ =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD);
|
||||
compressed_failure_count_ =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
|
||||
}
|
||||
|
||||
void CheckCacheCounters(const Options& options, size_t expected_misses,
|
||||
size_t expected_hits, size_t expected_inserts,
|
||||
size_t expected_failures) {
|
||||
size_t new_miss_count = TestGetTickerCount(options, BLOCK_CACHE_MISS);
|
||||
size_t new_hit_count = TestGetTickerCount(options, BLOCK_CACHE_HIT);
|
||||
size_t new_insert_count = TestGetTickerCount(options, BLOCK_CACHE_ADD);
|
||||
size_t new_failure_count =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES);
|
||||
ASSERT_EQ(miss_count_ + expected_misses, new_miss_count);
|
||||
ASSERT_EQ(hit_count_ + expected_hits, new_hit_count);
|
||||
ASSERT_EQ(insert_count_ + expected_inserts, new_insert_count);
|
||||
ASSERT_EQ(failure_count_ + expected_failures, new_failure_count);
|
||||
miss_count_ = new_miss_count;
|
||||
hit_count_ = new_hit_count;
|
||||
insert_count_ = new_insert_count;
|
||||
failure_count_ = new_failure_count;
|
||||
}
|
||||
|
||||
void CheckCompressedCacheCounters(const Options& options,
|
||||
size_t expected_misses,
|
||||
size_t expected_hits,
|
||||
size_t expected_inserts,
|
||||
size_t expected_failures) {
|
||||
size_t new_miss_count =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
|
||||
size_t new_hit_count =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
|
||||
size_t new_insert_count =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD);
|
||||
size_t new_failure_count =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
|
||||
ASSERT_EQ(compressed_miss_count_ + expected_misses, new_miss_count);
|
||||
ASSERT_EQ(compressed_hit_count_ + expected_hits, new_hit_count);
|
||||
ASSERT_EQ(compressed_insert_count_ + expected_inserts, new_insert_count);
|
||||
ASSERT_EQ(compressed_failure_count_ + expected_failures, new_failure_count);
|
||||
compressed_miss_count_ = new_miss_count;
|
||||
compressed_hit_count_ = new_hit_count;
|
||||
compressed_insert_count_ = new_insert_count;
|
||||
compressed_failure_count_ = new_failure_count;
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) {
|
||||
ReadOptions read_options;
|
||||
auto table_options = GetTableOptions();
|
||||
auto options = GetOptions(table_options);
|
||||
InitTable(options);
|
||||
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
|
||||
table_options.block_cache = cache;
|
||||
options.table_factory.reset(new BlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
RecordCacheCounters(options);
|
||||
|
||||
std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
|
||||
Iterator* iter = nullptr;
|
||||
|
||||
// Load blocks into cache.
|
||||
for (size_t i = 0; i < kNumBlocks - 1; i++) {
|
||||
iter = db_->NewIterator(read_options);
|
||||
iter->Seek(ToString(i));
|
||||
ASSERT_OK(iter->status());
|
||||
CheckCacheCounters(options, 1, 0, 1, 0);
|
||||
iterators[i].reset(iter);
|
||||
}
|
||||
size_t usage = cache->GetUsage();
|
||||
ASSERT_LT(0, usage);
|
||||
cache->SetCapacity(usage);
|
||||
ASSERT_EQ(usage, cache->GetPinnedUsage());
|
||||
|
||||
// Test with strict capacity limit.
|
||||
cache->SetStrictCapacityLimit(true);
|
||||
iter = db_->NewIterator(read_options);
|
||||
iter->Seek(ToString(kNumBlocks - 1));
|
||||
ASSERT_TRUE(iter->status().IsIncomplete());
|
||||
CheckCacheCounters(options, 1, 0, 0, 1);
|
||||
delete iter;
|
||||
iter = nullptr;
|
||||
|
||||
// Release interators and access cache again.
|
||||
for (size_t i = 0; i < kNumBlocks - 1; i++) {
|
||||
iterators[i].reset();
|
||||
CheckCacheCounters(options, 0, 0, 0, 0);
|
||||
}
|
||||
ASSERT_EQ(0, cache->GetPinnedUsage());
|
||||
for (size_t i = 0; i < kNumBlocks - 1; i++) {
|
||||
iter = db_->NewIterator(read_options);
|
||||
iter->Seek(ToString(i));
|
||||
ASSERT_OK(iter->status());
|
||||
CheckCacheCounters(options, 0, 1, 0, 0);
|
||||
iterators[i].reset(iter);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
|
||||
ReadOptions read_options;
|
||||
auto table_options = GetTableOptions();
|
||||
auto options = GetOptions(table_options);
|
||||
InitTable(options);
|
||||
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
|
||||
std::shared_ptr<Cache> compressed_cache = NewLRUCache(0, 0, false);
|
||||
table_options.block_cache = cache;
|
||||
table_options.block_cache_compressed = compressed_cache;
|
||||
options.table_factory.reset(new BlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
RecordCacheCounters(options);
|
||||
|
||||
std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
|
||||
Iterator* iter = nullptr;
|
||||
|
||||
// Load blocks into cache.
|
||||
for (size_t i = 0; i < kNumBlocks - 1; i++) {
|
||||
iter = db_->NewIterator(read_options);
|
||||
iter->Seek(ToString(i));
|
||||
ASSERT_OK(iter->status());
|
||||
CheckCacheCounters(options, 1, 0, 1, 0);
|
||||
CheckCompressedCacheCounters(options, 1, 0, 1, 0);
|
||||
iterators[i].reset(iter);
|
||||
}
|
||||
size_t usage = cache->GetUsage();
|
||||
ASSERT_LT(0, usage);
|
||||
ASSERT_EQ(usage, cache->GetPinnedUsage());
|
||||
size_t compressed_usage = compressed_cache->GetUsage();
|
||||
ASSERT_LT(0, compressed_usage);
|
||||
// Compressed block cache cannot be pinned.
|
||||
ASSERT_EQ(0, compressed_cache->GetPinnedUsage());
|
||||
|
||||
// Set strict capacity limit flag. Now block will only load into compressed
|
||||
// block cache.
|
||||
cache->SetCapacity(usage);
|
||||
cache->SetStrictCapacityLimit(true);
|
||||
ASSERT_EQ(usage, cache->GetPinnedUsage());
|
||||
// compressed_cache->SetCapacity(compressed_usage);
|
||||
compressed_cache->SetCapacity(0);
|
||||
// compressed_cache->SetStrictCapacityLimit(true);
|
||||
iter = db_->NewIterator(read_options);
|
||||
iter->Seek(ToString(kNumBlocks - 1));
|
||||
ASSERT_TRUE(iter->status().IsIncomplete());
|
||||
CheckCacheCounters(options, 1, 0, 0, 1);
|
||||
CheckCompressedCacheCounters(options, 1, 0, 1, 0);
|
||||
delete iter;
|
||||
iter = nullptr;
|
||||
|
||||
// Clear strict capacity limit flag. This time we shall hit compressed block
|
||||
// cache.
|
||||
cache->SetStrictCapacityLimit(false);
|
||||
iter = db_->NewIterator(read_options);
|
||||
iter->Seek(ToString(kNumBlocks - 1));
|
||||
ASSERT_OK(iter->status());
|
||||
CheckCacheCounters(options, 1, 0, 1, 0);
|
||||
CheckCompressedCacheCounters(options, 0, 1, 0, 0);
|
||||
delete iter;
|
||||
iter = nullptr;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
rocksdb::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -572,6 +572,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
// Get obsolete files. This function will also update the list of
|
||||
// pending files in VersionSet().
|
||||
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
|
||||
&job_context->manifest_delete_files,
|
||||
job_context->min_pending_output);
|
||||
|
||||
// store the current filenum, lognum, etc
|
||||
@ -689,9 +690,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
|
||||
}
|
||||
|
||||
auto candidate_files = state.full_scan_candidate_files;
|
||||
candidate_files.reserve(candidate_files.size() +
|
||||
state.sst_delete_files.size() +
|
||||
state.log_delete_files.size());
|
||||
candidate_files.reserve(
|
||||
candidate_files.size() + state.sst_delete_files.size() +
|
||||
state.log_delete_files.size() + state.manifest_delete_files.size());
|
||||
// We may ignore the dbname when generating the file names.
|
||||
const char* kDumbDbName = "";
|
||||
for (auto file : state.sst_delete_files) {
|
||||
@ -707,6 +708,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
|
||||
0);
|
||||
}
|
||||
}
|
||||
for (const auto& filename : state.manifest_delete_files) {
|
||||
candidate_files.emplace_back(filename, 0);
|
||||
}
|
||||
|
||||
// dedup state.candidate_files so we don't try to delete the same
|
||||
// file twice
|
||||
@ -1844,6 +1848,17 @@ Status DBImpl::CompactFilesImpl(
|
||||
// support for CompactFiles, we should have CompactFiles API
|
||||
// pass a pointer of CompactionJobStats as the out-value
|
||||
// instead of using EventListener.
|
||||
|
||||
// Creating a compaction influences the compaction score because the score
|
||||
// takes running compactions into account (by skipping files that are already
|
||||
// being compacted). Since we just changed compaction score, we recalculate it
|
||||
// here.
|
||||
{
|
||||
CompactionOptionsFIFO dummy_compaction_options_fifo;
|
||||
version->storage_info()->ComputeCompactionScore(
|
||||
*c->mutable_cf_options(), dummy_compaction_options_fifo);
|
||||
}
|
||||
|
||||
compaction_job.Prepare();
|
||||
|
||||
mutex_.Unlock();
|
||||
|
@ -60,6 +60,44 @@ class DBIter: public Iterator {
|
||||
kReverse
|
||||
};
|
||||
|
||||
// LocalStatistics contain Statistics counters that will be aggregated per
|
||||
// each iterator instance and then will be sent to the global statistics when
|
||||
// the iterator is destroyed.
|
||||
//
|
||||
// The purpose of this approach is to avoid perf regression happening
|
||||
// when multiple threads bump the atomic counters from a DBIter::Next().
|
||||
struct LocalStatistics {
|
||||
explicit LocalStatistics() { ResetCounters(); }
|
||||
|
||||
void ResetCounters() {
|
||||
next_count_ = 0;
|
||||
next_found_count_ = 0;
|
||||
prev_count_ = 0;
|
||||
prev_found_count_ = 0;
|
||||
bytes_read_ = 0;
|
||||
}
|
||||
|
||||
void BumpGlobalStatistics(Statistics* global_statistics) {
|
||||
RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
|
||||
RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
|
||||
RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
|
||||
RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
|
||||
RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
|
||||
ResetCounters();
|
||||
}
|
||||
|
||||
// Map to Tickers::NUMBER_DB_NEXT
|
||||
uint64_t next_count_;
|
||||
// Map to Tickers::NUMBER_DB_NEXT_FOUND
|
||||
uint64_t next_found_count_;
|
||||
// Map to Tickers::NUMBER_DB_PREV
|
||||
uint64_t prev_count_;
|
||||
// Map to Tickers::NUMBER_DB_PREV_FOUND
|
||||
uint64_t prev_found_count_;
|
||||
// Map to Tickers::ITER_BYTES_READ
|
||||
uint64_t bytes_read_;
|
||||
};
|
||||
|
||||
DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
|
||||
InternalIterator* iter, SequenceNumber s, bool arena_mode,
|
||||
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
|
||||
@ -86,6 +124,7 @@ class DBIter: public Iterator {
|
||||
}
|
||||
virtual ~DBIter() {
|
||||
RecordTick(statistics_, NO_ITERATORS, -1);
|
||||
local_stats_.BumpGlobalStatistics(statistics_);
|
||||
if (!arena_mode_) {
|
||||
delete iter_;
|
||||
} else {
|
||||
@ -213,6 +252,7 @@ class DBIter: public Iterator {
|
||||
bool iter_pinned_;
|
||||
// List of operands for merge operator.
|
||||
std::deque<std::string> merge_operands_;
|
||||
LocalStatistics local_stats_;
|
||||
|
||||
// No copying allowed
|
||||
DBIter(const DBIter&);
|
||||
@ -250,6 +290,9 @@ void DBIter::Next() {
|
||||
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
|
||||
}
|
||||
|
||||
if (statistics_ != nullptr) {
|
||||
local_stats_.next_count_++;
|
||||
}
|
||||
// Now we point to the next internal position, for both of merge and
|
||||
// not merge cases.
|
||||
if (!iter_->Valid()) {
|
||||
@ -257,18 +300,15 @@ void DBIter::Next() {
|
||||
return;
|
||||
}
|
||||
FindNextUserEntry(true /* skipping the current user key */);
|
||||
if (statistics_ != nullptr) {
|
||||
RecordTick(statistics_, NUMBER_DB_NEXT);
|
||||
if (valid_) {
|
||||
RecordTick(statistics_, NUMBER_DB_NEXT_FOUND);
|
||||
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
|
||||
}
|
||||
}
|
||||
if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
|
||||
prefix_extractor_->Transform(saved_key_.GetKey())
|
||||
.compare(prefix_start_.GetKey()) != 0) {
|
||||
valid_ = false;
|
||||
}
|
||||
if (statistics_ != nullptr && valid_) {
|
||||
local_stats_.next_found_count_++;
|
||||
local_stats_.bytes_read_ += (key().size() + value().size());
|
||||
}
|
||||
}
|
||||
|
||||
// PRE: saved_key_ has the current user key if skipping
|
||||
@ -436,10 +476,10 @@ void DBIter::Prev() {
|
||||
}
|
||||
PrevInternal();
|
||||
if (statistics_ != nullptr) {
|
||||
RecordTick(statistics_, NUMBER_DB_PREV);
|
||||
local_stats_.prev_count_++;
|
||||
if (valid_) {
|
||||
RecordTick(statistics_, NUMBER_DB_PREV_FOUND);
|
||||
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
|
||||
local_stats_.prev_found_count_++;
|
||||
local_stats_.bytes_read_ += (key().size() + value().size());
|
||||
}
|
||||
}
|
||||
if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
|
||||
|
@ -10611,6 +10611,88 @@ TEST_F(DBTest, PrefixExtractorBlockFilter) {
|
||||
delete iter;
|
||||
}
|
||||
|
||||
TEST_F(DBTest, IteratorWithLocalStatistics) {
|
||||
Options options = CurrentOptions();
|
||||
options.statistics = rocksdb::CreateDBStatistics();
|
||||
DestroyAndReopen(options);
|
||||
|
||||
Random rnd(301);
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
// Key 10 bytes / Value 10 bytes
|
||||
ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
|
||||
}
|
||||
|
||||
std::atomic<uint64_t> total_next(0);
|
||||
std::atomic<uint64_t> total_next_found(0);
|
||||
std::atomic<uint64_t> total_prev(0);
|
||||
std::atomic<uint64_t> total_prev_found(0);
|
||||
std::atomic<uint64_t> total_bytes(0);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
std::function<void()> reader_func_next = [&]() {
|
||||
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||
|
||||
iter->SeekToFirst();
|
||||
// Seek will bump ITER_BYTES_READ
|
||||
total_bytes += iter->key().size();
|
||||
total_bytes += iter->value().size();
|
||||
while (true) {
|
||||
iter->Next();
|
||||
total_next++;
|
||||
|
||||
if (!iter->Valid()) {
|
||||
break;
|
||||
}
|
||||
total_next_found++;
|
||||
total_bytes += iter->key().size();
|
||||
total_bytes += iter->value().size();
|
||||
}
|
||||
|
||||
delete iter;
|
||||
};
|
||||
|
||||
std::function<void()> reader_func_prev = [&]() {
|
||||
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||
|
||||
iter->SeekToLast();
|
||||
// Seek will bump ITER_BYTES_READ
|
||||
total_bytes += iter->key().size();
|
||||
total_bytes += iter->value().size();
|
||||
while (true) {
|
||||
iter->Prev();
|
||||
total_prev++;
|
||||
|
||||
if (!iter->Valid()) {
|
||||
break;
|
||||
}
|
||||
total_prev_found++;
|
||||
total_bytes += iter->key().size();
|
||||
total_bytes += iter->value().size();
|
||||
}
|
||||
|
||||
delete iter;
|
||||
};
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
threads.emplace_back(reader_func_next);
|
||||
}
|
||||
for (int i = 0; i < 15; i++) {
|
||||
threads.emplace_back(reader_func_prev);
|
||||
}
|
||||
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT), total_next);
|
||||
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT_FOUND),
|
||||
total_next_found);
|
||||
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV), total_prev);
|
||||
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV_FOUND),
|
||||
total_prev_found);
|
||||
ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes);
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
class BloomStatsTestWithParam
|
||||
: public DBTest,
|
||||
|
@ -22,9 +22,9 @@ class MemTable;
|
||||
struct JobContext {
|
||||
inline bool HaveSomethingToDelete() const {
|
||||
return full_scan_candidate_files.size() || sst_delete_files.size() ||
|
||||
log_delete_files.size() || new_superversion != nullptr ||
|
||||
superversions_to_free.size() > 0 || memtables_to_free.size() > 0 ||
|
||||
logs_to_free.size() > 0;
|
||||
log_delete_files.size() || manifest_delete_files.size() ||
|
||||
new_superversion != nullptr || superversions_to_free.size() > 0 ||
|
||||
memtables_to_free.size() > 0 || logs_to_free.size() > 0;
|
||||
}
|
||||
|
||||
// Structure to store information for candidate files to delete.
|
||||
@ -56,6 +56,9 @@ struct JobContext {
|
||||
// a list of log files that we need to delete
|
||||
std::vector<uint64_t> log_delete_files;
|
||||
|
||||
// a list of manifest files that we need to delete
|
||||
std::vector<std::string> manifest_delete_files;
|
||||
|
||||
// a list of memtables to be free
|
||||
autovector<MemTable*> memtables_to_free;
|
||||
|
||||
|
@ -143,8 +143,12 @@ Status TableCache::FindTable(const EnvOptions& env_options,
|
||||
// We do not cache error results so that if the error is transient,
|
||||
// or somebody repairs the file, we recover automatically.
|
||||
} else {
|
||||
*handle = cache_->Insert(key, table_reader.release(), 1,
|
||||
&DeleteEntry<TableReader>);
|
||||
s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
|
||||
handle);
|
||||
if (s.ok()) {
|
||||
// Release ownership of table reader.
|
||||
table_reader.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
return s;
|
||||
@ -285,9 +289,8 @@ Status TableCache::Get(const ReadOptions& options,
|
||||
size_t charge =
|
||||
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
|
||||
void* row_ptr = new std::string(std::move(*row_cache_entry));
|
||||
auto row_handle = ioptions_.row_cache->Insert(
|
||||
row_cache_key.GetKey(), row_ptr, charge, &DeleteEntry<std::string>);
|
||||
ioptions_.row_cache->Release(row_handle);
|
||||
ioptions_.row_cache->Insert(row_cache_key.GetKey(), row_ptr, charge,
|
||||
&DeleteEntry<std::string>);
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
|
@ -2254,6 +2254,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
db_options_->disableDataSync ? nullptr : db_directory);
|
||||
// Leave the old file behind since PurgeObsoleteFiles will take care of it
|
||||
// later. It's unsafe to delete now since file deletion may be disabled.
|
||||
obsolete_manifests_.emplace_back(
|
||||
DescriptorFileName("", manifest_file_number_));
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
@ -3388,7 +3390,10 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
}
|
||||
|
||||
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
|
||||
std::vector<std::string>* manifest_filenames,
|
||||
uint64_t min_pending_output) {
|
||||
assert(manifest_filenames->empty());
|
||||
obsolete_manifests_.swap(*manifest_filenames);
|
||||
std::vector<FileMetaData*> pending_files;
|
||||
for (auto f : obsolete_files_) {
|
||||
if (f->fd.GetNumber() < min_pending_output) {
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
@ -697,6 +698,7 @@ class VersionSet {
|
||||
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
|
||||
|
||||
void GetObsoleteFiles(std::vector<FileMetaData*>* files,
|
||||
std::vector<std::string>* manifest_filenames,
|
||||
uint64_t min_pending_output);
|
||||
|
||||
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
|
||||
@ -758,6 +760,7 @@ class VersionSet {
|
||||
uint64_t manifest_file_size_;
|
||||
|
||||
std::vector<FileMetaData*> obsolete_files_;
|
||||
std::vector<std::string> obsolete_manifests_;
|
||||
|
||||
// env options for all reads and writes except compactions
|
||||
const EnvOptions& env_options_;
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include <memory>
|
||||
#include <stdint.h>
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/status.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -33,12 +34,15 @@ using std::shared_ptr;
|
||||
class Cache;
|
||||
|
||||
// Create a new cache with a fixed size capacity. The cache is sharded
|
||||
// to 2^numShardBits shards, by hash of the key. The total capacity
|
||||
// to 2^num_shard_bits shards, by hash of the key. The total capacity
|
||||
// is divided and evenly assigned to each shard.
|
||||
//
|
||||
// The functions without parameter numShardBits uses default value, which is 4
|
||||
// The parameter num_shard_bits defaults to 4, and strict_capacity_limit
|
||||
// defaults to false.
|
||||
extern shared_ptr<Cache> NewLRUCache(size_t capacity);
|
||||
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int numShardBits);
|
||||
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits);
|
||||
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
|
||||
bool strict_capacity_limit);
|
||||
|
||||
class Cache {
|
||||
public:
|
||||
@ -55,15 +59,22 @@ class Cache {
|
||||
|
||||
// Insert a mapping from key->value into the cache and assign it
|
||||
// the specified charge against the total cache capacity.
|
||||
// If strict_capacity_limit is true and cache reaches its full capacity,
|
||||
// return Status::Incomplete.
|
||||
//
|
||||
// Returns a handle that corresponds to the mapping. The caller
|
||||
// must call this->Release(handle) when the returned mapping is no
|
||||
// longer needed.
|
||||
// If handle is not nullptr, returns a handle that corresponds to the
|
||||
// mapping. The caller must call this->Release(handle) when the returned
|
||||
// mapping is no longer needed. In case of error caller is responsible to
|
||||
// cleanup the value (i.e. calling "deleter").
|
||||
//
|
||||
// If handle is nullptr, it is as if Release is called immediately after
|
||||
// insert. In case of error value will be cleanup.
|
||||
//
|
||||
// When the inserted entry is no longer needed, the key and
|
||||
// value will be passed to "deleter".
|
||||
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
|
||||
void (*deleter)(const Slice& key, void* value)) = 0;
|
||||
virtual Status Insert(const Slice& key, void* value, size_t charge,
|
||||
void (*deleter)(const Slice& key, void* value),
|
||||
Handle** handle = nullptr) = 0;
|
||||
|
||||
// If the cache has no mapping for "key", returns nullptr.
|
||||
//
|
||||
@ -100,6 +111,14 @@ class Cache {
|
||||
// purge the released entries from the cache in order to lower the usage
|
||||
virtual void SetCapacity(size_t capacity) = 0;
|
||||
|
||||
// Set whether to return error on insertion when cache reaches its full
|
||||
// capacity.
|
||||
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
|
||||
|
||||
// Set whether to return error on insertion when cache reaches its full
|
||||
// capacity.
|
||||
virtual bool HasStrictCapacityLimit() const = 0;
|
||||
|
||||
// returns the maximum configured capacity of the cache
|
||||
virtual size_t GetCapacity() const = 0;
|
||||
|
||||
|
@ -33,6 +33,8 @@ enum Tickers : uint32_t {
|
||||
BLOCK_CACHE_HIT,
|
||||
// # of blocks added to block cache.
|
||||
BLOCK_CACHE_ADD,
|
||||
// # of failures when adding blocks to block cache.
|
||||
BLOCK_CACHE_ADD_FAILURES,
|
||||
// # of times cache miss when accessing index block from block cache.
|
||||
BLOCK_CACHE_INDEX_MISS,
|
||||
// # of times cache hit when accessing index block from block cache.
|
||||
@ -140,8 +142,12 @@ enum Tickers : uint32_t {
|
||||
GET_UPDATES_SINCE_CALLS,
|
||||
BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache
|
||||
BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache
|
||||
WAL_FILE_SYNCED, // Number of times WAL sync is done
|
||||
WAL_FILE_BYTES, // Number of bytes written to WAL
|
||||
// Number of blocks added to comopressed block cache
|
||||
BLOCK_CACHE_COMPRESSED_ADD,
|
||||
// Number of failures when adding blocks to compressed block cache
|
||||
BLOCK_CACHE_COMPRESSED_ADD_FAILURES,
|
||||
WAL_FILE_SYNCED, // Number of times WAL sync is done
|
||||
WAL_FILE_BYTES, // Number of bytes written to WAL
|
||||
|
||||
// Writes can be processed by requesting thread or by the thread at the
|
||||
// head of the writers queue.
|
||||
@ -176,6 +182,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
|
||||
{BLOCK_CACHE_MISS, "rocksdb.block.cache.miss"},
|
||||
{BLOCK_CACHE_HIT, "rocksdb.block.cache.hit"},
|
||||
{BLOCK_CACHE_ADD, "rocksdb.block.cache.add"},
|
||||
{BLOCK_CACHE_ADD_FAILURES, "rocksdb.block.cache.add.failures"},
|
||||
{BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss"},
|
||||
{BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit"},
|
||||
{BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss"},
|
||||
@ -227,6 +234,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
|
||||
{GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls"},
|
||||
{BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss"},
|
||||
{BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit"},
|
||||
{BLOCK_CACHE_COMPRESSED_ADD, "rocksdb.block.cachecompressed.add"},
|
||||
{BLOCK_CACHE_COMPRESSED_ADD_FAILURES,
|
||||
"rocksdb.block.cachecompressed.add.failures"},
|
||||
{WAL_FILE_SYNCED, "rocksdb.wal.synced"},
|
||||
{WAL_FILE_BYTES, "rocksdb.wal.bytes"},
|
||||
{WRITE_DONE_BY_SELF, "rocksdb.write.self"},
|
||||
|
@ -6,7 +6,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 4
|
||||
#define ROCKSDB_MINOR 6
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 1
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -703,7 +703,6 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
|
||||
|
||||
if (type != kNoCompression && block_cache_compressed != nullptr) {
|
||||
|
||||
Cache::Handle* cache_handle = nullptr;
|
||||
size_t size = block_contents.size();
|
||||
|
||||
std::unique_ptr<char[]> ubuf(new char[size + 1]);
|
||||
@ -723,9 +722,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
|
||||
(end - r->compressed_cache_key_prefix));
|
||||
|
||||
// Insert into compressed block cache.
|
||||
cache_handle = block_cache_compressed->Insert(
|
||||
key, block, block->usable_size(), &DeleteCachedBlock);
|
||||
block_cache_compressed->Release(cache_handle);
|
||||
block_cache_compressed->Insert(key, block, block->usable_size(),
|
||||
&DeleteCachedBlock);
|
||||
|
||||
// Invalidate OS cache.
|
||||
r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
|
||||
|
@ -740,11 +740,16 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
||||
assert(block->value->compression_type() == kNoCompression);
|
||||
if (block_cache != nullptr && block->value->cachable() &&
|
||||
read_options.fill_cache) {
|
||||
block->cache_handle = block_cache->Insert(block_cache_key, block->value,
|
||||
block->value->usable_size(),
|
||||
&DeleteCachedEntry<Block>);
|
||||
assert(reinterpret_cast<Block*>(
|
||||
block_cache->Value(block->cache_handle)) == block->value);
|
||||
s = block_cache->Insert(
|
||||
block_cache_key, block->value, block->value->usable_size(),
|
||||
&DeleteCachedEntry<Block>, &(block->cache_handle));
|
||||
if (s.ok()) {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
|
||||
delete block->value;
|
||||
block->value = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -784,27 +789,37 @@ Status BlockBasedTable::PutDataBlockToCache(
|
||||
// Release the hold on the compressed cache entry immediately.
|
||||
if (block_cache_compressed != nullptr && raw_block != nullptr &&
|
||||
raw_block->cachable()) {
|
||||
auto cache_handle = block_cache_compressed->Insert(
|
||||
compressed_block_cache_key, raw_block, raw_block->usable_size(),
|
||||
&DeleteCachedEntry<Block>);
|
||||
block_cache_compressed->Release(cache_handle);
|
||||
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
|
||||
// Avoid the following code to delete this cached block.
|
||||
raw_block = nullptr;
|
||||
s = block_cache_compressed->Insert(compressed_block_cache_key, raw_block,
|
||||
raw_block->usable_size(),
|
||||
&DeleteCachedEntry<Block>);
|
||||
if (s.ok()) {
|
||||
// Avoid the following code to delete this cached block.
|
||||
raw_block = nullptr;
|
||||
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
|
||||
}
|
||||
}
|
||||
delete raw_block;
|
||||
|
||||
// insert into uncompressed block cache
|
||||
assert((block->value->compression_type() == kNoCompression));
|
||||
if (block_cache != nullptr && block->value->cachable()) {
|
||||
block->cache_handle = block_cache->Insert(block_cache_key, block->value,
|
||||
block->value->usable_size(),
|
||||
&DeleteCachedEntry<Block>);
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
|
||||
block->value->usable_size());
|
||||
assert(reinterpret_cast<Block*>(block_cache->Value(block->cache_handle)) ==
|
||||
block->value);
|
||||
s = block_cache->Insert(block_cache_key, block->value,
|
||||
block->value->usable_size(),
|
||||
&DeleteCachedEntry<Block>, &(block->cache_handle));
|
||||
if (s.ok()) {
|
||||
assert(block->cache_handle != nullptr);
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
|
||||
block->value->usable_size());
|
||||
assert(reinterpret_cast<Block*>(
|
||||
block_cache->Value(block->cache_handle)) == block->value);
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
|
||||
delete block->value;
|
||||
block->value = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
return s;
|
||||
@ -891,10 +906,17 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
|
||||
filter = ReadFilter(rep_, &filter_size);
|
||||
if (filter != nullptr) {
|
||||
assert(filter_size > 0);
|
||||
cache_handle = block_cache->Insert(key, filter, filter_size,
|
||||
&DeleteCachedEntry<FilterBlockReader>);
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size);
|
||||
Status s = block_cache->Insert(key, filter, filter_size,
|
||||
&DeleteCachedEntry<FilterBlockReader>,
|
||||
&cache_handle);
|
||||
if (s.ok()) {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size);
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
|
||||
delete filter;
|
||||
return CachableEntry<FilterBlockReader>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -937,10 +959,18 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
|
||||
// Create index reader and put it in the cache.
|
||||
Status s;
|
||||
s = CreateIndexReader(&index_reader);
|
||||
if (s.ok()) {
|
||||
s = block_cache->Insert(key, index_reader, index_reader->usable_size(),
|
||||
&DeleteCachedEntry<IndexReader>, &cache_handle);
|
||||
}
|
||||
|
||||
if (!s.ok()) {
|
||||
if (s.ok()) {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
|
||||
index_reader->usable_size());
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
|
||||
// make sure if something goes wrong, index_reader shall remain intact.
|
||||
assert(index_reader == nullptr);
|
||||
if (input_iter != nullptr) {
|
||||
input_iter->SetStatus(s);
|
||||
return input_iter;
|
||||
@ -949,12 +979,6 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
|
||||
}
|
||||
}
|
||||
|
||||
cache_handle =
|
||||
block_cache->Insert(key, index_reader, index_reader->usable_size(),
|
||||
&DeleteCachedEntry<IndexReader>);
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
|
||||
index_reader->usable_size());
|
||||
}
|
||||
|
||||
assert(cache_handle);
|
||||
@ -1036,7 +1060,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
|
||||
}
|
||||
|
||||
// Didn't get any data from block caches.
|
||||
if (block.value == nullptr) {
|
||||
if (s.ok() && block.value == nullptr) {
|
||||
if (no_io) {
|
||||
// Could not read from block_cache and can't do IO
|
||||
if (input_iter != nullptr) {
|
||||
@ -1055,7 +1079,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
|
||||
}
|
||||
|
||||
InternalIterator* iter;
|
||||
if (block.value != nullptr) {
|
||||
if (s.ok() && block.value != nullptr) {
|
||||
iter = block.value->NewIterator(&rep->internal_comparator, input_iter);
|
||||
if (block.cache_handle != nullptr) {
|
||||
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
|
||||
|
117
util/cache.cc
117
util/cache.cc
@ -196,10 +196,13 @@ class LRUCache {
|
||||
// free the needed space
|
||||
void SetCapacity(size_t capacity);
|
||||
|
||||
// Set the flag to reject insertion if cache if full.
|
||||
void SetStrictCapacityLimit(bool strict_capacity_limit);
|
||||
|
||||
// Like Cache methods, but with an extra "hash" parameter.
|
||||
Cache::Handle* Insert(const Slice& key, uint32_t hash,
|
||||
void* value, size_t charge,
|
||||
void (*deleter)(const Slice& key, void* value));
|
||||
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
|
||||
void (*deleter)(const Slice& key, void* value),
|
||||
Cache::Handle** handle);
|
||||
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
|
||||
void Release(Cache::Handle* handle);
|
||||
void Erase(const Slice& key, uint32_t hash);
|
||||
@ -245,6 +248,9 @@ class LRUCache {
|
||||
// Memory size for entries residing only in the LRU list
|
||||
size_t lru_usage_;
|
||||
|
||||
// Whether to reject insertion if cache reaches its full capacity.
|
||||
bool strict_capacity_limit_;
|
||||
|
||||
// mutex_ protects the following state.
|
||||
// We don't count mutex_ as the cache's internal state so semantically we
|
||||
// don't mind mutex_ invoking the non-const actions.
|
||||
@ -336,6 +342,11 @@ void LRUCache::SetCapacity(size_t capacity) {
|
||||
}
|
||||
}
|
||||
|
||||
void LRUCache::SetStrictCapacityLimit(bool strict_capacity_limit) {
|
||||
MutexLock l(&mutex_);
|
||||
strict_capacity_limit_ = strict_capacity_limit;
|
||||
}
|
||||
|
||||
Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
|
||||
MutexLock l(&mutex_);
|
||||
LRUHandle* e = table_.Lookup(key, hash);
|
||||
@ -350,6 +361,9 @@ Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
|
||||
}
|
||||
|
||||
void LRUCache::Release(Cache::Handle* handle) {
|
||||
if (handle == nullptr) {
|
||||
return;
|
||||
}
|
||||
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
|
||||
bool last_reference = false;
|
||||
{
|
||||
@ -383,15 +397,16 @@ void LRUCache::Release(Cache::Handle* handle) {
|
||||
}
|
||||
}
|
||||
|
||||
Cache::Handle* LRUCache::Insert(
|
||||
const Slice& key, uint32_t hash, void* value, size_t charge,
|
||||
void (*deleter)(const Slice& key, void* value)) {
|
||||
|
||||
Status LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
|
||||
size_t charge,
|
||||
void (*deleter)(const Slice& key, void* value),
|
||||
Cache::Handle** handle) {
|
||||
// Allocate the memory here outside of the mutex
|
||||
// If the cache is full, we'll have to release it
|
||||
// It shouldn't happen very often though.
|
||||
LRUHandle* e = reinterpret_cast<LRUHandle*>(
|
||||
new char[sizeof(LRUHandle) - 1 + key.size()]);
|
||||
Status s;
|
||||
autovector<LRUHandle*> last_reference_list;
|
||||
|
||||
e->value = value;
|
||||
@ -399,7 +414,9 @@ Cache::Handle* LRUCache::Insert(
|
||||
e->charge = charge;
|
||||
e->key_length = key.size();
|
||||
e->hash = hash;
|
||||
e->refs = 2; // One from LRUCache, one for the returned handle
|
||||
e->refs = (handle == nullptr
|
||||
? 1
|
||||
: 2); // One from LRUCache, one for the returned handle
|
||||
e->next = e->prev = nullptr;
|
||||
e->in_cache = true;
|
||||
memcpy(e->key_data, key.data(), key.size());
|
||||
@ -411,20 +428,36 @@ Cache::Handle* LRUCache::Insert(
|
||||
// is freed or the lru list is empty
|
||||
EvictFromLRU(charge, &last_reference_list);
|
||||
|
||||
// insert into the cache
|
||||
// note that the cache might get larger than its capacity if not enough
|
||||
// space was freed
|
||||
LRUHandle* old = table_.Insert(e);
|
||||
usage_ += e->charge;
|
||||
if (old != nullptr) {
|
||||
old->in_cache = false;
|
||||
if (Unref(old)) {
|
||||
usage_ -= old->charge;
|
||||
// old is on LRU because it's in cache and its reference count
|
||||
// was just 1 (Unref returned 0)
|
||||
LRU_Remove(old);
|
||||
last_reference_list.push_back(old);
|
||||
if (strict_capacity_limit_ && usage_ - lru_usage_ + charge > capacity_) {
|
||||
if (handle == nullptr) {
|
||||
last_reference_list.push_back(e);
|
||||
} else {
|
||||
delete[] reinterpret_cast<char*>(e);
|
||||
*handle = nullptr;
|
||||
}
|
||||
s = Status::Incomplete("Insert failed due to LRU cache being full.");
|
||||
} else {
|
||||
// insert into the cache
|
||||
// note that the cache might get larger than its capacity if not enough
|
||||
// space was freed
|
||||
LRUHandle* old = table_.Insert(e);
|
||||
usage_ += e->charge;
|
||||
if (old != nullptr) {
|
||||
old->in_cache = false;
|
||||
if (Unref(old)) {
|
||||
usage_ -= old->charge;
|
||||
// old is on LRU because it's in cache and its reference count
|
||||
// was just 1 (Unref returned 0)
|
||||
LRU_Remove(old);
|
||||
last_reference_list.push_back(old);
|
||||
}
|
||||
}
|
||||
if (handle == nullptr) {
|
||||
LRU_Append(e);
|
||||
} else {
|
||||
*handle = reinterpret_cast<Cache::Handle*>(e);
|
||||
}
|
||||
s = Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
@ -434,7 +467,7 @@ Cache::Handle* LRUCache::Insert(
|
||||
entry->Free();
|
||||
}
|
||||
|
||||
return reinterpret_cast<Cache::Handle*>(e);
|
||||
return s;
|
||||
}
|
||||
|
||||
void LRUCache::Erase(const Slice& key, uint32_t hash) {
|
||||
@ -472,6 +505,7 @@ class ShardedLRUCache : public Cache {
|
||||
uint64_t last_id_;
|
||||
int num_shard_bits_;
|
||||
size_t capacity_;
|
||||
bool strict_capacity_limit_;
|
||||
|
||||
static inline uint32_t HashSlice(const Slice& s) {
|
||||
return Hash(s.data(), s.size(), 0);
|
||||
@ -483,13 +517,18 @@ class ShardedLRUCache : public Cache {
|
||||
}
|
||||
|
||||
public:
|
||||
ShardedLRUCache(size_t capacity, int num_shard_bits)
|
||||
: last_id_(0), num_shard_bits_(num_shard_bits), capacity_(capacity) {
|
||||
ShardedLRUCache(size_t capacity, int num_shard_bits,
|
||||
bool strict_capacity_limit)
|
||||
: last_id_(0),
|
||||
num_shard_bits_(num_shard_bits),
|
||||
capacity_(capacity),
|
||||
strict_capacity_limit_(strict_capacity_limit) {
|
||||
int num_shards = 1 << num_shard_bits_;
|
||||
shards_ = new LRUCache[num_shards];
|
||||
const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
|
||||
for (int s = 0; s < num_shards; s++) {
|
||||
shards_[s].SetCapacity(per_shard);
|
||||
shards_[s].SetStrictCapacityLimit(strict_capacity_limit);
|
||||
}
|
||||
}
|
||||
virtual ~ShardedLRUCache() {
|
||||
@ -504,11 +543,19 @@ class ShardedLRUCache : public Cache {
|
||||
}
|
||||
capacity_ = capacity;
|
||||
}
|
||||
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
|
||||
void (*deleter)(const Slice& key,
|
||||
void* value)) override {
|
||||
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
|
||||
int num_shards = 1 << num_shard_bits_;
|
||||
for (int s = 0; s < num_shards; s++) {
|
||||
shards_[s].SetStrictCapacityLimit(strict_capacity_limit);
|
||||
}
|
||||
strict_capacity_limit_ = strict_capacity_limit;
|
||||
}
|
||||
virtual Status Insert(const Slice& key, void* value, size_t charge,
|
||||
void (*deleter)(const Slice& key, void* value),
|
||||
Handle** handle) override {
|
||||
const uint32_t hash = HashSlice(key);
|
||||
return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter);
|
||||
return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter,
|
||||
handle);
|
||||
}
|
||||
virtual Handle* Lookup(const Slice& key) override {
|
||||
const uint32_t hash = HashSlice(key);
|
||||
@ -531,6 +578,10 @@ class ShardedLRUCache : public Cache {
|
||||
}
|
||||
virtual size_t GetCapacity() const override { return capacity_; }
|
||||
|
||||
virtual bool HasStrictCapacityLimit() const override {
|
||||
return strict_capacity_limit_;
|
||||
}
|
||||
|
||||
virtual size_t GetUsage() const override {
|
||||
// We will not lock the cache when getting the usage from shards.
|
||||
int num_shards = 1 << num_shard_bits_;
|
||||
@ -569,14 +620,20 @@ class ShardedLRUCache : public Cache {
|
||||
} // end anonymous namespace
|
||||
|
||||
shared_ptr<Cache> NewLRUCache(size_t capacity) {
|
||||
return NewLRUCache(capacity, kNumShardBits);
|
||||
return NewLRUCache(capacity, kNumShardBits, false);
|
||||
}
|
||||
|
||||
shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits) {
|
||||
return NewLRUCache(capacity, num_shard_bits, false);
|
||||
}
|
||||
|
||||
shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
|
||||
bool strict_capacity_limit) {
|
||||
if (num_shard_bits >= 20) {
|
||||
return nullptr; // the cache cannot be sharded into too many fine pieces
|
||||
}
|
||||
return std::make_shared<ShardedLRUCache>(capacity, num_shard_bits);
|
||||
return std::make_shared<ShardedLRUCache>(capacity, num_shard_bits,
|
||||
strict_capacity_limit);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -142,8 +142,7 @@ class CacheBench {
|
||||
// Cast uint64* to be char*, data would be copied to cache
|
||||
Slice key(reinterpret_cast<char*>(&rand_key), 8);
|
||||
// do insert
|
||||
auto handle = cache_->Insert(key, new char[10], 1, &deleter);
|
||||
cache_->Release(handle);
|
||||
cache_->Insert(key, new char[10], 1, &deleter);
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,8 +220,7 @@ class CacheBench {
|
||||
int32_t prob_op = thread->rnd.Uniform(100);
|
||||
if (prob_op >= 0 && prob_op < FLAGS_insert_percent) {
|
||||
// do insert
|
||||
auto handle = cache_->Insert(key, new char[10], 1, &deleter);
|
||||
cache_->Release(handle);
|
||||
cache_->Insert(key, new char[10], 1, &deleter);
|
||||
} else if (prob_op -= FLAGS_insert_percent &&
|
||||
prob_op < FLAGS_lookup_percent) {
|
||||
// do lookup
|
||||
|
@ -73,8 +73,8 @@ class CacheTest : public testing::Test {
|
||||
}
|
||||
|
||||
void Insert(shared_ptr<Cache> cache, int key, int value, int charge = 1) {
|
||||
cache->Release(cache->Insert(EncodeKey(key), EncodeValue(value), charge,
|
||||
&CacheTest::Deleter));
|
||||
cache->Insert(EncodeKey(key), EncodeValue(value), charge,
|
||||
&CacheTest::Deleter);
|
||||
}
|
||||
|
||||
void Erase(shared_ptr<Cache> cache, int key) {
|
||||
@ -118,14 +118,12 @@ TEST_F(CacheTest, UsageTest) {
|
||||
auto cache = NewLRUCache(kCapacity, 8);
|
||||
|
||||
size_t usage = 0;
|
||||
const char* value = "abcdef";
|
||||
char value[10] = "abcdef";
|
||||
// make sure everything will be cached
|
||||
for (int i = 1; i < 100; ++i) {
|
||||
std::string key(i, 'a');
|
||||
auto kv_size = key.size() + 5;
|
||||
cache->Release(
|
||||
cache->Insert(key, (void*)value, kv_size, dumbDeleter)
|
||||
);
|
||||
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter);
|
||||
usage += kv_size;
|
||||
ASSERT_EQ(usage, cache->GetUsage());
|
||||
}
|
||||
@ -133,9 +131,8 @@ TEST_F(CacheTest, UsageTest) {
|
||||
// make sure the cache will be overloaded
|
||||
for (uint64_t i = 1; i < kCapacity; ++i) {
|
||||
auto key = ToString(i);
|
||||
cache->Release(
|
||||
cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter)
|
||||
);
|
||||
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
|
||||
dumbDeleter);
|
||||
}
|
||||
|
||||
// the usage should be close to the capacity
|
||||
@ -149,7 +146,7 @@ TEST_F(CacheTest, PinnedUsageTest) {
|
||||
auto cache = NewLRUCache(kCapacity, 8);
|
||||
|
||||
size_t pinned_usage = 0;
|
||||
const char* value = "abcdef";
|
||||
char value[10] = "abcdef";
|
||||
|
||||
std::forward_list<Cache::Handle*> unreleased_handles;
|
||||
|
||||
@ -158,7 +155,9 @@ TEST_F(CacheTest, PinnedUsageTest) {
|
||||
for (int i = 1; i < 100; ++i) {
|
||||
std::string key(i, 'a');
|
||||
auto kv_size = key.size() + 5;
|
||||
auto handle = cache->Insert(key, (void*)value, kv_size, dumbDeleter);
|
||||
Cache::Handle* handle;
|
||||
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter,
|
||||
&handle);
|
||||
pinned_usage += kv_size;
|
||||
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
|
||||
if (i % 2 == 0) {
|
||||
@ -182,8 +181,8 @@ TEST_F(CacheTest, PinnedUsageTest) {
|
||||
// check that overloading the cache does not change the pinned usage
|
||||
for (uint64_t i = 1; i < 2 * kCapacity; ++i) {
|
||||
auto key = ToString(i);
|
||||
cache->Release(
|
||||
cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter));
|
||||
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
|
||||
dumbDeleter);
|
||||
}
|
||||
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
|
||||
|
||||
@ -408,7 +407,8 @@ TEST_F(CacheTest, SetCapacity) {
|
||||
// Insert 5 entries, but not releasing.
|
||||
for (size_t i = 0; i < 5; i++) {
|
||||
std::string key = ToString(i+1);
|
||||
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
|
||||
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
|
||||
ASSERT_TRUE(s.ok());
|
||||
}
|
||||
ASSERT_EQ(5U, cache->GetCapacity());
|
||||
ASSERT_EQ(5U, cache->GetUsage());
|
||||
@ -422,7 +422,8 @@ TEST_F(CacheTest, SetCapacity) {
|
||||
// and usage should be 7
|
||||
for (size_t i = 5; i < 10; i++) {
|
||||
std::string key = ToString(i+1);
|
||||
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
|
||||
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
|
||||
ASSERT_TRUE(s.ok());
|
||||
}
|
||||
ASSERT_EQ(10U, cache->GetCapacity());
|
||||
ASSERT_EQ(10U, cache->GetUsage());
|
||||
@ -441,6 +442,53 @@ TEST_F(CacheTest, SetCapacity) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CacheTest, SetStrictCapacityLimit) {
|
||||
// test1: set the flag to false. Insert more keys than capacity. See if they
|
||||
// all go through.
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(5, 0, false);
|
||||
std::vector<Cache::Handle*> handles(10);
|
||||
Status s;
|
||||
for (size_t i = 0; i < 10; i++) {
|
||||
std::string key = ToString(i + 1);
|
||||
s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
|
||||
ASSERT_TRUE(s.ok());
|
||||
ASSERT_NE(nullptr, handles[i]);
|
||||
}
|
||||
|
||||
// test2: set the flag to true. Insert and check if it fails.
|
||||
std::string extra_key = "extra";
|
||||
Value* extra_value = new Value(0);
|
||||
cache->SetStrictCapacityLimit(true);
|
||||
Cache::Handle* handle;
|
||||
s = cache->Insert(extra_key, extra_value, 1, &deleter, &handle);
|
||||
ASSERT_TRUE(s.IsIncomplete());
|
||||
ASSERT_EQ(nullptr, handle);
|
||||
|
||||
for (size_t i = 0; i < 10; i++) {
|
||||
cache->Release(handles[i]);
|
||||
}
|
||||
|
||||
// test3: init with flag being true.
|
||||
std::shared_ptr<Cache> cache2 = NewLRUCache(5, 0, true);
|
||||
for (size_t i = 0; i < 5; i++) {
|
||||
std::string key = ToString(i + 1);
|
||||
s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
|
||||
ASSERT_TRUE(s.ok());
|
||||
ASSERT_NE(nullptr, handles[i]);
|
||||
}
|
||||
s = cache2->Insert(extra_key, extra_value, 1, &deleter, &handle);
|
||||
ASSERT_TRUE(s.IsIncomplete());
|
||||
ASSERT_EQ(nullptr, handle);
|
||||
// test insert without handle
|
||||
s = cache2->Insert(extra_key, extra_value, 1, &deleter);
|
||||
ASSERT_TRUE(s.IsIncomplete());
|
||||
ASSERT_EQ(5, cache->GetUsage());
|
||||
|
||||
for (size_t i = 0; i < 5; i++) {
|
||||
cache2->Release(handles[i]);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CacheTest, OverCapacity) {
|
||||
size_t n = 10;
|
||||
|
||||
@ -452,7 +500,8 @@ TEST_F(CacheTest, OverCapacity) {
|
||||
// Insert n+1 entries, but not releasing.
|
||||
for (size_t i = 0; i < n + 1; i++) {
|
||||
std::string key = ToString(i+1);
|
||||
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
|
||||
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
|
||||
ASSERT_TRUE(s.ok());
|
||||
}
|
||||
|
||||
// Guess what's in the cache now?
|
||||
|
@ -4,6 +4,7 @@
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include <math.h>
|
||||
#include <cmath>
|
||||
#include <algorithm>
|
||||
#include "rocksdb/options.h"
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#include <assert.h>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <unordered_set>
|
||||
|
@ -10,6 +10,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
@ -118,9 +118,9 @@ class BackupEngineImpl : public BackupEngine {
|
||||
void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
|
||||
|
||||
// Extends the "result" map with pathname->size mappings for the contents of
|
||||
// "dir". Pathnames are prefixed with "dir".
|
||||
// "dir" in "env". Pathnames are prefixed with "dir".
|
||||
Status InsertPathnameToSizeBytes(
|
||||
const std::string& dir,
|
||||
const std::string& dir, Env* env,
|
||||
std::unordered_map<std::string, uint64_t>* result);
|
||||
|
||||
struct FileInfo {
|
||||
@ -585,12 +585,13 @@ Status BackupEngineImpl::Initialize() {
|
||||
for (const auto& rel_dir :
|
||||
{GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
|
||||
const auto abs_dir = GetAbsolutePath(rel_dir);
|
||||
InsertPathnameToSizeBytes(abs_dir, &abs_path_to_size);
|
||||
InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
|
||||
}
|
||||
// load the backups if any
|
||||
for (auto& backup : backups_) {
|
||||
InsertPathnameToSizeBytes(
|
||||
GetAbsolutePath(GetPrivateFileRel(backup.first)), &abs_path_to_size);
|
||||
GetAbsolutePath(GetPrivateFileRel(backup.first)), backup_env_,
|
||||
&abs_path_to_size);
|
||||
Status s =
|
||||
backup.second->LoadFromFile(options_.backup_dir, abs_path_to_size);
|
||||
if (!s.ok()) {
|
||||
@ -704,7 +705,7 @@ Status BackupEngineImpl::CreateNewBackup(
|
||||
// Pre-fetch sizes for data files
|
||||
std::unordered_map<std::string, uint64_t> data_path_to_size;
|
||||
if (s.ok()) {
|
||||
s = InsertPathnameToSizeBytes(db->GetName(), &data_path_to_size);
|
||||
s = InsertPathnameToSizeBytes(db->GetName(), db_env_, &data_path_to_size);
|
||||
}
|
||||
|
||||
std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
|
||||
@ -762,7 +763,7 @@ Status BackupEngineImpl::CreateNewBackup(
|
||||
std::unordered_map<std::string, uint64_t> wal_path_to_size;
|
||||
if (s.ok()) {
|
||||
if (db->GetOptions().wal_dir != "") {
|
||||
s = InsertPathnameToSizeBytes(db->GetOptions().wal_dir,
|
||||
s = InsertPathnameToSizeBytes(db->GetOptions().wal_dir, db_env_,
|
||||
&wal_path_to_size);
|
||||
} else {
|
||||
wal_path_to_size = std::move(data_path_to_size);
|
||||
@ -1118,7 +1119,7 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
|
||||
for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
|
||||
GetSharedFileWithChecksumRel()}) {
|
||||
const auto abs_dir = GetAbsolutePath(rel_dir);
|
||||
InsertPathnameToSizeBytes(abs_dir, &curr_abs_path_to_size);
|
||||
InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
|
||||
}
|
||||
|
||||
for (const auto& file_info : backup->GetFiles()) {
|
||||
@ -1432,10 +1433,11 @@ void BackupEngineImpl::DeleteChildren(const std::string& dir,
|
||||
}
|
||||
|
||||
Status BackupEngineImpl::InsertPathnameToSizeBytes(
|
||||
const std::string& dir, std::unordered_map<std::string, uint64_t>* result) {
|
||||
const std::string& dir, Env* env,
|
||||
std::unordered_map<std::string, uint64_t>* result) {
|
||||
assert(result != nullptr);
|
||||
std::vector<Env::FileAttributes> files_attrs;
|
||||
Status status = backup_env_->GetChildrenFileAttributes(dir, &files_attrs);
|
||||
Status status = env->GetChildrenFileAttributes(dir, &files_attrs);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
|
||||
#include "db/db_impl.h"
|
||||
#include "db/filename.h"
|
||||
#include "port/port.h"
|
||||
#include "port/stack_trace.h"
|
||||
@ -1318,10 +1319,22 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) {
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
|
||||
|
||||
flush_thread.join();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
// The last manifest roll would've already been cleaned up by the full scan
|
||||
// that happens when CreateNewBackup invokes EnableFileDeletions. We need to
|
||||
// trigger another roll to verify non-full scan purges stale manifests.
|
||||
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_.get());
|
||||
std::string prev_manifest_path =
|
||||
DescriptorFileName(dbname_, db_impl->TEST_Current_Manifest_FileNo());
|
||||
FillDB(db_.get(), 0, 100);
|
||||
ASSERT_OK(env_->FileExists(prev_manifest_path));
|
||||
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||
ASSERT_TRUE(env_->FileExists(prev_manifest_path).IsNotFound());
|
||||
|
||||
CloseDBAndBackupEngine();
|
||||
DestroyDB(dbname_, Options());
|
||||
AssertBackupConsistency(0, 0, 100);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
// see https://github.com/facebook/rocksdb/issues/921
|
||||
|
Loading…
Reference in New Issue
Block a user