Compare commits

...

14 Commits
main ... 4.6.fb

Author SHA1 Message Date
sdong
b026bfdf4a Disable error as warning 2019-11-05 11:03:42 -08:00
sdong
ce7b6bced6 Add one more #include<functional> 2019-11-05 11:03:40 -08:00
sdong
2f3fac5a1d Add some include<functional> 2019-10-31 14:30:44 -07:00
sdong
b2ef1cb1ae [FB Internal] Point to the latest tool chain. 2019-10-31 14:30:43 -07:00
sdong
05ab235fe7 [fb only] revert unintended change of USE_SSE
The previuos change that use gcc-5 set USE_SSE to wrong flag by mistake. Fix it.
2017-07-17 22:21:57 -07:00
sdong
36a76d543e [FB Only] use gcc-5 2017-07-17 21:48:14 -07:00
Islam AbdelRahman
21b393c7f3 backport fbcode gcc path fix 2016-06-08 13:06:52 -07:00
Andrew Kryczka
8d7926a766 Increment patch number for 4.6.1 2016-05-06 17:29:53 -07:00
Andrew Kryczka
9ed80413e0 Retrieve file size from proper Env
Summary:
When db_env_ != backup_env_, InsertPathnameToSizeBytes() would
use the wrong Env during backup creation. This happened because this function
used backup_env_ instead of db_env_ to get WAL/data file sizes.

This diff adds an argument to InsertPathnameToSizeBytes() indicating which Env
to use.

Test Plan: ran @anirbanb's BackupTestTool

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D57159
2016-05-06 17:28:15 -07:00
Andrew Kryczka
15ec7d4f95 Update change log for 4.6 release
Summary: as titled

Test Plan: N/A

Reviewers: sdong, kradhakrishnan, anthony, yhchiang, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D55323
2016-03-14 11:11:09 -07:00
Islam AbdelRahman
0612f472b2 Aggregate hot Iterator counters in LocalStatistics (DBIter::Next perf regression)
Summary:
This patch bump the counters in the frequent code path DBIter::Next() / DBIter::Prev() in a local data members and send them to Statistics when the iterator is destroyed
A better solution will be to have thread_local implementation for Statistics

New performance
```
readseq      :       0.035 micros/op 28597881 ops/sec; 3163.7 MB/s
     1,851,568,819      stalled-cycles-frontend   #   31.29% frontend cycles idle    [49.86%]
       884,929,823      stalled-cycles-backend    #   14.95% backend  cycles idle    [50.21%]
readreverse  :       0.071 micros/op 14077393 ops/sec; 1557.3 MB/s
     3,239,575,993      stalled-cycles-frontend   #   27.36% frontend cycles idle    [49.96%]
     1,558,253,983      stalled-cycles-backend    #   13.16% backend  cycles idle    [50.14%]

```

Existing performance

```
readreverse  :       0.174 micros/op 5732342 ops/sec;  634.1 MB/s
    20,570,209,389      stalled-cycles-frontend   #   70.71% frontend cycles idle    [50.01%]
    18,422,816,837      stalled-cycles-backend    #   63.33% backend  cycles idle    [50.04%]

readseq      :       0.119 micros/op 8400537 ops/sec;  929.3 MB/s
    15,634,225,844      stalled-cycles-frontend   #   79.07% frontend cycles idle    [49.96%]
    14,227,427,453      stalled-cycles-backend    #   71.95% backend  cycles idle    [50.09%]
```

Test Plan: unit tests

Reviewers: yhchiang, sdong, igor

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D55107
2016-03-14 11:11:09 -07:00
Andrew Kryczka
187cb23caa Cleanup stale manifests outside of full purge
Summary:
- Keep track of obsolete manifests in VersionSet
- Updated FindObsoleteFiles() to put obsolete manifests in the JobContext for later use by PurgeObsoleteFiles()
- Added test case that verifies a stale manifest is deleted by a non-full purge

Test Plan:
  $ ./backupable_db_test --gtest_filter=BackupableDBTest.ChangeManifestDuringBackupCreation

Reviewers: IslamAbdelRahman, yoshinorim, sdong

Reviewed By: sdong

Subscribers: andrewkr, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D55269
2016-03-14 11:11:09 -07:00
Yi Wu
848cf937d9 Cache to have an option to fail Cache::Insert() when full
Summary:
Cache to have an option to fail Cache::Insert() when full. Update call sites to check status and handle error.

I totally have no idea what's correct behavior of all the call sites when they encounter error. Please let me know if you see something wrong or more unit test is needed.

Test Plan: make check -j32, see tests pass.

Reviewers: anthony, yhchiang, andrewkr, IslamAbdelRahman, kradhakrishnan, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D54705
2016-03-14 11:11:09 -07:00
Yueh-Hsuan Chiang
c5cfed2e4a Update compaction score right after CompactFiles forms a compaction
Summary:
This is a follow-up patch of https://reviews.facebook.net/D54891.
As the information about files being compacted will also be used
when making compaction decision, it is necessary to update the compaction
score when a compaction plan has been made but not yet execute.

This patch adds a missing call to update the compaction score in
CompactFiles().

Test Plan: compact_files_test

Reviewers: sdong, IslamAbdelRahman, kradhakrishnan, yiwu, andrewkr

Reviewed By: andrewkr

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D55227
2016-03-14 11:11:09 -07:00
29 changed files with 761 additions and 180 deletions

View File

@ -338,6 +338,7 @@ set(TESTS
db/db_tailing_iter_test.cc
db/db_test.cc
db/db_test2.cc
db/db_block_cache_test.cc
db/db_universal_compaction_test.cc
db/db_wal_test.cc
db/dbformat_test.cc

View File

@ -1,7 +1,10 @@
# Rocksdb Change Log
## Unreleased
## 4.6.0 (3/10/2016)
### Public API Changes
* Change default of BlockBasedTableOptions.format_version to 2. It means default DB created by 4.6 or up cannot be opened by RocksDB version 3.9 or earlier.
* Added strict_capacity_limit option to NewLRUCache. If the flag is set to true, insert to cache will fail if no enough capacity can be free. Signiture of Cache::Insert() is updated accordingly.
* Tickers [NUMBER_DB_NEXT, NUMBER_DB_PREV, NUMBER_DB_NEXT_FOUND, NUMBER_DB_PREV_FOUND, ITER_BYTES_READ] are not updated immediately. The are updated when the Iterator is deleted.
* Add monotonically increasing counter (DB property "rocksdb.current-super-version-number") that increments upon any change to the LSM tree.
### New Features
* Add CompactionPri::kMinOverlappingRatio, a compaction picking mode friendly to write amplification.
* Deprecate Iterator::IsKeyPinned() and replace it with Iterator::GetProperty() with prop_name="rocksdb.iterator.is.key.pinned"

View File

@ -191,10 +191,6 @@ default: all
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
-Wno-unused-parameter
ifndef DISABLE_WARNING_AS_ERROR
WARNING_FLAGS += -Werror
endif
CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
@ -246,6 +242,7 @@ BENCHTOOLOBJECTS = $(BENCH_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL)
TESTS = \
db_test \
db_test2 \
db_block_cache_test \
db_iter_test \
db_log_iter_test \
db_compaction_filter_test \
@ -794,6 +791,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_log_iter_test: db/db_log_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

View File

@ -52,12 +52,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
FBCODE_BUILD="true"
# If we're compiling with TSAN we need pic build
PIC_BUILD=$COMPILE_WITH_TSAN
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
# we need this to build with MySQL. Don't use for other purposes.
source "$PWD/build_tools/fbcode_config4.8.1.sh"
fi
source "$PWD/build_tools/fbcode_config.sh"
fi
# Delete existing output, if it exists

View File

@ -1,16 +1,19 @@
GCC_BASE=/mnt/vol/engshare/fbcode/third-party2/gcc/4.9.x/centos6-native/1317bc4/
CLANG_BASE=/mnt/gvfs/third-party2/clang/fc904e50a9266b9d7b98cae1993afa0c5aae1440/3.7.1/centos6-native/9d9ecb9/
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/ea2fd1278810d3af2ea52218d2767e09d786dbd0/4.9.x/gcc-4.9-glibc-2.20/024dbc3
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/f5484f168c0e4d19823d41df052c5870c6e575a4/2.20/gcc-4.9-glibc-2.20/500e281
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/cbf6f1f209e5bd160bdc5d971744e039f36b1566/1.1.3/gcc-4.9-glibc-2.20/e9936bf
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/6d39cb54708049f527e713ad19f2aadb9d3667e8/1.2.8/gcc-4.9-glibc-2.20/e9936bf
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/2ddd45f0853bfc8bb1c27f0f447236a1a26c338a/1.0.6/gcc-4.9-glibc-2.20/e9936bf
LZ4_BASE=/mnt/gvfs/third-party2/lz4/6858fac689e0f92e584224d91bdb0e39f6c8320d/r131/gcc-4.9-glibc-2.20/e9936bf
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/cb6c4880fcb4fee471574ba6af63a3882155a16a/0.5.1/gcc-4.9-glibc-2.20/e9936bf
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/c7275a4ceae0aca0929e56964a31dafc53c1ee96/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/40791a3fef9206a77f2c4bc51f8169e5bf10d68e/master/gcc-4.9-glibc-2.20/a6c5e1e
NUMA_BASE=/mnt/gvfs/third-party2/numa/ae54a5ed22cdabb1c6446dce4e8ffae5b4446d73/2.0.8/gcc-4.9-glibc-2.20/e9936bf
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/303048f72efc92ae079e62dfc84823401aecfd94/trunk/gcc-4.9-glibc-2.20/12266b1
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/1a48835975c66d30e47770ec419758ed3b9ba010/3.10.62-62_fbk17_03959_ge29cc63/gcc-4.9-glibc-2.20/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/a5b8152b2a15ce8a98808cf954fbccec825a97bc/2.25/centos6-native/da39a3e
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/af85c56f424cd5edfc2c97588299b44ecdec96bb/3.10.0/gcc-4.9-glibc-2.20/e9936bf
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

View File

@ -1,4 +1,4 @@
GCC_BASE=/mnt/vol/engshare/fbcode/third-party2/gcc/4.8.1/centos6-native/cc6c9dc/
GCC_BASE=/mnt/gvfs/third-party2/gcc/ebc96bc2fb751b5a0300b8d91a95bdf24ac1d88b/4.8.1/centos6-native/cc6c9dc
CLANG_BASE=/mnt/gvfs/third-party2/clang/fc904e50a9266b9d7b98cae1993afa0c5aae1440/3.7.1/centos6-native/9d9ecb9/
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/ea2fd1278810d3af2ea52218d2767e09d786dbd0/4.8.1/gcc-4.8.1-glibc-2.17/8aac7fc
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/f5484f168c0e4d19823d41df052c5870c6e575a4/2.17/gcc-4.8.1-glibc-2.17/99df8fc

View File

@ -13,12 +13,12 @@ source "$BASEDIR/dependencies.sh"
CFLAGS=""
# libgcc
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
LIBGCC_LIBS=" -L $LIBGCC_BASE/libs"
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
# glibc
GLIBC_INCLUDE="$GLIBC_BASE/include"
GLIBC_LIBS=" -L $GLIB_BASE/lib"
GLIBC_LIBS=" -L $GLIBC_BASE/lib"
# snappy
SNAPPY_INCLUDE=" -I $SNAPPY_BASE/include/"
@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
CFLAGS+=" -DLZ4"
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
CFLAGS+=" -DZSTD"
fi
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
if test -z $PIC_BUILD; then
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
else
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
fi
CFLAGS+=" -DZSTD"
# location of gflags headers and libraries
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
if test -z $PIC_BUILD; then
@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
else
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
fi
CFLAGS+=" -DGFLAGS=google"
CFLAGS+=" -DGFLAGS=gflags"
# location of jemalloc
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@ -72,13 +76,22 @@ if test -z $PIC_BUILD; then
LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
fi
# location of TBB
TBB_INCLUDE=" -isystem $TBB_BASE/include/"
if test -z $PIC_BUILD; then
TBB_LIBS="$TBB_BASE/lib/libtbb.a"
else
TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
fi
CFLAGS+=" -DTBB"
# use Intel SSE support for checksum calculations
export USE_SSE=1
BINUTILS="$BINUTILS_BASE/bin"
AR="$BINUTILS/ar"
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE"
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
STDLIBS="-L $GCC_BASE/lib64"
@ -87,7 +100,7 @@ CLANG_LIB="$CLANG_BASE/lib"
CLANG_SRC="$CLANG_BASE/../../src"
CLANG_ANALYZER="$CLANG_BIN/clang++"
CLANG_SCAN_BUILD="$CLANG_SRC/clang/tools/scan-build/scan-build"
CLANG_SCAN_BUILD="$CLANG_SRC/llvm/tools/clang/tools/scan-build/bin/scan-build"
if [ -z "$USE_CLANG" ]; then
# gcc
@ -95,39 +108,44 @@ if [ -z "$USE_CLANG" ]; then
CXX="$GCC_BASE/bin/g++"
CFLAGS+=" -B$BINUTILS/gold"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
JEMALLOC=1
else
# clang
CLANG_INCLUDE="$CLANG_LIB/clang/*/include"
CLANG_INCLUDE="$CLANG_LIB/clang/stable/include"
CC="$CLANG_BIN/clang"
CXX="$CLANG_BIN/clang++"
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $CLANG_INCLUDE"
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE/linux "
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE "
CFLAGS+=" -Wno-expansion-to-defined "
CXXFLAGS="-nostdinc++"
fi
CFLAGS+=" $DEPS_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
CXXFLAGS+=" $CFLAGS"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
EXEC_LDFLAGS+=" $LIBUNWIND"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
# required by libtbb
EXEC_LDFLAGS+=" -ldl"
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS"
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
VALGRIND_VER="$VALGRIND_BASE/bin/"

View File

@ -64,7 +64,7 @@ touch "$OUTPUT"
echo "Writing dependencies to $OUTPUT"
# Compilers locations
GCC_BASE=`ls -d1 $TP2_LATEST/gcc/4.9.x/centos6-native/*/ | head -n1`
GCC_BASE=`readlink -f $TP2_LATEST/gcc/4.9.x/centos6-native/*/`
CLANG_BASE=`ls -d1 /mnt/gvfs/third-party2/clang/fc904e50a9266b9d7b98cae1993afa0c5aae1440/3.7.1/centos6-native/*/ | head -n1`
log_variable GCC_BASE
@ -101,7 +101,7 @@ touch "$OUTPUT"
echo "Writing 4.8.1 dependencies to $OUTPUT"
# Compilers locations
GCC_BASE=`ls -d1 $TP2_LATEST/gcc/4.8.1/centos6-native/*/ | head -n1`
GCC_BASE=`readlink -f $TP2_LATEST/gcc/4.8.1/centos6-native/*/`
CLANG_BASE=`ls -d1 /mnt/gvfs/third-party2/clang/fc904e50a9266b9d7b98cae1993afa0c5aae1440/3.7.1/centos6-native/*/ | head -n1`
log_variable GCC_BASE

237
db/db_block_cache_test.cc Normal file
View File

@ -0,0 +1,237 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cstdlib>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
namespace rocksdb {
static uint64_t TestGetTickerCount(const Options& options,
Tickers ticker_type) {
return options.statistics->getTickerCount(ticker_type);
}
class DBBlockCacheTest : public DBTestBase {
private:
size_t miss_count_ = 0;
size_t hit_count_ = 0;
size_t insert_count_ = 0;
size_t failure_count_ = 0;
size_t compressed_miss_count_ = 0;
size_t compressed_hit_count_ = 0;
size_t compressed_insert_count_ = 0;
size_t compressed_failure_count_ = 0;
public:
const size_t kNumBlocks = 10;
const size_t kValueSize = 100;
DBBlockCacheTest() : DBTestBase("/db_block_cache_test") {}
BlockBasedTableOptions GetTableOptions() {
BlockBasedTableOptions table_options;
// Set a small enough block size so that each key-value get its own block.
table_options.block_size = 1;
return table_options;
}
Options GetOptions(const BlockBasedTableOptions& table_options) {
Options options = CurrentOptions();
options.create_if_missing = true;
// options.compression = kNoCompression;
options.statistics = rocksdb::CreateDBStatistics();
options.table_factory.reset(new BlockBasedTableFactory(table_options));
return options;
}
void InitTable(const Options& options) {
std::string value(kValueSize, 'a');
for (size_t i = 0; i < kNumBlocks; i++) {
ASSERT_OK(Put(ToString(i), value.c_str()));
}
}
void RecordCacheCounters(const Options& options) {
miss_count_ = TestGetTickerCount(options, BLOCK_CACHE_MISS);
hit_count_ = TestGetTickerCount(options, BLOCK_CACHE_HIT);
insert_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD);
failure_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES);
compressed_miss_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
compressed_hit_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
compressed_insert_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD);
compressed_failure_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
}
void CheckCacheCounters(const Options& options, size_t expected_misses,
size_t expected_hits, size_t expected_inserts,
size_t expected_failures) {
size_t new_miss_count = TestGetTickerCount(options, BLOCK_CACHE_MISS);
size_t new_hit_count = TestGetTickerCount(options, BLOCK_CACHE_HIT);
size_t new_insert_count = TestGetTickerCount(options, BLOCK_CACHE_ADD);
size_t new_failure_count =
TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES);
ASSERT_EQ(miss_count_ + expected_misses, new_miss_count);
ASSERT_EQ(hit_count_ + expected_hits, new_hit_count);
ASSERT_EQ(insert_count_ + expected_inserts, new_insert_count);
ASSERT_EQ(failure_count_ + expected_failures, new_failure_count);
miss_count_ = new_miss_count;
hit_count_ = new_hit_count;
insert_count_ = new_insert_count;
failure_count_ = new_failure_count;
}
void CheckCompressedCacheCounters(const Options& options,
size_t expected_misses,
size_t expected_hits,
size_t expected_inserts,
size_t expected_failures) {
size_t new_miss_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
size_t new_hit_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
size_t new_insert_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD);
size_t new_failure_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
ASSERT_EQ(compressed_miss_count_ + expected_misses, new_miss_count);
ASSERT_EQ(compressed_hit_count_ + expected_hits, new_hit_count);
ASSERT_EQ(compressed_insert_count_ + expected_inserts, new_insert_count);
ASSERT_EQ(compressed_failure_count_ + expected_failures, new_failure_count);
compressed_miss_count_ = new_miss_count;
compressed_hit_count_ = new_hit_count;
compressed_insert_count_ = new_insert_count;
compressed_failure_count_ = new_failure_count;
}
};
TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) {
ReadOptions read_options;
auto table_options = GetTableOptions();
auto options = GetOptions(table_options);
InitTable(options);
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
table_options.block_cache = cache;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
Reopen(options);
RecordCacheCounters(options);
std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
Iterator* iter = nullptr;
// Load blocks into cache.
for (size_t i = 0; i < kNumBlocks - 1; i++) {
iter = db_->NewIterator(read_options);
iter->Seek(ToString(i));
ASSERT_OK(iter->status());
CheckCacheCounters(options, 1, 0, 1, 0);
iterators[i].reset(iter);
}
size_t usage = cache->GetUsage();
ASSERT_LT(0, usage);
cache->SetCapacity(usage);
ASSERT_EQ(usage, cache->GetPinnedUsage());
// Test with strict capacity limit.
cache->SetStrictCapacityLimit(true);
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_TRUE(iter->status().IsIncomplete());
CheckCacheCounters(options, 1, 0, 0, 1);
delete iter;
iter = nullptr;
// Release interators and access cache again.
for (size_t i = 0; i < kNumBlocks - 1; i++) {
iterators[i].reset();
CheckCacheCounters(options, 0, 0, 0, 0);
}
ASSERT_EQ(0, cache->GetPinnedUsage());
for (size_t i = 0; i < kNumBlocks - 1; i++) {
iter = db_->NewIterator(read_options);
iter->Seek(ToString(i));
ASSERT_OK(iter->status());
CheckCacheCounters(options, 0, 1, 0, 0);
iterators[i].reset(iter);
}
}
TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
ReadOptions read_options;
auto table_options = GetTableOptions();
auto options = GetOptions(table_options);
InitTable(options);
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
std::shared_ptr<Cache> compressed_cache = NewLRUCache(0, 0, false);
table_options.block_cache = cache;
table_options.block_cache_compressed = compressed_cache;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
Reopen(options);
RecordCacheCounters(options);
std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
Iterator* iter = nullptr;
// Load blocks into cache.
for (size_t i = 0; i < kNumBlocks - 1; i++) {
iter = db_->NewIterator(read_options);
iter->Seek(ToString(i));
ASSERT_OK(iter->status());
CheckCacheCounters(options, 1, 0, 1, 0);
CheckCompressedCacheCounters(options, 1, 0, 1, 0);
iterators[i].reset(iter);
}
size_t usage = cache->GetUsage();
ASSERT_LT(0, usage);
ASSERT_EQ(usage, cache->GetPinnedUsage());
size_t compressed_usage = compressed_cache->GetUsage();
ASSERT_LT(0, compressed_usage);
// Compressed block cache cannot be pinned.
ASSERT_EQ(0, compressed_cache->GetPinnedUsage());
// Set strict capacity limit flag. Now block will only load into compressed
// block cache.
cache->SetCapacity(usage);
cache->SetStrictCapacityLimit(true);
ASSERT_EQ(usage, cache->GetPinnedUsage());
// compressed_cache->SetCapacity(compressed_usage);
compressed_cache->SetCapacity(0);
// compressed_cache->SetStrictCapacityLimit(true);
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_TRUE(iter->status().IsIncomplete());
CheckCacheCounters(options, 1, 0, 0, 1);
CheckCompressedCacheCounters(options, 1, 0, 1, 0);
delete iter;
iter = nullptr;
// Clear strict capacity limit flag. This time we shall hit compressed block
// cache.
cache->SetStrictCapacityLimit(false);
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_OK(iter->status());
CheckCacheCounters(options, 1, 0, 1, 0);
CheckCompressedCacheCounters(options, 0, 1, 0, 0);
delete iter;
iter = nullptr;
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -572,6 +572,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// Get obsolete files. This function will also update the list of
// pending files in VersionSet().
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
&job_context->manifest_delete_files,
job_context->min_pending_output);
// store the current filenum, lognum, etc
@ -689,9 +690,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
}
auto candidate_files = state.full_scan_candidate_files;
candidate_files.reserve(candidate_files.size() +
state.sst_delete_files.size() +
state.log_delete_files.size());
candidate_files.reserve(
candidate_files.size() + state.sst_delete_files.size() +
state.log_delete_files.size() + state.manifest_delete_files.size());
// We may ignore the dbname when generating the file names.
const char* kDumbDbName = "";
for (auto file : state.sst_delete_files) {
@ -707,6 +708,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
0);
}
}
for (const auto& filename : state.manifest_delete_files) {
candidate_files.emplace_back(filename, 0);
}
// dedup state.candidate_files so we don't try to delete the same
// file twice
@ -1844,6 +1848,17 @@ Status DBImpl::CompactFilesImpl(
// support for CompactFiles, we should have CompactFiles API
// pass a pointer of CompactionJobStats as the out-value
// instead of using EventListener.
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
// being compacted). Since we just changed compaction score, we recalculate it
// here.
{
CompactionOptionsFIFO dummy_compaction_options_fifo;
version->storage_info()->ComputeCompactionScore(
*c->mutable_cf_options(), dummy_compaction_options_fifo);
}
compaction_job.Prepare();
mutex_.Unlock();

View File

@ -60,6 +60,44 @@ class DBIter: public Iterator {
kReverse
};
// LocalStatistics contain Statistics counters that will be aggregated per
// each iterator instance and then will be sent to the global statistics when
// the iterator is destroyed.
//
// The purpose of this approach is to avoid perf regression happening
// when multiple threads bump the atomic counters from a DBIter::Next().
struct LocalStatistics {
explicit LocalStatistics() { ResetCounters(); }
void ResetCounters() {
next_count_ = 0;
next_found_count_ = 0;
prev_count_ = 0;
prev_found_count_ = 0;
bytes_read_ = 0;
}
void BumpGlobalStatistics(Statistics* global_statistics) {
RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
ResetCounters();
}
// Map to Tickers::NUMBER_DB_NEXT
uint64_t next_count_;
// Map to Tickers::NUMBER_DB_NEXT_FOUND
uint64_t next_found_count_;
// Map to Tickers::NUMBER_DB_PREV
uint64_t prev_count_;
// Map to Tickers::NUMBER_DB_PREV_FOUND
uint64_t prev_found_count_;
// Map to Tickers::ITER_BYTES_READ
uint64_t bytes_read_;
};
DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
@ -86,6 +124,7 @@ class DBIter: public Iterator {
}
virtual ~DBIter() {
RecordTick(statistics_, NO_ITERATORS, -1);
local_stats_.BumpGlobalStatistics(statistics_);
if (!arena_mode_) {
delete iter_;
} else {
@ -213,6 +252,7 @@ class DBIter: public Iterator {
bool iter_pinned_;
// List of operands for merge operator.
std::deque<std::string> merge_operands_;
LocalStatistics local_stats_;
// No copying allowed
DBIter(const DBIter&);
@ -250,6 +290,9 @@ void DBIter::Next() {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}
if (statistics_ != nullptr) {
local_stats_.next_count_++;
}
// Now we point to the next internal position, for both of merge and
// not merge cases.
if (!iter_->Valid()) {
@ -257,18 +300,15 @@ void DBIter::Next() {
return;
}
FindNextUserEntry(true /* skipping the current user key */);
if (statistics_ != nullptr) {
RecordTick(statistics_, NUMBER_DB_NEXT);
if (valid_) {
RecordTick(statistics_, NUMBER_DB_NEXT_FOUND);
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
}
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
prefix_extractor_->Transform(saved_key_.GetKey())
.compare(prefix_start_.GetKey()) != 0) {
valid_ = false;
}
if (statistics_ != nullptr && valid_) {
local_stats_.next_found_count_++;
local_stats_.bytes_read_ += (key().size() + value().size());
}
}
// PRE: saved_key_ has the current user key if skipping
@ -436,10 +476,10 @@ void DBIter::Prev() {
}
PrevInternal();
if (statistics_ != nullptr) {
RecordTick(statistics_, NUMBER_DB_PREV);
local_stats_.prev_count_++;
if (valid_) {
RecordTick(statistics_, NUMBER_DB_PREV_FOUND);
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
local_stats_.prev_found_count_++;
local_stats_.bytes_read_ += (key().size() + value().size());
}
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&

View File

@ -10611,6 +10611,88 @@ TEST_F(DBTest, PrefixExtractorBlockFilter) {
delete iter;
}
TEST_F(DBTest, IteratorWithLocalStatistics) {
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 1000; i++) {
// Key 10 bytes / Value 10 bytes
ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
}
std::atomic<uint64_t> total_next(0);
std::atomic<uint64_t> total_next_found(0);
std::atomic<uint64_t> total_prev(0);
std::atomic<uint64_t> total_prev_found(0);
std::atomic<uint64_t> total_bytes(0);
std::vector<std::thread> threads;
std::function<void()> reader_func_next = [&]() {
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
// Seek will bump ITER_BYTES_READ
total_bytes += iter->key().size();
total_bytes += iter->value().size();
while (true) {
iter->Next();
total_next++;
if (!iter->Valid()) {
break;
}
total_next_found++;
total_bytes += iter->key().size();
total_bytes += iter->value().size();
}
delete iter;
};
std::function<void()> reader_func_prev = [&]() {
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToLast();
// Seek will bump ITER_BYTES_READ
total_bytes += iter->key().size();
total_bytes += iter->value().size();
while (true) {
iter->Prev();
total_prev++;
if (!iter->Valid()) {
break;
}
total_prev_found++;
total_bytes += iter->key().size();
total_bytes += iter->value().size();
}
delete iter;
};
for (int i = 0; i < 10; i++) {
threads.emplace_back(reader_func_next);
}
for (int i = 0; i < 15; i++) {
threads.emplace_back(reader_func_prev);
}
for (auto& t : threads) {
t.join();
}
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT), total_next);
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT_FOUND),
total_next_found);
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV), total_prev);
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV_FOUND),
total_prev_found);
ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes);
}
#ifndef ROCKSDB_LITE
class BloomStatsTestWithParam
: public DBTest,

View File

@ -22,9 +22,9 @@ class MemTable;
struct JobContext {
inline bool HaveSomethingToDelete() const {
return full_scan_candidate_files.size() || sst_delete_files.size() ||
log_delete_files.size() || new_superversion != nullptr ||
superversions_to_free.size() > 0 || memtables_to_free.size() > 0 ||
logs_to_free.size() > 0;
log_delete_files.size() || manifest_delete_files.size() ||
new_superversion != nullptr || superversions_to_free.size() > 0 ||
memtables_to_free.size() > 0 || logs_to_free.size() > 0;
}
// Structure to store information for candidate files to delete.
@ -56,6 +56,9 @@ struct JobContext {
// a list of log files that we need to delete
std::vector<uint64_t> log_delete_files;
// a list of manifest files that we need to delete
std::vector<std::string> manifest_delete_files;
// a list of memtables to be free
autovector<MemTable*> memtables_to_free;

View File

@ -143,8 +143,12 @@ Status TableCache::FindTable(const EnvOptions& env_options,
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
*handle = cache_->Insert(key, table_reader.release(), 1,
&DeleteEntry<TableReader>);
s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
handle);
if (s.ok()) {
// Release ownership of table reader.
table_reader.release();
}
}
}
return s;
@ -285,9 +289,8 @@ Status TableCache::Get(const ReadOptions& options,
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
auto row_handle = ioptions_.row_cache->Insert(
row_cache_key.GetKey(), row_ptr, charge, &DeleteEntry<std::string>);
ioptions_.row_cache->Release(row_handle);
ioptions_.row_cache->Insert(row_cache_key.GetKey(), row_ptr, charge,
&DeleteEntry<std::string>);
}
#endif // ROCKSDB_LITE

View File

@ -2254,6 +2254,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
db_options_->disableDataSync ? nullptr : db_directory);
// Leave the old file behind since PurgeObsoleteFiles will take care of it
// later. It's unsafe to delete now since file deletion may be disabled.
obsolete_manifests_.emplace_back(
DescriptorFileName("", manifest_file_number_));
}
if (s.ok()) {
@ -3388,7 +3390,10 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
}
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output) {
assert(manifest_filenames->empty());
obsolete_manifests_.swap(*manifest_filenames);
std::vector<FileMetaData*> pending_files;
for (auto f : obsolete_files_) {
if (f->fd.GetNumber() < min_pending_output) {

View File

@ -24,6 +24,7 @@
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
@ -697,6 +698,7 @@ class VersionSet {
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
void GetObsoleteFiles(std::vector<FileMetaData*>* files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output);
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
@ -758,6 +760,7 @@ class VersionSet {
uint64_t manifest_file_size_;
std::vector<FileMetaData*> obsolete_files_;
std::vector<std::string> obsolete_manifests_;
// env options for all reads and writes except compactions
const EnvOptions& env_options_;

View File

@ -25,6 +25,7 @@
#include <memory>
#include <stdint.h>
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
namespace rocksdb {
@ -33,12 +34,15 @@ using std::shared_ptr;
class Cache;
// Create a new cache with a fixed size capacity. The cache is sharded
// to 2^numShardBits shards, by hash of the key. The total capacity
// to 2^num_shard_bits shards, by hash of the key. The total capacity
// is divided and evenly assigned to each shard.
//
// The functions without parameter numShardBits uses default value, which is 4
// The parameter num_shard_bits defaults to 4, and strict_capacity_limit
// defaults to false.
extern shared_ptr<Cache> NewLRUCache(size_t capacity);
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int numShardBits);
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits);
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit);
class Cache {
public:
@ -55,15 +59,22 @@ class Cache {
// Insert a mapping from key->value into the cache and assign it
// the specified charge against the total cache capacity.
// If strict_capacity_limit is true and cache reaches its full capacity,
// return Status::Incomplete.
//
// Returns a handle that corresponds to the mapping. The caller
// must call this->Release(handle) when the returned mapping is no
// longer needed.
// If handle is not nullptr, returns a handle that corresponds to the
// mapping. The caller must call this->Release(handle) when the returned
// mapping is no longer needed. In case of error caller is responsible to
// cleanup the value (i.e. calling "deleter").
//
// If handle is nullptr, it is as if Release is called immediately after
// insert. In case of error value will be cleanup.
//
// When the inserted entry is no longer needed, the key and
// value will be passed to "deleter".
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) = 0;
virtual Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr) = 0;
// If the cache has no mapping for "key", returns nullptr.
//
@ -100,6 +111,14 @@ class Cache {
// purge the released entries from the cache in order to lower the usage
virtual void SetCapacity(size_t capacity) = 0;
// Set whether to return error on insertion when cache reaches its full
// capacity.
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
// Set whether to return error on insertion when cache reaches its full
// capacity.
virtual bool HasStrictCapacityLimit() const = 0;
// returns the maximum configured capacity of the cache
virtual size_t GetCapacity() const = 0;

View File

@ -33,6 +33,8 @@ enum Tickers : uint32_t {
BLOCK_CACHE_HIT,
// # of blocks added to block cache.
BLOCK_CACHE_ADD,
// # of failures when adding blocks to block cache.
BLOCK_CACHE_ADD_FAILURES,
// # of times cache miss when accessing index block from block cache.
BLOCK_CACHE_INDEX_MISS,
// # of times cache hit when accessing index block from block cache.
@ -140,8 +142,12 @@ enum Tickers : uint32_t {
GET_UPDATES_SINCE_CALLS,
BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache
BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache
WAL_FILE_SYNCED, // Number of times WAL sync is done
WAL_FILE_BYTES, // Number of bytes written to WAL
// Number of blocks added to comopressed block cache
BLOCK_CACHE_COMPRESSED_ADD,
// Number of failures when adding blocks to compressed block cache
BLOCK_CACHE_COMPRESSED_ADD_FAILURES,
WAL_FILE_SYNCED, // Number of times WAL sync is done
WAL_FILE_BYTES, // Number of bytes written to WAL
// Writes can be processed by requesting thread or by the thread at the
// head of the writers queue.
@ -176,6 +182,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{BLOCK_CACHE_MISS, "rocksdb.block.cache.miss"},
{BLOCK_CACHE_HIT, "rocksdb.block.cache.hit"},
{BLOCK_CACHE_ADD, "rocksdb.block.cache.add"},
{BLOCK_CACHE_ADD_FAILURES, "rocksdb.block.cache.add.failures"},
{BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss"},
{BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit"},
{BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss"},
@ -227,6 +234,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls"},
{BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss"},
{BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit"},
{BLOCK_CACHE_COMPRESSED_ADD, "rocksdb.block.cachecompressed.add"},
{BLOCK_CACHE_COMPRESSED_ADD_FAILURES,
"rocksdb.block.cachecompressed.add.failures"},
{WAL_FILE_SYNCED, "rocksdb.wal.synced"},
{WAL_FILE_BYTES, "rocksdb.wal.bytes"},
{WRITE_DONE_BY_SELF, "rocksdb.write.self"},

View File

@ -6,7 +6,7 @@
#define ROCKSDB_MAJOR 4
#define ROCKSDB_MINOR 6
#define ROCKSDB_PATCH 0
#define ROCKSDB_PATCH 1
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -703,7 +703,6 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
if (type != kNoCompression && block_cache_compressed != nullptr) {
Cache::Handle* cache_handle = nullptr;
size_t size = block_contents.size();
std::unique_ptr<char[]> ubuf(new char[size + 1]);
@ -723,9 +722,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
cache_handle = block_cache_compressed->Insert(
key, block, block->usable_size(), &DeleteCachedBlock);
block_cache_compressed->Release(cache_handle);
block_cache_compressed->Insert(key, block, block->usable_size(),
&DeleteCachedBlock);
// Invalidate OS cache.
r->file->InvalidateCache(static_cast<size_t>(r->offset), size);

View File

@ -740,11 +740,16 @@ Status BlockBasedTable::GetDataBlockFromCache(
assert(block->value->compression_type() == kNoCompression);
if (block_cache != nullptr && block->value->cachable() &&
read_options.fill_cache) {
block->cache_handle = block_cache->Insert(block_cache_key, block->value,
block->value->usable_size(),
&DeleteCachedEntry<Block>);
assert(reinterpret_cast<Block*>(
block_cache->Value(block->cache_handle)) == block->value);
s = block_cache->Insert(
block_cache_key, block->value, block->value->usable_size(),
&DeleteCachedEntry<Block>, &(block->cache_handle));
if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete block->value;
block->value = nullptr;
}
}
}
@ -784,27 +789,37 @@ Status BlockBasedTable::PutDataBlockToCache(
// Release the hold on the compressed cache entry immediately.
if (block_cache_compressed != nullptr && raw_block != nullptr &&
raw_block->cachable()) {
auto cache_handle = block_cache_compressed->Insert(
compressed_block_cache_key, raw_block, raw_block->usable_size(),
&DeleteCachedEntry<Block>);
block_cache_compressed->Release(cache_handle);
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
// Avoid the following code to delete this cached block.
raw_block = nullptr;
s = block_cache_compressed->Insert(compressed_block_cache_key, raw_block,
raw_block->usable_size(),
&DeleteCachedEntry<Block>);
if (s.ok()) {
// Avoid the following code to delete this cached block.
raw_block = nullptr;
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
} else {
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
}
}
delete raw_block;
// insert into uncompressed block cache
assert((block->value->compression_type() == kNoCompression));
if (block_cache != nullptr && block->value->cachable()) {
block->cache_handle = block_cache->Insert(block_cache_key, block->value,
block->value->usable_size(),
&DeleteCachedEntry<Block>);
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
block->value->usable_size());
assert(reinterpret_cast<Block*>(block_cache->Value(block->cache_handle)) ==
block->value);
s = block_cache->Insert(block_cache_key, block->value,
block->value->usable_size(),
&DeleteCachedEntry<Block>, &(block->cache_handle));
if (s.ok()) {
assert(block->cache_handle != nullptr);
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
block->value->usable_size());
assert(reinterpret_cast<Block*>(
block_cache->Value(block->cache_handle)) == block->value);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete block->value;
block->value = nullptr;
}
}
return s;
@ -891,10 +906,17 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
filter = ReadFilter(rep_, &filter_size);
if (filter != nullptr) {
assert(filter_size > 0);
cache_handle = block_cache->Insert(key, filter, filter_size,
&DeleteCachedEntry<FilterBlockReader>);
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size);
Status s = block_cache->Insert(key, filter, filter_size,
&DeleteCachedEntry<FilterBlockReader>,
&cache_handle);
if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete filter;
return CachableEntry<FilterBlockReader>();
}
}
}
@ -937,10 +959,18 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
// Create index reader and put it in the cache.
Status s;
s = CreateIndexReader(&index_reader);
if (s.ok()) {
s = block_cache->Insert(key, index_reader, index_reader->usable_size(),
&DeleteCachedEntry<IndexReader>, &cache_handle);
}
if (!s.ok()) {
if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
index_reader->usable_size());
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
// make sure if something goes wrong, index_reader shall remain intact.
assert(index_reader == nullptr);
if (input_iter != nullptr) {
input_iter->SetStatus(s);
return input_iter;
@ -949,12 +979,6 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
}
}
cache_handle =
block_cache->Insert(key, index_reader, index_reader->usable_size(),
&DeleteCachedEntry<IndexReader>);
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
index_reader->usable_size());
}
assert(cache_handle);
@ -1036,7 +1060,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
// Didn't get any data from block caches.
if (block.value == nullptr) {
if (s.ok() && block.value == nullptr) {
if (no_io) {
// Could not read from block_cache and can't do IO
if (input_iter != nullptr) {
@ -1055,7 +1079,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
InternalIterator* iter;
if (block.value != nullptr) {
if (s.ok() && block.value != nullptr) {
iter = block.value->NewIterator(&rep->internal_comparator, input_iter);
if (block.cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,

View File

@ -196,10 +196,13 @@ class LRUCache {
// free the needed space
void SetCapacity(size_t capacity);
// Set the flag to reject insertion if cache if full.
void SetStrictCapacityLimit(bool strict_capacity_limit);
// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* Insert(const Slice& key, uint32_t hash,
void* value, size_t charge,
void (*deleter)(const Slice& key, void* value));
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle);
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
@ -245,6 +248,9 @@ class LRUCache {
// Memory size for entries residing only in the LRU list
size_t lru_usage_;
// Whether to reject insertion if cache reaches its full capacity.
bool strict_capacity_limit_;
// mutex_ protects the following state.
// We don't count mutex_ as the cache's internal state so semantically we
// don't mind mutex_ invoking the non-const actions.
@ -336,6 +342,11 @@ void LRUCache::SetCapacity(size_t capacity) {
}
}
void LRUCache::SetStrictCapacityLimit(bool strict_capacity_limit) {
MutexLock l(&mutex_);
strict_capacity_limit_ = strict_capacity_limit;
}
Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle* e = table_.Lookup(key, hash);
@ -350,6 +361,9 @@ Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
}
void LRUCache::Release(Cache::Handle* handle) {
if (handle == nullptr) {
return;
}
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
bool last_reference = false;
{
@ -383,15 +397,16 @@ void LRUCache::Release(Cache::Handle* handle) {
}
}
Cache::Handle* LRUCache::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
Status LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle) {
// Allocate the memory here outside of the mutex
// If the cache is full, we'll have to release it
// It shouldn't happen very often though.
LRUHandle* e = reinterpret_cast<LRUHandle*>(
new char[sizeof(LRUHandle) - 1 + key.size()]);
Status s;
autovector<LRUHandle*> last_reference_list;
e->value = value;
@ -399,7 +414,9 @@ Cache::Handle* LRUCache::Insert(
e->charge = charge;
e->key_length = key.size();
e->hash = hash;
e->refs = 2; // One from LRUCache, one for the returned handle
e->refs = (handle == nullptr
? 1
: 2); // One from LRUCache, one for the returned handle
e->next = e->prev = nullptr;
e->in_cache = true;
memcpy(e->key_data, key.data(), key.size());
@ -411,20 +428,36 @@ Cache::Handle* LRUCache::Insert(
// is freed or the lru list is empty
EvictFromLRU(charge, &last_reference_list);
// insert into the cache
// note that the cache might get larger than its capacity if not enough
// space was freed
LRUHandle* old = table_.Insert(e);
usage_ += e->charge;
if (old != nullptr) {
old->in_cache = false;
if (Unref(old)) {
usage_ -= old->charge;
// old is on LRU because it's in cache and its reference count
// was just 1 (Unref returned 0)
LRU_Remove(old);
last_reference_list.push_back(old);
if (strict_capacity_limit_ && usage_ - lru_usage_ + charge > capacity_) {
if (handle == nullptr) {
last_reference_list.push_back(e);
} else {
delete[] reinterpret_cast<char*>(e);
*handle = nullptr;
}
s = Status::Incomplete("Insert failed due to LRU cache being full.");
} else {
// insert into the cache
// note that the cache might get larger than its capacity if not enough
// space was freed
LRUHandle* old = table_.Insert(e);
usage_ += e->charge;
if (old != nullptr) {
old->in_cache = false;
if (Unref(old)) {
usage_ -= old->charge;
// old is on LRU because it's in cache and its reference count
// was just 1 (Unref returned 0)
LRU_Remove(old);
last_reference_list.push_back(old);
}
}
if (handle == nullptr) {
LRU_Append(e);
} else {
*handle = reinterpret_cast<Cache::Handle*>(e);
}
s = Status::OK();
}
}
@ -434,7 +467,7 @@ Cache::Handle* LRUCache::Insert(
entry->Free();
}
return reinterpret_cast<Cache::Handle*>(e);
return s;
}
void LRUCache::Erase(const Slice& key, uint32_t hash) {
@ -472,6 +505,7 @@ class ShardedLRUCache : public Cache {
uint64_t last_id_;
int num_shard_bits_;
size_t capacity_;
bool strict_capacity_limit_;
static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
@ -483,13 +517,18 @@ class ShardedLRUCache : public Cache {
}
public:
ShardedLRUCache(size_t capacity, int num_shard_bits)
: last_id_(0), num_shard_bits_(num_shard_bits), capacity_(capacity) {
ShardedLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit)
: last_id_(0),
num_shard_bits_(num_shard_bits),
capacity_(capacity),
strict_capacity_limit_(strict_capacity_limit) {
int num_shards = 1 << num_shard_bits_;
shards_ = new LRUCache[num_shards];
const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
for (int s = 0; s < num_shards; s++) {
shards_[s].SetCapacity(per_shard);
shards_[s].SetStrictCapacityLimit(strict_capacity_limit);
}
}
virtual ~ShardedLRUCache() {
@ -504,11 +543,19 @@ class ShardedLRUCache : public Cache {
}
capacity_ = capacity;
}
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key,
void* value)) override {
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
int num_shards = 1 << num_shard_bits_;
for (int s = 0; s < num_shards; s++) {
shards_[s].SetStrictCapacityLimit(strict_capacity_limit);
}
strict_capacity_limit_ = strict_capacity_limit;
}
virtual Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle) override {
const uint32_t hash = HashSlice(key);
return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter);
return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter,
handle);
}
virtual Handle* Lookup(const Slice& key) override {
const uint32_t hash = HashSlice(key);
@ -531,6 +578,10 @@ class ShardedLRUCache : public Cache {
}
virtual size_t GetCapacity() const override { return capacity_; }
virtual bool HasStrictCapacityLimit() const override {
return strict_capacity_limit_;
}
virtual size_t GetUsage() const override {
// We will not lock the cache when getting the usage from shards.
int num_shards = 1 << num_shard_bits_;
@ -569,14 +620,20 @@ class ShardedLRUCache : public Cache {
} // end anonymous namespace
shared_ptr<Cache> NewLRUCache(size_t capacity) {
return NewLRUCache(capacity, kNumShardBits);
return NewLRUCache(capacity, kNumShardBits, false);
}
shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits) {
return NewLRUCache(capacity, num_shard_bits, false);
}
shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
return std::make_shared<ShardedLRUCache>(capacity, num_shard_bits);
return std::make_shared<ShardedLRUCache>(capacity, num_shard_bits,
strict_capacity_limit);
}
} // namespace rocksdb

View File

@ -142,8 +142,7 @@ class CacheBench {
// Cast uint64* to be char*, data would be copied to cache
Slice key(reinterpret_cast<char*>(&rand_key), 8);
// do insert
auto handle = cache_->Insert(key, new char[10], 1, &deleter);
cache_->Release(handle);
cache_->Insert(key, new char[10], 1, &deleter);
}
}
@ -221,8 +220,7 @@ class CacheBench {
int32_t prob_op = thread->rnd.Uniform(100);
if (prob_op >= 0 && prob_op < FLAGS_insert_percent) {
// do insert
auto handle = cache_->Insert(key, new char[10], 1, &deleter);
cache_->Release(handle);
cache_->Insert(key, new char[10], 1, &deleter);
} else if (prob_op -= FLAGS_insert_percent &&
prob_op < FLAGS_lookup_percent) {
// do lookup

View File

@ -73,8 +73,8 @@ class CacheTest : public testing::Test {
}
void Insert(shared_ptr<Cache> cache, int key, int value, int charge = 1) {
cache->Release(cache->Insert(EncodeKey(key), EncodeValue(value), charge,
&CacheTest::Deleter));
cache->Insert(EncodeKey(key), EncodeValue(value), charge,
&CacheTest::Deleter);
}
void Erase(shared_ptr<Cache> cache, int key) {
@ -118,14 +118,12 @@ TEST_F(CacheTest, UsageTest) {
auto cache = NewLRUCache(kCapacity, 8);
size_t usage = 0;
const char* value = "abcdef";
char value[10] = "abcdef";
// make sure everything will be cached
for (int i = 1; i < 100; ++i) {
std::string key(i, 'a');
auto kv_size = key.size() + 5;
cache->Release(
cache->Insert(key, (void*)value, kv_size, dumbDeleter)
);
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter);
usage += kv_size;
ASSERT_EQ(usage, cache->GetUsage());
}
@ -133,9 +131,8 @@ TEST_F(CacheTest, UsageTest) {
// make sure the cache will be overloaded
for (uint64_t i = 1; i < kCapacity; ++i) {
auto key = ToString(i);
cache->Release(
cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter)
);
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
}
// the usage should be close to the capacity
@ -149,7 +146,7 @@ TEST_F(CacheTest, PinnedUsageTest) {
auto cache = NewLRUCache(kCapacity, 8);
size_t pinned_usage = 0;
const char* value = "abcdef";
char value[10] = "abcdef";
std::forward_list<Cache::Handle*> unreleased_handles;
@ -158,7 +155,9 @@ TEST_F(CacheTest, PinnedUsageTest) {
for (int i = 1; i < 100; ++i) {
std::string key(i, 'a');
auto kv_size = key.size() + 5;
auto handle = cache->Insert(key, (void*)value, kv_size, dumbDeleter);
Cache::Handle* handle;
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter,
&handle);
pinned_usage += kv_size;
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
if (i % 2 == 0) {
@ -182,8 +181,8 @@ TEST_F(CacheTest, PinnedUsageTest) {
// check that overloading the cache does not change the pinned usage
for (uint64_t i = 1; i < 2 * kCapacity; ++i) {
auto key = ToString(i);
cache->Release(
cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter));
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
}
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
@ -408,7 +407,8 @@ TEST_F(CacheTest, SetCapacity) {
// Insert 5 entries, but not releasing.
for (size_t i = 0; i < 5; i++) {
std::string key = ToString(i+1);
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}
ASSERT_EQ(5U, cache->GetCapacity());
ASSERT_EQ(5U, cache->GetUsage());
@ -422,7 +422,8 @@ TEST_F(CacheTest, SetCapacity) {
// and usage should be 7
for (size_t i = 5; i < 10; i++) {
std::string key = ToString(i+1);
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}
ASSERT_EQ(10U, cache->GetCapacity());
ASSERT_EQ(10U, cache->GetUsage());
@ -441,6 +442,53 @@ TEST_F(CacheTest, SetCapacity) {
}
}
TEST_F(CacheTest, SetStrictCapacityLimit) {
// test1: set the flag to false. Insert more keys than capacity. See if they
// all go through.
std::shared_ptr<Cache> cache = NewLRUCache(5, 0, false);
std::vector<Cache::Handle*> handles(10);
Status s;
for (size_t i = 0; i < 10; i++) {
std::string key = ToString(i + 1);
s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
ASSERT_NE(nullptr, handles[i]);
}
// test2: set the flag to true. Insert and check if it fails.
std::string extra_key = "extra";
Value* extra_value = new Value(0);
cache->SetStrictCapacityLimit(true);
Cache::Handle* handle;
s = cache->Insert(extra_key, extra_value, 1, &deleter, &handle);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_EQ(nullptr, handle);
for (size_t i = 0; i < 10; i++) {
cache->Release(handles[i]);
}
// test3: init with flag being true.
std::shared_ptr<Cache> cache2 = NewLRUCache(5, 0, true);
for (size_t i = 0; i < 5; i++) {
std::string key = ToString(i + 1);
s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
ASSERT_NE(nullptr, handles[i]);
}
s = cache2->Insert(extra_key, extra_value, 1, &deleter, &handle);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_EQ(nullptr, handle);
// test insert without handle
s = cache2->Insert(extra_key, extra_value, 1, &deleter);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_EQ(5, cache->GetUsage());
for (size_t i = 0; i < 5; i++) {
cache2->Release(handles[i]);
}
}
TEST_F(CacheTest, OverCapacity) {
size_t n = 10;
@ -452,7 +500,8 @@ TEST_F(CacheTest, OverCapacity) {
// Insert n+1 entries, but not releasing.
for (size_t i = 0; i < n + 1; i++) {
std::string key = ToString(i+1);
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}
// Guess what's in the cache now?

View File

@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include <math.h>
#include <cmath>
#include <algorithm>
#include "rocksdb/options.h"

View File

@ -6,6 +6,7 @@
#include <assert.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_set>

View File

@ -10,6 +10,7 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <unordered_map>
#include <vector>

View File

@ -118,9 +118,9 @@ class BackupEngineImpl : public BackupEngine {
void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
// Extends the "result" map with pathname->size mappings for the contents of
// "dir". Pathnames are prefixed with "dir".
// "dir" in "env". Pathnames are prefixed with "dir".
Status InsertPathnameToSizeBytes(
const std::string& dir,
const std::string& dir, Env* env,
std::unordered_map<std::string, uint64_t>* result);
struct FileInfo {
@ -585,12 +585,13 @@ Status BackupEngineImpl::Initialize() {
for (const auto& rel_dir :
{GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
const auto abs_dir = GetAbsolutePath(rel_dir);
InsertPathnameToSizeBytes(abs_dir, &abs_path_to_size);
InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
}
// load the backups if any
for (auto& backup : backups_) {
InsertPathnameToSizeBytes(
GetAbsolutePath(GetPrivateFileRel(backup.first)), &abs_path_to_size);
GetAbsolutePath(GetPrivateFileRel(backup.first)), backup_env_,
&abs_path_to_size);
Status s =
backup.second->LoadFromFile(options_.backup_dir, abs_path_to_size);
if (!s.ok()) {
@ -704,7 +705,7 @@ Status BackupEngineImpl::CreateNewBackup(
// Pre-fetch sizes for data files
std::unordered_map<std::string, uint64_t> data_path_to_size;
if (s.ok()) {
s = InsertPathnameToSizeBytes(db->GetName(), &data_path_to_size);
s = InsertPathnameToSizeBytes(db->GetName(), db_env_, &data_path_to_size);
}
std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
@ -762,7 +763,7 @@ Status BackupEngineImpl::CreateNewBackup(
std::unordered_map<std::string, uint64_t> wal_path_to_size;
if (s.ok()) {
if (db->GetOptions().wal_dir != "") {
s = InsertPathnameToSizeBytes(db->GetOptions().wal_dir,
s = InsertPathnameToSizeBytes(db->GetOptions().wal_dir, db_env_,
&wal_path_to_size);
} else {
wal_path_to_size = std::move(data_path_to_size);
@ -1118,7 +1119,7 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
GetSharedFileWithChecksumRel()}) {
const auto abs_dir = GetAbsolutePath(rel_dir);
InsertPathnameToSizeBytes(abs_dir, &curr_abs_path_to_size);
InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
}
for (const auto& file_info : backup->GetFiles()) {
@ -1432,10 +1433,11 @@ void BackupEngineImpl::DeleteChildren(const std::string& dir,
}
Status BackupEngineImpl::InsertPathnameToSizeBytes(
const std::string& dir, std::unordered_map<std::string, uint64_t>* result) {
const std::string& dir, Env* env,
std::unordered_map<std::string, uint64_t>* result) {
assert(result != nullptr);
std::vector<Env::FileAttributes> files_attrs;
Status status = backup_env_->GetChildrenFileAttributes(dir, &files_attrs);
Status status = env->GetChildrenFileAttributes(dir, &files_attrs);
if (!status.ok()) {
return status;
}

View File

@ -13,6 +13,7 @@
#include <algorithm>
#include <iostream>
#include "db/db_impl.h"
#include "db/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
@ -1318,10 +1319,22 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) {
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
flush_thread.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// The last manifest roll would've already been cleaned up by the full scan
// that happens when CreateNewBackup invokes EnableFileDeletions. We need to
// trigger another roll to verify non-full scan purges stale manifests.
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_.get());
std::string prev_manifest_path =
DescriptorFileName(dbname_, db_impl->TEST_Current_Manifest_FileNo());
FillDB(db_.get(), 0, 100);
ASSERT_OK(env_->FileExists(prev_manifest_path));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_TRUE(env_->FileExists(prev_manifest_path).IsNotFound());
CloseDBAndBackupEngine();
DestroyDB(dbname_, Options());
AssertBackupConsistency(0, 0, 100);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// see https://github.com/facebook/rocksdb/issues/921