Compare commits
29 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
0113c61ce9 | ||
|
405017bb8c | ||
|
cfdea78fde | ||
|
fef38d9477 | ||
|
4d461733e4 | ||
|
1e7cca20fb | ||
|
ab21dc6323 | ||
|
fa37825fce | ||
|
4f6da54e8d | ||
|
98f11d01bd | ||
|
7b378c0de2 | ||
|
e90493a865 | ||
|
e36e75d7c8 | ||
|
895a4ad185 | ||
|
377d4d2c31 | ||
|
5271d0750a | ||
|
29713489b6 | ||
|
abcad80559 | ||
|
b6a8f16b5e | ||
|
eddc3ebd90 | ||
|
73fcd6b8ff | ||
|
7981e040a7 | ||
|
5279417e73 | ||
|
8c7b501d0f | ||
|
3c1781d6af | ||
|
9d5eb4063c | ||
|
0e35aa9930 | ||
|
023e9bf214 | ||
|
096e5f5d02 |
@ -612,6 +612,7 @@ set(SOURCES
|
||||
utilities/blob_db/blob_compaction_filter.cc
|
||||
utilities/blob_db/blob_db.cc
|
||||
utilities/blob_db/blob_db_impl.cc
|
||||
utilities/blob_db/blob_db_impl_filesnapshot.cc
|
||||
utilities/blob_db/blob_dump_tool.cc
|
||||
utilities/blob_db/blob_file.cc
|
||||
utilities/blob_db/blob_log_reader.cc
|
||||
|
35
HISTORY.md
35
HISTORY.md
@ -1,12 +1,38 @@
|
||||
# Rocksdb Change Log
|
||||
## Unreleased
|
||||
### Public API Change
|
||||
### New Features
|
||||
|
||||
# 5.16.6 (11/24/2018)
|
||||
### Bug Fixes
|
||||
* Fix the bug that WriteBatchWithIndex's SeekForPrev() doesn't see the entries with the same key.
|
||||
|
||||
## 5.16.5 (10/16/2018)
|
||||
### Bug Fixes
|
||||
* Fix slow flush/compaction when DB contains many snapshots. The problem became noticeable to us in DBs with 100,000+ snapshots, though it will affect others at different thresholds.
|
||||
* Properly set the stop key for a truncated manual CompactRange
|
||||
|
||||
## 5.16.4 (10/10/2018)
|
||||
### Bug Fixes
|
||||
* Fix corner case where a write group leader blocked due to write stall blocks other writers in queue with WriteOptions::no_slowdown set.
|
||||
|
||||
## 5.16.3 (10/1/2018)
|
||||
### Bug Fixes
|
||||
* Fix crash caused when `CompactFiles` run with `CompactionOptions::compression == CompressionType::kDisableCompressionOption`. Now that setting causes the compression type to be chosen according to the column family-wide compression options.
|
||||
|
||||
## 5.16.2 (9/21/2018)
|
||||
### Bug Fixes
|
||||
* Fix bug in partition filters with format_version=4.
|
||||
|
||||
## 5.16.1 (9/17/2018)
|
||||
### Bug Fixes
|
||||
* Remove trace_analyzer_tool from rocksdb_lib target in TARGETS file.
|
||||
* Fix RocksDB Java build and tests.
|
||||
* Remove sync point in Block destructor.
|
||||
|
||||
## 5.16.0 (8/21/2018)
|
||||
### Public API Change
|
||||
* The merge operands are passed to `MergeOperator::ShouldMerge` in the reversed order relative to how they were merged (passed to FullMerge or FullMergeV2) for performance reasons
|
||||
* `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero.
|
||||
* Add `FlushOptions::allow_write_stall`, which controls whether Flush calls start working immediately, even if it causes user writes to stall, or will wait until flush can be performed without causing write stall (similar to `CompactRangeOptions::allow_write_stall`). Note that the default value is false, meaning we add delay to Flush calls until stalling can be avoided when possible. This is behavior change compared to previous RocksDB versions, where Flush calls didn't check if they might cause stall or not.
|
||||
|
||||
* The merge operands are passed to `MergeOperator::ShouldMerge` in the reversed order relative to how they were merged (passed to FullMerge or FullMergeV2) for performance reasons
|
||||
* GetAllKeyVersions() to take an extra argument of `max_num_ikeys`.
|
||||
|
||||
### New Features
|
||||
@ -16,6 +42,7 @@
|
||||
|
||||
### Bug Fixes
|
||||
* Fix a bug in misreporting the estimated partition index size in properties block.
|
||||
* Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
|
||||
|
||||
## 5.15.0 (7/17/2018)
|
||||
### Public API Change
|
||||
|
4
Makefile
4
Makefile
@ -1621,7 +1621,7 @@ ZLIB_SHA256 ?= c3e5e9fdd5004dcb542feda5ee4f0ff0744628baf8ed2dd5d66f8ca1197cb1a1
|
||||
ZLIB_DOWNLOAD_BASE ?= http://zlib.net
|
||||
BZIP2_VER ?= 1.0.6
|
||||
BZIP2_SHA256 ?= a2848f34fcd5d6cf47def00461fcb528a0484d8edef8208d6d2e2909dc61d9cd
|
||||
BZIP2_DOWNLOAD_BASE ?= http://www.bzip.org
|
||||
BZIP2_DOWNLOAD_BASE ?= https://web.archive.org/web/20180624184835/http://www.bzip.org
|
||||
SNAPPY_VER ?= 1.1.4
|
||||
SNAPPY_SHA256 ?= 134bfe122fd25599bb807bb8130e7ba6d9bdb851e0b16efcb83ac4f5d0b70057
|
||||
SNAPPY_DOWNLOAD_BASE ?= https://github.com/google/snappy/releases/download
|
||||
@ -1856,7 +1856,7 @@ $(java_libobjects): jl/%.o: %.cc
|
||||
rocksdbjava: $(java_all_libobjects)
|
||||
$(AM_V_GEN)cd java;$(MAKE) javalib;
|
||||
$(AM_V_at)rm -f ./java/target/$(ROCKSDBJNILIB)
|
||||
$(AM_V_at)$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o ./java/target/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(java_libobjects) $(JAVA_LDFLAGS) $(COVERAGEFLAGS)
|
||||
$(AM_V_at)$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o ./java/target/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(java_all_libobjects) $(JAVA_LDFLAGS) $(COVERAGEFLAGS)
|
||||
$(AM_V_at)cd java;jar -cf target/$(ROCKSDB_JAR) HISTORY*.md
|
||||
$(AM_V_at)cd java/target;jar -uf $(ROCKSDB_JAR) $(ROCKSDBJNILIB)
|
||||
$(AM_V_at)cd java/target/classes;jar -uf ../$(ROCKSDB_JAR) org/rocksdb/*.class org/rocksdb/util/*.class
|
||||
|
25
TARGETS
25
TARGETS
@ -1,3 +1,5 @@
|
||||
load("@fbcode_macros//build_defs:auto_headers.bzl", "AutoHeaders")
|
||||
|
||||
REPO_PATH = package_name() + "/"
|
||||
|
||||
BUCK_BINS = "buck-out/gen/" + REPO_PATH
|
||||
@ -195,7 +197,6 @@ cpp_library(
|
||||
"tools/ldb_cmd.cc",
|
||||
"tools/ldb_tool.cc",
|
||||
"tools/sst_dump_tool.cc",
|
||||
"tools/trace_analyzer_tool.cc",
|
||||
"util/arena.cc",
|
||||
"util/auto_roll_logger.cc",
|
||||
"util/bloom.cc",
|
||||
@ -233,6 +234,7 @@ cpp_library(
|
||||
"utilities/blob_db/blob_compaction_filter.cc",
|
||||
"utilities/blob_db/blob_db.cc",
|
||||
"utilities/blob_db/blob_db_impl.cc",
|
||||
"utilities/blob_db/blob_db_impl_filesnapshot.cc",
|
||||
"utilities/blob_db/blob_dump_tool.cc",
|
||||
"utilities/blob_db/blob_file.cc",
|
||||
"utilities/blob_db/blob_log_format.cc",
|
||||
@ -303,6 +305,7 @@ cpp_library(
|
||||
srcs = [
|
||||
"db/db_test_util.cc",
|
||||
"table/mock_table.cc",
|
||||
"tools/trace_analyzer_tool.cc",
|
||||
"util/fault_injection_test_env.cc",
|
||||
"util/testharness.cc",
|
||||
"util/testutil.cc",
|
||||
@ -376,11 +379,6 @@ ROCKS_TESTS = [
|
||||
"table/block_based_filter_block_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"data_block_hash_index_test",
|
||||
"table/data_block_hash_index_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"block_test",
|
||||
"table/block_test.cc",
|
||||
@ -506,6 +504,11 @@ ROCKS_TESTS = [
|
||||
"table/cuckoo_table_reader_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"data_block_hash_index_test",
|
||||
"table/data_block_hash_index_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"date_tiered_test",
|
||||
"utilities/date_tiered/date_tiered_test.cc",
|
||||
@ -956,11 +959,6 @@ ROCKS_TESTS = [
|
||||
"tools/sst_dump_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"trace_analyzer_test",
|
||||
"tools/trace_analyzer_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"statistics_test",
|
||||
"monitoring/statistics_test.cc",
|
||||
@ -996,6 +994,11 @@ ROCKS_TESTS = [
|
||||
"util/timer_queue_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"trace_analyzer_test",
|
||||
"tools/trace_analyzer_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"transaction_test",
|
||||
"utilities/transactions/transaction_test.cc",
|
||||
|
@ -2,7 +2,10 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
rocksdb_target_header = """REPO_PATH = package_name() + "/"
|
||||
rocksdb_target_header = """
|
||||
load("@fbcode_macros//build_defs:auto_headers.bzl", "AutoHeaders")
|
||||
|
||||
REPO_PATH = package_name() + "/"
|
||||
|
||||
BUCK_BINS = "buck-out/gen/" + REPO_PATH
|
||||
|
||||
|
@ -53,11 +53,13 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
|
||||
FBCODE_BUILD="true"
|
||||
# If we're compiling with TSAN we need pic build
|
||||
PIC_BUILD=$COMPILE_WITH_TSAN
|
||||
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
else
|
||||
if [ -n "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
|
||||
# we need this to build with MySQL. Don't use for other purposes.
|
||||
source "$PWD/build_tools/fbcode_config4.8.1.sh"
|
||||
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_5xx" ]; then
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
else
|
||||
source "$PWD/build_tools/fbcode_config_platform007.sh"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
18
build_tools/dependencies_platform007.sh
Normal file
18
build_tools/dependencies_platform007.sh
Normal file
@ -0,0 +1,18 @@
|
||||
GCC_BASE=/mnt/gvfs/third-party2/gcc/6e8e715624fd15256a7970073387793dfcf79b46/7.x/centos7-native/b2ef2b6
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/ef37e1faa1c29782abfac1ae65a291b9b7966f6d/stable/centos7-native/c9f9104
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/c67031f0f739ac61575a061518d6ef5038f99f90/7.x/platform007/5620abc
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/60d6f124a78798b73944f5ba87c2306ae3460153/2.26/platform007/f259413
|
||||
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
|
||||
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/22c2d65676fb7c23cfa797c4f6937f38b026f3cf/1.2.8/platform007/ca4da3d
|
||||
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
|
||||
LZ4_BASE=/mnt/gvfs/third-party2/lz4/907b498203d297947f3bb70b9466f47e100f1873/r131/platform007/ca4da3d
|
||||
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/3ee276cbacfad3074e3f07bf826ac47f06970f4e/1.3.5/platform007/15a3614
|
||||
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
|
||||
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/9c910d36d6235cc40e8ff559358f1833452300ca/master/platform007/5b0f53e
|
||||
NUMA_BASE=/mnt/gvfs/third-party2/numa/9cbf2460284c669ed19c3ccb200a71f7dd7e53c7/2.0.11/platform007/ca4da3d
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/bf3d7497fe4e6d007354f0adffa16ce3003f8338/1.3/platform007/6f3e0a9
|
||||
TBB_BASE=/mnt/gvfs/third-party2/tbb/ff4e0b093534704d8abab678a4fd7f5ea7b094c7/2018_U5/platform007/ca4da3d
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/b5c4a61a5c483ba24722005ae07895971a2ac707/fb/platform007/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/92ff90349e2f43ea0a8246d8b1cf17b6869013e3/2.29.1/centos7-native/da39a3e
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/f3f697a28122e6bcd513273dd9c1ff23852fc59f/3.13.0/platform007/ca4da3d
|
||||
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832
|
157
build_tools/fbcode_config_platform007.sh
Normal file
157
build_tools/fbcode_config_platform007.sh
Normal file
@ -0,0 +1,157 @@
|
||||
#!/bin/sh
|
||||
#
|
||||
# Set environment variables so that we can compile rocksdb using
|
||||
# fbcode settings. It uses the latest g++ and clang compilers and also
|
||||
# uses jemalloc
|
||||
# Environment variables that change the behavior of this script:
|
||||
# PIC_BUILD -- if true, it will only take pic versions of libraries from fbcode. libraries that don't have pic variant will not be included
|
||||
|
||||
|
||||
BASEDIR=`dirname $BASH_SOURCE`
|
||||
source "$BASEDIR/dependencies_platform007.sh"
|
||||
|
||||
CFLAGS=""
|
||||
|
||||
# libgcc
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
|
||||
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
|
||||
|
||||
# glibc
|
||||
GLIBC_INCLUDE="$GLIBC_BASE/include"
|
||||
GLIBC_LIBS=" -L $GLIBC_BASE/lib"
|
||||
|
||||
# snappy
|
||||
SNAPPY_INCLUDE=" -I $SNAPPY_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
SNAPPY_LIBS=" $SNAPPY_BASE/lib/libsnappy.a"
|
||||
else
|
||||
SNAPPY_LIBS=" $SNAPPY_BASE/lib/libsnappy_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DSNAPPY"
|
||||
|
||||
if test -z $PIC_BUILD; then
|
||||
# location of zlib headers and libraries
|
||||
ZLIB_INCLUDE=" -I $ZLIB_BASE/include/"
|
||||
ZLIB_LIBS=" $ZLIB_BASE/lib/libz.a"
|
||||
CFLAGS+=" -DZLIB"
|
||||
|
||||
# location of bzip headers and libraries
|
||||
BZIP_INCLUDE=" -I $BZIP2_BASE/include/"
|
||||
BZIP_LIBS=" $BZIP2_BASE/lib/libbz2.a"
|
||||
CFLAGS+=" -DBZIP2"
|
||||
|
||||
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
|
||||
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
|
||||
CFLAGS+=" -DLZ4"
|
||||
fi
|
||||
|
||||
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
|
||||
else
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DZSTD"
|
||||
|
||||
# location of gflags headers and libraries
|
||||
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags.a"
|
||||
else
|
||||
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DGFLAGS=gflags"
|
||||
|
||||
# location of jemalloc
|
||||
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
|
||||
JEMALLOC_LIB=" $JEMALLOC_BASE/lib/libjemalloc.a"
|
||||
|
||||
if test -z $PIC_BUILD; then
|
||||
# location of numa
|
||||
NUMA_INCLUDE=" -I $NUMA_BASE/include/"
|
||||
NUMA_LIB=" $NUMA_BASE/lib/libnuma.a"
|
||||
CFLAGS+=" -DNUMA"
|
||||
|
||||
# location of libunwind
|
||||
LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
|
||||
fi
|
||||
|
||||
# location of TBB
|
||||
TBB_INCLUDE=" -isystem $TBB_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
TBB_LIBS="$TBB_BASE/lib/libtbb.a"
|
||||
else
|
||||
TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DTBB"
|
||||
|
||||
# use Intel SSE support for checksum calculations
|
||||
export USE_SSE=1
|
||||
export PORTABLE=1
|
||||
|
||||
BINUTILS="$BINUTILS_BASE/bin"
|
||||
AR="$BINUTILS/ar"
|
||||
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
|
||||
|
||||
STDLIBS="-L $GCC_BASE/lib64"
|
||||
|
||||
CLANG_BIN="$CLANG_BASE/bin"
|
||||
CLANG_LIB="$CLANG_BASE/lib"
|
||||
CLANG_SRC="$CLANG_BASE/../../src"
|
||||
|
||||
CLANG_ANALYZER="$CLANG_BIN/clang++"
|
||||
CLANG_SCAN_BUILD="$CLANG_SRC/llvm/tools/clang/tools/scan-build/bin/scan-build"
|
||||
|
||||
if [ -z "$USE_CLANG" ]; then
|
||||
# gcc
|
||||
CC="$GCC_BASE/bin/gcc"
|
||||
CXX="$GCC_BASE/bin/g++"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
JEMALLOC=1
|
||||
else
|
||||
# clang
|
||||
CLANG_INCLUDE="$CLANG_LIB/clang/stable/include"
|
||||
CC="$CLANG_BIN/clang"
|
||||
CXX="$CLANG_BIN/clang++"
|
||||
|
||||
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $CLANG_INCLUDE"
|
||||
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE/linux "
|
||||
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE "
|
||||
CFLAGS+=" -Wno-expansion-to-defined "
|
||||
CXXFLAGS="-nostdinc++"
|
||||
fi
|
||||
|
||||
CFLAGS+=" $DEPS_INCLUDE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
|
||||
CXXFLAGS+=" $CFLAGS"
|
||||
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
|
||||
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
|
||||
EXEC_LDFLAGS+=" $LIBUNWIND"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
|
||||
# required by libtbb
|
||||
EXEC_LDFLAGS+=" -ldl"
|
||||
|
||||
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
|
||||
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
|
||||
|
||||
VALGRIND_VER="$VALGRIND_BASE/bin/"
|
||||
|
||||
# lua not supported because it's on track for deprecation, I think
|
||||
LUA_PATH=
|
||||
LUA_LIB=
|
||||
|
||||
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD LUA_PATH LUA_LIB
|
@ -85,8 +85,9 @@ NON_SHM="TMPD=/tmp/rocksdb_test_tmp"
|
||||
GCC_481="ROCKSDB_FBCODE_BUILD_WITH_481=1"
|
||||
ASAN="COMPILE_WITH_ASAN=1"
|
||||
CLANG="USE_CLANG=1"
|
||||
LITE="OPT=\"-DROCKSDB_LITE -g\""
|
||||
TSAN="COMPILE_WITH_TSAN=1"
|
||||
# in gcc-5 there are known problems with TSAN like https://gcc.gnu.org/bugzilla/show_bug.cgi?id=71090.
|
||||
# using platform007 gives us gcc-8 or higher which has that bug fixed.
|
||||
TSAN="ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007=1 COMPILE_WITH_TSAN=1"
|
||||
UBSAN="COMPILE_WITH_UBSAN=1"
|
||||
TSAN_CRASH='CRASH_TEST_EXT_ARGS="--compression_type=zstd --log2_keys_per_lock=22"'
|
||||
NON_TSAN_CRASH="CRASH_TEST_EXT_ARGS=--compression_type=zstd"
|
||||
|
@ -53,6 +53,45 @@ function get_lib_base()
|
||||
log_variable $__res_var
|
||||
}
|
||||
|
||||
###########################################################
|
||||
# platform007 dependencies #
|
||||
###########################################################
|
||||
|
||||
OUTPUT="$BASEDIR/dependencies_platform007.sh"
|
||||
|
||||
rm -f "$OUTPUT"
|
||||
touch "$OUTPUT"
|
||||
|
||||
echo "Writing dependencies to $OUTPUT"
|
||||
|
||||
# Compilers locations
|
||||
GCC_BASE=`readlink -f $TP2_LATEST/gcc/7.x/centos7-native/*/`
|
||||
CLANG_BASE=`readlink -f $TP2_LATEST/llvm-fb/stable/centos7-native/*/`
|
||||
|
||||
log_variable GCC_BASE
|
||||
log_variable CLANG_BASE
|
||||
|
||||
# Libraries locations
|
||||
get_lib_base libgcc 7.x platform007
|
||||
get_lib_base glibc 2.26 platform007
|
||||
get_lib_base snappy LATEST platform007
|
||||
get_lib_base zlib LATEST platform007
|
||||
get_lib_base bzip2 LATEST platform007
|
||||
get_lib_base lz4 LATEST platform007
|
||||
get_lib_base zstd LATEST platform007
|
||||
get_lib_base gflags LATEST platform007
|
||||
get_lib_base jemalloc LATEST platform007
|
||||
get_lib_base numa LATEST platform007
|
||||
get_lib_base libunwind LATEST platform007
|
||||
get_lib_base tbb LATEST platform007
|
||||
|
||||
get_lib_base kernel-headers fb platform007
|
||||
get_lib_base binutils LATEST centos7-native
|
||||
get_lib_base valgrind LATEST platform007
|
||||
get_lib_base lua 5.3.4 platform007
|
||||
|
||||
git diff $OUTPUT
|
||||
|
||||
###########################################################
|
||||
# 5.x dependencies #
|
||||
###########################################################
|
||||
|
@ -308,6 +308,47 @@ TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
|
||||
delete db;
|
||||
}
|
||||
|
||||
TEST_F(CompactFilesTest, SentinelCompressionType) {
|
||||
// Check that passing `CompressionType::kDisableCompressionOption` to
|
||||
// `CompactFiles` causes it to use the column family compression options.
|
||||
for (auto compaction_style :
|
||||
{CompactionStyle::kCompactionStyleLevel,
|
||||
CompactionStyle::kCompactionStyleUniversal,
|
||||
CompactionStyle::kCompactionStyleNone}) {
|
||||
DestroyDB(db_name_, Options());
|
||||
Options options;
|
||||
options.compaction_style = compaction_style;
|
||||
// L0: Snappy, L1: ZSTD, L2: Snappy
|
||||
options.compression_per_level = {CompressionType::kSnappyCompression,
|
||||
CompressionType::kZlibCompression,
|
||||
CompressionType::kSnappyCompression};
|
||||
options.create_if_missing = true;
|
||||
FlushedFileCollector* collector = new FlushedFileCollector();
|
||||
options.listeners.emplace_back(collector);
|
||||
DB* db = nullptr;
|
||||
ASSERT_OK(DB::Open(options, db_name_, &db));
|
||||
|
||||
db->Put(WriteOptions(), "key", "val");
|
||||
db->Flush(FlushOptions());
|
||||
|
||||
auto l0_files = collector->GetFlushedFiles();
|
||||
ASSERT_EQ(1, l0_files.size());
|
||||
|
||||
// L0->L1 compaction, so output should be ZSTD-compressed
|
||||
CompactionOptions compaction_opts;
|
||||
compaction_opts.compression = CompressionType::kDisableCompressionOption;
|
||||
ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1));
|
||||
|
||||
rocksdb::TablePropertiesCollection all_tables_props;
|
||||
ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props));
|
||||
for (const auto& name_and_table_props : all_tables_props) {
|
||||
ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression),
|
||||
name_and_table_props.second->compression_name);
|
||||
}
|
||||
delete db;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -77,6 +77,12 @@ CompactionIterator::CompactionIterator(
|
||||
earliest_snapshot_ = snapshots_->at(0);
|
||||
latest_snapshot_ = snapshots_->back();
|
||||
}
|
||||
#ifndef NDEBUG
|
||||
// findEarliestVisibleSnapshot assumes this ordering.
|
||||
for (size_t i = 1; i < snapshots_->size(); ++i) {
|
||||
assert(snapshots_->at(i - 1) <= snapshots_->at(i));
|
||||
}
|
||||
#endif
|
||||
if (compaction_filter_ != nullptr) {
|
||||
if (compaction_filter_->IgnoreSnapshots()) {
|
||||
ignore_snapshots_ = true;
|
||||
@ -603,18 +609,23 @@ void CompactionIterator::PrepareOutput() {
|
||||
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
|
||||
SequenceNumber in, SequenceNumber* prev_snapshot) {
|
||||
assert(snapshots_->size());
|
||||
SequenceNumber prev = kMaxSequenceNumber;
|
||||
for (const auto cur : *snapshots_) {
|
||||
assert(prev == kMaxSequenceNumber || prev <= cur);
|
||||
if (cur >= in && (snapshot_checker_ == nullptr ||
|
||||
snapshot_checker_->IsInSnapshot(in, cur))) {
|
||||
*prev_snapshot = prev == kMaxSequenceNumber ? 0 : prev;
|
||||
auto snapshots_iter = std::lower_bound(
|
||||
snapshots_->begin(), snapshots_->end(), in);
|
||||
if (snapshots_iter == snapshots_->begin()) {
|
||||
*prev_snapshot = 0;
|
||||
} else {
|
||||
*prev_snapshot = *std::prev(snapshots_iter);
|
||||
assert(*prev_snapshot < in);
|
||||
}
|
||||
for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
|
||||
auto cur = *snapshots_iter;
|
||||
assert(in <= cur);
|
||||
if (snapshot_checker_ == nullptr ||
|
||||
snapshot_checker_->IsInSnapshot(in, cur)) {
|
||||
return cur;
|
||||
}
|
||||
prev = cur;
|
||||
assert(prev < kMaxSequenceNumber);
|
||||
*prev_snapshot = cur;
|
||||
}
|
||||
*prev_snapshot = prev;
|
||||
return kMaxSequenceNumber;
|
||||
}
|
||||
|
||||
|
@ -1077,7 +1077,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
||||
}
|
||||
|
||||
if (status.ok() && sub_compact->builder == nullptr &&
|
||||
sub_compact->outputs.size() == 0) {
|
||||
sub_compact->outputs.size() == 0 &&
|
||||
!range_del_agg->IsEmpty()) {
|
||||
// handle subcompaction containing only range deletions
|
||||
status = OpenCompactionOutputFile(sub_compact);
|
||||
}
|
||||
|
@ -219,7 +219,8 @@ void CompactionPicker::GetRange(const std::vector<CompactionInputFiles>& inputs,
|
||||
|
||||
bool CompactionPicker::ExpandInputsToCleanCut(const std::string& /*cf_name*/,
|
||||
VersionStorageInfo* vstorage,
|
||||
CompactionInputFiles* inputs) {
|
||||
CompactionInputFiles* inputs,
|
||||
InternalKey** next_smallest) {
|
||||
// This isn't good compaction
|
||||
assert(!inputs->empty());
|
||||
|
||||
@ -242,7 +243,8 @@ bool CompactionPicker::ExpandInputsToCleanCut(const std::string& /*cf_name*/,
|
||||
GetRange(*inputs, &smallest, &largest);
|
||||
inputs->clear();
|
||||
vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
|
||||
hint_index, &hint_index);
|
||||
hint_index, &hint_index, true,
|
||||
next_smallest);
|
||||
} while (inputs->size() > old_size);
|
||||
|
||||
// we started off with inputs non-empty and the previous loop only grew
|
||||
@ -315,13 +317,29 @@ Compaction* CompactionPicker::CompactFiles(
|
||||
// shouldn't have been released since.
|
||||
assert(!FilesRangeOverlapWithCompaction(input_files, output_level));
|
||||
|
||||
auto c =
|
||||
new Compaction(vstorage, ioptions_, mutable_cf_options, input_files,
|
||||
output_level, compact_options.output_file_size_limit,
|
||||
mutable_cf_options.max_compaction_bytes, output_path_id,
|
||||
compact_options.compression, ioptions_.compression_opts,
|
||||
compact_options.max_subcompactions,
|
||||
/* grandparents */ {}, true);
|
||||
CompressionType compression_type;
|
||||
if (compact_options.compression == kDisableCompressionOption) {
|
||||
int base_level;
|
||||
if (ioptions_.compaction_style == kCompactionStyleLevel) {
|
||||
base_level = vstorage->base_level();
|
||||
} else {
|
||||
base_level = 1;
|
||||
}
|
||||
compression_type =
|
||||
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
|
||||
output_level, base_level);
|
||||
} else {
|
||||
// TODO(ajkr): `CompactionOptions` offers configurable `CompressionType`
|
||||
// without configurable `CompressionOptions`, which is inconsistent.
|
||||
compression_type = compact_options.compression;
|
||||
}
|
||||
auto c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, input_files, output_level,
|
||||
compact_options.output_file_size_limit,
|
||||
mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
|
||||
GetCompressionOptions(ioptions_, vstorage, output_level),
|
||||
compact_options.max_subcompactions,
|
||||
/* grandparents */ {}, true);
|
||||
RegisterCompaction(c);
|
||||
return c;
|
||||
}
|
||||
@ -633,7 +651,6 @@ Compaction* CompactionPicker::CompactRange(
|
||||
uint64_t s = inputs[i]->compensated_file_size;
|
||||
total += s;
|
||||
if (total >= limit) {
|
||||
**compaction_end = inputs[i + 1]->smallest;
|
||||
covering_the_whole_range = false;
|
||||
inputs.files.resize(i + 1);
|
||||
break;
|
||||
@ -642,7 +659,10 @@ Compaction* CompactionPicker::CompactRange(
|
||||
}
|
||||
assert(output_path_id < static_cast<uint32_t>(ioptions_.cf_paths.size()));
|
||||
|
||||
if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs) == false) {
|
||||
InternalKey key_storage;
|
||||
InternalKey* next_smallest = &key_storage;
|
||||
if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs, &next_smallest) ==
|
||||
false) {
|
||||
// manual compaction is now multi-threaded, so it can
|
||||
// happen that ExpandWhileOverlapping fails
|
||||
// we handle it higher in RunManualCompaction
|
||||
@ -650,8 +670,10 @@ Compaction* CompactionPicker::CompactRange(
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (covering_the_whole_range) {
|
||||
if (covering_the_whole_range || !next_smallest) {
|
||||
*compaction_end = nullptr;
|
||||
} else {
|
||||
**compaction_end = *next_smallest;
|
||||
}
|
||||
|
||||
CompactionInputFiles output_level_inputs;
|
||||
|
@ -151,7 +151,8 @@ class CompactionPicker {
|
||||
// Will return false if it is impossible to apply this compaction.
|
||||
bool ExpandInputsToCleanCut(const std::string& cf_name,
|
||||
VersionStorageInfo* vstorage,
|
||||
CompactionInputFiles* inputs);
|
||||
CompactionInputFiles* inputs,
|
||||
InternalKey** next_smallest = nullptr);
|
||||
|
||||
// Returns true if any one of the parent files are being compacted
|
||||
bool IsRangeInCompaction(VersionStorageInfo* vstorage,
|
||||
|
@ -22,11 +22,12 @@ class DBBloomFilterTest : public DBTestBase {
|
||||
|
||||
class DBBloomFilterTestWithParam
|
||||
: public DBTestBase,
|
||||
public testing::WithParamInterface<std::tuple<bool, bool>> {
|
||||
public testing::WithParamInterface<std::tuple<bool, bool, uint32_t>> {
|
||||
// public testing::WithParamInterface<bool> {
|
||||
protected:
|
||||
bool use_block_based_filter_;
|
||||
bool partition_filters_;
|
||||
uint32_t format_version_;
|
||||
|
||||
public:
|
||||
DBBloomFilterTestWithParam() : DBTestBase("/db_bloom_filter_tests") {}
|
||||
@ -36,9 +37,12 @@ class DBBloomFilterTestWithParam
|
||||
void SetUp() override {
|
||||
use_block_based_filter_ = std::get<0>(GetParam());
|
||||
partition_filters_ = std::get<1>(GetParam());
|
||||
format_version_ = std::get<2>(GetParam());
|
||||
}
|
||||
};
|
||||
|
||||
class DBBloomFilterTestDefFormatVersion : public DBBloomFilterTestWithParam {};
|
||||
|
||||
class SliceTransformLimitedDomainGeneric : public SliceTransform {
|
||||
const char* Name() const override {
|
||||
return "SliceTransformLimitedDomainGeneric";
|
||||
@ -62,7 +66,7 @@ class SliceTransformLimitedDomainGeneric : public SliceTransform {
|
||||
// KeyMayExist can lead to a few false positives, but not false negatives.
|
||||
// To make test deterministic, use a much larger number of bits per key-20 than
|
||||
// bits in the key, so that false positives are eliminated
|
||||
TEST_P(DBBloomFilterTestWithParam, KeyMayExist) {
|
||||
TEST_P(DBBloomFilterTestDefFormatVersion, KeyMayExist) {
|
||||
do {
|
||||
ReadOptions ropts;
|
||||
std::string value;
|
||||
@ -401,6 +405,11 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
|
||||
table_options.index_type =
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
|
||||
}
|
||||
table_options.format_version = format_version_;
|
||||
if (format_version_ >= 4) {
|
||||
// value delta encoding challenged more with index interval > 1
|
||||
table_options.index_block_restart_interval = 8;
|
||||
}
|
||||
table_options.metadata_block_size = 32;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
@ -456,10 +465,26 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBBloomFilterTestWithParam, DBBloomFilterTestWithParam,
|
||||
::testing::Values(std::make_tuple(true, false),
|
||||
std::make_tuple(false, true),
|
||||
std::make_tuple(false, false)));
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
FormatDef, DBBloomFilterTestDefFormatVersion,
|
||||
::testing::Values(std::make_tuple(true, false, test::kDefaultFormatVersion),
|
||||
std::make_tuple(false, true, test::kDefaultFormatVersion),
|
||||
std::make_tuple(false, false,
|
||||
test::kDefaultFormatVersion)));
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
FormatDef, DBBloomFilterTestWithParam,
|
||||
::testing::Values(std::make_tuple(true, false, test::kDefaultFormatVersion),
|
||||
std::make_tuple(false, true, test::kDefaultFormatVersion),
|
||||
std::make_tuple(false, false,
|
||||
test::kDefaultFormatVersion)));
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
FormatLatest, DBBloomFilterTestWithParam,
|
||||
::testing::Values(std::make_tuple(true, false, test::kLatestFormatVersion),
|
||||
std::make_tuple(false, true, test::kLatestFormatVersion),
|
||||
std::make_tuple(false, false,
|
||||
test::kLatestFormatVersion)));
|
||||
|
||||
TEST_F(DBBloomFilterTest, BloomFilterRate) {
|
||||
while (ChangeFilterOptions()) {
|
||||
|
@ -116,6 +116,22 @@ private:
|
||||
std::vector<std::atomic<int>> compaction_completed_;
|
||||
};
|
||||
|
||||
class SstStatsCollector : public EventListener {
|
||||
public:
|
||||
SstStatsCollector() : num_ssts_creation_started_(0) {}
|
||||
|
||||
void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /* info */) override {
|
||||
++num_ssts_creation_started_;
|
||||
}
|
||||
|
||||
int num_ssts_creation_started() {
|
||||
return num_ssts_creation_started_;
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic<int> num_ssts_creation_started_;
|
||||
};
|
||||
|
||||
static const int kCDTValueSize = 1000;
|
||||
static const int kCDTKeysPerBuffer = 4;
|
||||
static const int kCDTNumLevels = 8;
|
||||
@ -3485,12 +3501,13 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
|
||||
// ensure the auto compaction doesn't finish until manual compaction has
|
||||
// had a chance to be delayed.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWait", "CompactionJob::Run():End"}});
|
||||
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
|
||||
"CompactionJob::Run():End"}});
|
||||
} else {
|
||||
// ensure the auto-compaction doesn't finish until manual compaction has
|
||||
// continued without delay.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
|
||||
{{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
|
||||
}
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
@ -3538,12 +3555,13 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
|
||||
// ensure the flush doesn't finish until manual compaction has had a
|
||||
// chance to be delayed.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWait", "FlushJob::WriteLevel0Table"}});
|
||||
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
|
||||
"FlushJob::WriteLevel0Table"}});
|
||||
} else {
|
||||
// ensure the flush doesn't finish until manual compaction has continued
|
||||
// without delay.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWaitDone",
|
||||
{{"DBImpl::FlushMemTable:StallWaitDone",
|
||||
"FlushJob::WriteLevel0Table"}});
|
||||
}
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
@ -3553,6 +3571,7 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
|
||||
ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
|
||||
FlushOptions flush_opts;
|
||||
flush_opts.wait = false;
|
||||
flush_opts.allow_write_stall = true;
|
||||
dbfull()->Flush(flush_opts);
|
||||
}
|
||||
|
||||
@ -3588,7 +3607,7 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
|
||||
// The auto-compaction waits until the manual compaction finishes to ensure
|
||||
// the signal comes from closing CF/DB, not from compaction making progress.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWait",
|
||||
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
|
||||
"DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
|
||||
{"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
|
||||
"CompactionJob::Run():End"}});
|
||||
@ -3639,18 +3658,21 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
|
||||
// began. So it unblocks CompactRange and precludes its flush. Throughout the
|
||||
// test, stall conditions are upheld via high L0 file count.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWait",
|
||||
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
|
||||
"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
|
||||
{"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
|
||||
"DBImpl::CompactRange:StallWaitDone"},
|
||||
{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
|
||||
"DBImpl::FlushMemTable:StallWaitDone"},
|
||||
{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
//used for the delayable flushes
|
||||
FlushOptions flush_opts;
|
||||
flush_opts.allow_write_stall = true;
|
||||
for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
|
||||
for (int j = 0; j < 2; ++j) {
|
||||
ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
|
||||
}
|
||||
Flush();
|
||||
dbfull()->Flush(flush_opts);
|
||||
}
|
||||
auto manual_compaction_thread = port::Thread([this]() {
|
||||
CompactRangeOptions cro;
|
||||
@ -3660,7 +3682,7 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
|
||||
|
||||
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
|
||||
Put(ToString(0), RandomString(&rnd, 1024));
|
||||
Flush();
|
||||
dbfull()->Flush(flush_opts);
|
||||
Put(ToString(0), RandomString(&rnd, 1024));
|
||||
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
|
||||
manual_compaction_thread.join();
|
||||
@ -3816,6 +3838,30 @@ TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
|
||||
bg_thread.join();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
|
||||
Options options = CurrentOptions();
|
||||
SstStatsCollector* collector = new SstStatsCollector();
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.listeners.emplace_back(collector);
|
||||
Reopen(options);
|
||||
|
||||
// Make sure the L0 files overlap to prevent trivial move.
|
||||
ASSERT_OK(Put("a", "val"));
|
||||
ASSERT_OK(Put("b", "val"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Delete("a"));
|
||||
ASSERT_OK(Delete("b"));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
|
||||
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
|
||||
|
||||
// Expect one file creation to start for each flush, and zero for compaction
|
||||
// since no keys are written.
|
||||
ASSERT_EQ(2, collector->num_ssts_creation_started());
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
|
||||
::testing::Values(std::make_tuple(1, true),
|
||||
std::make_tuple(1, false),
|
||||
@ -3913,6 +3959,50 @@ INSTANTIATE_TEST_CASE_P(
|
||||
CompactionPri::kOldestSmallestSeqFirst,
|
||||
CompactionPri::kMinOverlappingRatio));
|
||||
|
||||
class NoopMergeOperator : public MergeOperator {
|
||||
public:
|
||||
NoopMergeOperator() {}
|
||||
|
||||
virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
|
||||
MergeOperationOutput* merge_out) const override {
|
||||
std::string val("bar");
|
||||
merge_out->new_value = val;
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual const char* Name() const override { return "Noop"; }
|
||||
};
|
||||
|
||||
TEST_F(DBCompactionTest, PartialManualCompaction) {
|
||||
Options opts = CurrentOptions();
|
||||
opts.num_levels = 3;
|
||||
opts.level0_file_num_compaction_trigger = 10;
|
||||
opts.compression = kNoCompression;
|
||||
opts.merge_operator.reset(new NoopMergeOperator());
|
||||
opts.target_file_size_base = 10240;
|
||||
DestroyAndReopen(opts);
|
||||
|
||||
Random rnd(301);
|
||||
for (auto i = 0; i < 8; ++i) {
|
||||
for (auto j = 0; j < 10; ++j) {
|
||||
Merge("foo", RandomString(&rnd, 1024));
|
||||
}
|
||||
Flush();
|
||||
}
|
||||
|
||||
MoveFilesToLevel(2);
|
||||
|
||||
std::string prop;
|
||||
EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
|
||||
uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
|
||||
ASSERT_OK(dbfull()->SetOptions(
|
||||
{{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
|
||||
|
||||
CompactRangeOptions cro;
|
||||
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
||||
dbfull()->CompactRange(cro, nullptr, nullptr);
|
||||
}
|
||||
|
||||
#endif // !defined(ROCKSDB_LITE)
|
||||
} // namespace rocksdb
|
||||
|
||||
|
@ -35,6 +35,7 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
|
||||
Reopen(options);
|
||||
FlushOptions no_wait;
|
||||
no_wait.wait = false;
|
||||
no_wait.allow_write_stall=true;
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"VersionSet::LogAndApply:WriteManifest",
|
||||
|
@ -396,7 +396,7 @@ class DBImpl : public DB {
|
||||
Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
|
||||
|
||||
// Force current memtable contents to be flushed.
|
||||
Status TEST_FlushMemTable(bool wait = true,
|
||||
Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
|
||||
ColumnFamilyHandle* cfh = nullptr);
|
||||
|
||||
// Wait for memtable compaction
|
||||
@ -811,6 +811,7 @@ class DBImpl : public DB {
|
||||
friend struct SuperVersion;
|
||||
friend class CompactedDBImpl;
|
||||
friend class DBTest_ConcurrentFlushWAL_Test;
|
||||
friend class DBTest_MixedSlowdownOptionsStop_Test;
|
||||
#ifndef NDEBUG
|
||||
friend class DBTest2_ReadCallbackTest_Test;
|
||||
friend class WriteCallbackTest_WriteWithCallbackTest_Test;
|
||||
@ -918,6 +919,10 @@ class DBImpl : public DB {
|
||||
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
|
||||
FlushReason flush_reason, bool writes_stopped = false);
|
||||
|
||||
// Wait until flushing this column family won't stall writes
|
||||
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
|
||||
bool* flush_needed);
|
||||
|
||||
// Wait for memtable flushed.
|
||||
// If flush_memtable_id is non-null, wait until the memtable with the ID
|
||||
// gets flush. Otherwise, wait until the column family don't have any
|
||||
|
@ -324,60 +324,12 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
|
||||
CleanupSuperVersion(super_version);
|
||||
}
|
||||
|
||||
if (!options.allow_write_stall && flush_needed) {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
|
||||
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
|
||||
do {
|
||||
if (write_stall_condition != WriteStallCondition::kNormal) {
|
||||
TEST_SYNC_POINT("DBImpl::CompactRange:StallWait");
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"[%s] CompactRange waiting on stall conditions to clear",
|
||||
cfd->GetName().c_str());
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
|
||||
return Status::ShutdownInProgress();
|
||||
}
|
||||
|
||||
uint64_t earliest_memtable_id =
|
||||
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
|
||||
if (earliest_memtable_id > orig_active_memtable_id) {
|
||||
// We waited so long that the memtable we were originally waiting on was
|
||||
// flushed.
|
||||
flush_needed = false;
|
||||
break;
|
||||
}
|
||||
|
||||
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
|
||||
const auto* vstorage = cfd->current()->storage_info();
|
||||
|
||||
// Skip stalling check if we're below auto-flush and auto-compaction
|
||||
// triggers. If it stalled in these conditions, that'd mean the stall
|
||||
// triggers are so low that stalling is needed for any background work. In
|
||||
// that case we shouldn't wait since background work won't be scheduled.
|
||||
if (cfd->imm()->NumNotFlushed() <
|
||||
cfd->ioptions()->min_write_buffer_number_to_merge &&
|
||||
vstorage->l0_delay_trigger_count() <
|
||||
mutable_cf_options.level0_file_num_compaction_trigger) {
|
||||
break;
|
||||
}
|
||||
|
||||
// check whether one extra immutable memtable or an extra L0 file would
|
||||
// cause write stalling mode to be entered. It could still enter stall
|
||||
// mode due to pending compaction bytes, but that's less common
|
||||
write_stall_condition =
|
||||
ColumnFamilyData::GetWriteStallConditionAndCause(
|
||||
cfd->imm()->NumNotFlushed() + 1,
|
||||
vstorage->l0_delay_trigger_count() + 1,
|
||||
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
|
||||
.first;
|
||||
} while (write_stall_condition != WriteStallCondition::kNormal);
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::CompactRange:StallWaitDone");
|
||||
Status s;
|
||||
if (flush_needed) {
|
||||
s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction);
|
||||
FlushOptions fo;
|
||||
fo.allow_write_stall = options.allow_write_stall;
|
||||
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
|
||||
false /* writes_stopped*/);
|
||||
if (!s.ok()) {
|
||||
LogFlush(immutable_db_options_.info_log);
|
||||
return s;
|
||||
@ -1077,6 +1029,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
FlushReason flush_reason, bool writes_stopped) {
|
||||
Status s;
|
||||
uint64_t flush_memtable_id = 0;
|
||||
if (!flush_options.allow_write_stall) {
|
||||
bool flush_needed = true;
|
||||
s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
|
||||
TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
|
||||
if (!s.ok() || !flush_needed) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
{
|
||||
WriteContext context;
|
||||
InstrumentedMutexLock guard_lock(&mutex_);
|
||||
@ -1115,6 +1075,69 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
return s;
|
||||
}
|
||||
|
||||
// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
|
||||
// cause write stall, for example if one memtable is being flushed already.
|
||||
// This method tries to avoid write stall (similar to CompactRange() behavior)
|
||||
// it emulates how the SuperVersion / LSM would change if flush happens, checks
|
||||
// it against various constrains and delays flush if it'd cause write stall.
|
||||
// Called should check status and flush_needed to see if flush already happened.
|
||||
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
|
||||
bool* flush_needed) {
|
||||
{
|
||||
*flush_needed = true;
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
|
||||
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
|
||||
do {
|
||||
if (write_stall_condition != WriteStallCondition::kNormal) {
|
||||
TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"[%s] WaitUntilFlushWouldNotStallWrites"
|
||||
" waiting on stall conditions to clear",
|
||||
cfd->GetName().c_str());
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
|
||||
return Status::ShutdownInProgress();
|
||||
}
|
||||
|
||||
uint64_t earliest_memtable_id =
|
||||
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
|
||||
if (earliest_memtable_id > orig_active_memtable_id) {
|
||||
// We waited so long that the memtable we were originally waiting on was
|
||||
// flushed.
|
||||
*flush_needed = false;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
|
||||
const auto* vstorage = cfd->current()->storage_info();
|
||||
|
||||
// Skip stalling check if we're below auto-flush and auto-compaction
|
||||
// triggers. If it stalled in these conditions, that'd mean the stall
|
||||
// triggers are so low that stalling is needed for any background work. In
|
||||
// that case we shouldn't wait since background work won't be scheduled.
|
||||
if (cfd->imm()->NumNotFlushed() <
|
||||
cfd->ioptions()->min_write_buffer_number_to_merge &&
|
||||
vstorage->l0_delay_trigger_count() <
|
||||
mutable_cf_options.level0_file_num_compaction_trigger) {
|
||||
break;
|
||||
}
|
||||
|
||||
// check whether one extra immutable memtable or an extra L0 file would
|
||||
// cause write stalling mode to be entered. It could still enter stall
|
||||
// mode due to pending compaction bytes, but that's less common
|
||||
write_stall_condition =
|
||||
ColumnFamilyData::GetWriteStallConditionAndCause(
|
||||
cfd->imm()->NumNotFlushed() + 1,
|
||||
vstorage->l0_delay_trigger_count() + 1,
|
||||
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
|
||||
.first;
|
||||
} while (write_stall_condition != WriteStallCondition::kNormal);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
|
||||
const uint64_t* flush_memtable_id) {
|
||||
Status s;
|
||||
|
@ -100,9 +100,11 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
|
||||
return SwitchMemtable(cfd, &write_context);
|
||||
}
|
||||
|
||||
Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) {
|
||||
Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
|
||||
ColumnFamilyHandle* cfh) {
|
||||
FlushOptions fo;
|
||||
fo.wait = wait;
|
||||
fo.allow_write_stall = allow_write_stall;
|
||||
ColumnFamilyData* cfd;
|
||||
if (cfh == nullptr) {
|
||||
cfd = default_cf_handle_->cfd();
|
||||
|
@ -1146,10 +1146,14 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
|
||||
uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
|
||||
if (delay > 0) {
|
||||
if (write_options.no_slowdown) {
|
||||
return Status::Incomplete();
|
||||
return Status::Incomplete("Write stall");
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
|
||||
|
||||
// Notify write_thread_ about the stall so it can setup a barrier and
|
||||
// fail any pending writers with no_slowdown
|
||||
write_thread_.BeginWriteStall();
|
||||
TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
|
||||
mutex_.Unlock();
|
||||
// We will delay the write until we have slept for delay ms or
|
||||
// we don't need a delay anymore
|
||||
@ -1166,15 +1170,21 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
|
||||
env_->SleepForMicroseconds(kDelayInterval);
|
||||
}
|
||||
mutex_.Lock();
|
||||
write_thread_.EndWriteStall();
|
||||
}
|
||||
|
||||
while (!error_handler_.IsDBStopped() && write_controller_.IsStopped()) {
|
||||
if (write_options.no_slowdown) {
|
||||
return Status::Incomplete();
|
||||
return Status::Incomplete("Write stall");
|
||||
}
|
||||
delayed = true;
|
||||
|
||||
// Notify write_thread_ about the stall so it can setup a barrier and
|
||||
// fail any pending writers with no_slowdown
|
||||
write_thread_.BeginWriteStall();
|
||||
TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
|
||||
bg_cv_.Wait();
|
||||
write_thread_.EndWriteStall();
|
||||
}
|
||||
}
|
||||
assert(!delayed || !write_options.no_slowdown);
|
||||
|
206
db/db_test.cc
206
db/db_test.cc
@ -262,6 +262,196 @@ TEST_F(DBTest, SkipDelay) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBTest, MixedSlowdownOptions) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.write_buffer_size = 100000;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
std::vector<port::Thread> threads;
|
||||
std::atomic<int> thread_num(0);
|
||||
|
||||
std::function<void()> write_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = false;
|
||||
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
|
||||
};
|
||||
std::function<void()> write_no_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = true;
|
||||
ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
|
||||
};
|
||||
// Use a small number to ensure a large delay that is still effective
|
||||
// when we do Put
|
||||
// TODO(myabandeh): this is time dependent and could potentially make
|
||||
// the test flaky
|
||||
auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
|
||||
std::atomic<int> sleep_count(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::DelayWrite:BeginWriteStallDone",
|
||||
[&](void* /*arg*/) {
|
||||
sleep_count.fetch_add(1);
|
||||
if (threads.empty()) {
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
threads.emplace_back(write_slowdown_func);
|
||||
}
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
threads.emplace_back(write_no_slowdown_func);
|
||||
}
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
WriteOptions wo;
|
||||
wo.sync = false;
|
||||
wo.disableWAL = false;
|
||||
wo.no_slowdown = false;
|
||||
dbfull()->Put(wo, "foo", "bar");
|
||||
// We need the 2nd write to trigger delay. This is because delay is
|
||||
// estimated based on the last write size which is 0 for the first write.
|
||||
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
|
||||
token.reset();
|
||||
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
ASSERT_GE(sleep_count.load(), 1);
|
||||
|
||||
wo.no_slowdown = true;
|
||||
ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
|
||||
}
|
||||
|
||||
TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.write_buffer_size = 100000;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
std::vector<port::Thread> threads;
|
||||
std::atomic<int> thread_num(0);
|
||||
|
||||
std::function<void()> write_no_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = true;
|
||||
ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
|
||||
};
|
||||
// Use a small number to ensure a large delay that is still effective
|
||||
// when we do Put
|
||||
// TODO(myabandeh): this is time dependent and could potentially make
|
||||
// the test flaky
|
||||
auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
|
||||
std::atomic<int> sleep_count(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::DelayWrite:Sleep",
|
||||
[&](void* /*arg*/) {
|
||||
sleep_count.fetch_add(1);
|
||||
if (threads.empty()) {
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
threads.emplace_back(write_no_slowdown_func);
|
||||
}
|
||||
// Sleep for 2s to allow the threads to insert themselves into the
|
||||
// write queue
|
||||
env_->SleepForMicroseconds(3000000ULL);
|
||||
}
|
||||
});
|
||||
std::atomic<int> wait_count(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::DelayWrite:Wait",
|
||||
[&](void* /*arg*/) { wait_count.fetch_add(1); });
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
WriteOptions wo;
|
||||
wo.sync = false;
|
||||
wo.disableWAL = false;
|
||||
wo.no_slowdown = false;
|
||||
dbfull()->Put(wo, "foo", "bar");
|
||||
// We need the 2nd write to trigger delay. This is because delay is
|
||||
// estimated based on the last write size which is 0 for the first write.
|
||||
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
|
||||
token.reset();
|
||||
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
ASSERT_EQ(sleep_count.load(), 1);
|
||||
ASSERT_GE(wait_count.load(), 0);
|
||||
}
|
||||
|
||||
TEST_F(DBTest, MixedSlowdownOptionsStop) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.write_buffer_size = 100000;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
std::vector<port::Thread> threads;
|
||||
std::atomic<int> thread_num(0);
|
||||
|
||||
std::function<void()> write_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = false;
|
||||
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
|
||||
};
|
||||
std::function<void()> write_no_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = true;
|
||||
ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
|
||||
};
|
||||
std::function<void()> wakeup_writer = [&]() {
|
||||
dbfull()->mutex_.Lock();
|
||||
dbfull()->bg_cv_.SignalAll();
|
||||
dbfull()->mutex_.Unlock();
|
||||
};
|
||||
// Use a small number to ensure a large delay that is still effective
|
||||
// when we do Put
|
||||
// TODO(myabandeh): this is time dependent and could potentially make
|
||||
// the test flaky
|
||||
auto token = dbfull()->TEST_write_controler().GetStopToken();
|
||||
std::atomic<int> wait_count(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::DelayWrite:Wait",
|
||||
[&](void* /*arg*/) {
|
||||
wait_count.fetch_add(1);
|
||||
if (threads.empty()) {
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
threads.emplace_back(write_slowdown_func);
|
||||
}
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
threads.emplace_back(write_no_slowdown_func);
|
||||
}
|
||||
// Sleep for 2s to allow the threads to insert themselves into the
|
||||
// write queue
|
||||
env_->SleepForMicroseconds(3000000ULL);
|
||||
}
|
||||
token.reset();
|
||||
threads.emplace_back(wakeup_writer);
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
WriteOptions wo;
|
||||
wo.sync = false;
|
||||
wo.disableWAL = false;
|
||||
wo.no_slowdown = false;
|
||||
dbfull()->Put(wo, "foo", "bar");
|
||||
// We need the 2nd write to trigger delay. This is because delay is
|
||||
// estimated based on the last write size which is 0 for the first write.
|
||||
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
|
||||
token.reset();
|
||||
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
ASSERT_GE(wait_count.load(), 1);
|
||||
|
||||
wo.no_slowdown = true;
|
||||
ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
|
||||
}
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
TEST_F(DBTest, LevelLimitReopen) {
|
||||
@ -4331,7 +4521,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
|
||||
// Clean up memtable and L0. Block compaction threads. If we continue to write
// and flush memtables, we should see Put stop after 8 memtable flushes
// since level0_stop_writes_trigger = 8.
|
||||
dbfull()->TEST_FlushMemTable(true);
|
||||
dbfull()->TEST_FlushMemTable(true, true);
|
||||
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
||||
// Block compaction
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
@ -4344,7 +4534,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
|
||||
WriteOptions wo;
|
||||
while (count < 64) {
|
||||
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
|
||||
dbfull()->TEST_FlushMemTable(true);
|
||||
dbfull()->TEST_FlushMemTable(true, true);
|
||||
count++;
|
||||
if (dbfull()->TEST_write_controler().IsStopped()) {
|
||||
sleeping_task_low.WakeUp();
|
||||
@ -4372,7 +4562,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
|
||||
count = 0;
|
||||
while (count < 64) {
|
||||
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
|
||||
dbfull()->TEST_FlushMemTable(true);
|
||||
dbfull()->TEST_FlushMemTable(true, true);
|
||||
count++;
|
||||
if (dbfull()->TEST_write_controler().IsStopped()) {
|
||||
sleeping_task_low.WakeUp();
|
||||
@ -5512,7 +5702,7 @@ TEST_F(DBTest, SoftLimit) {
|
||||
for (int i = 0; i < 72; i++) {
|
||||
Put(Key(i), std::string(5000, 'x'));
|
||||
if (i % 10 == 0) {
|
||||
Flush();
|
||||
dbfull()->TEST_FlushMemTable(true, true);
|
||||
}
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
@ -5522,7 +5712,7 @@ TEST_F(DBTest, SoftLimit) {
|
||||
for (int i = 0; i < 72; i++) {
|
||||
Put(Key(i), std::string(5000, 'x'));
|
||||
if (i % 10 == 0) {
|
||||
Flush();
|
||||
dbfull()->TEST_FlushMemTable(true, true);
|
||||
}
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
@ -5541,7 +5731,7 @@ TEST_F(DBTest, SoftLimit) {
|
||||
Put(Key(i), std::string(5000, 'x'));
|
||||
Put(Key(100 - i), std::string(5000, 'x'));
|
||||
// Flush the file. File size is around 30KB.
|
||||
Flush();
|
||||
dbfull()->TEST_FlushMemTable(true, true);
|
||||
}
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
|
||||
@ -5576,7 +5766,7 @@ TEST_F(DBTest, SoftLimit) {
|
||||
Put(Key(10 + i), std::string(5000, 'x'));
|
||||
Put(Key(90 - i), std::string(5000, 'x'));
|
||||
// Flush the file. File size is around 30KB.
|
||||
Flush();
|
||||
dbfull()->TEST_FlushMemTable(true, true);
|
||||
}
|
||||
|
||||
// Wake up sleep task to enable compaction to run and waits
|
||||
@ -5597,7 +5787,7 @@ TEST_F(DBTest, SoftLimit) {
|
||||
Put(Key(20 + i), std::string(5000, 'x'));
|
||||
Put(Key(80 - i), std::string(5000, 'x'));
|
||||
// Flush the file. File size is around 30KB.
|
||||
Flush();
|
||||
dbfull()->TEST_FlushMemTable(true, true);
|
||||
}
|
||||
// Wake up sleep task to enable compaction to run and waits
|
||||
// for it to go to sleep state again to make sure one compaction
|
||||
|
@ -2649,7 +2649,7 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
|
||||
#endif
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, IteratorPinnedMemory) {
|
||||
TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
options.statistics = rocksdb::CreateDBStatistics();
|
||||
|
@ -451,13 +451,14 @@ Options DBTestBase::GetOptions(
|
||||
}
|
||||
case kBlockBasedTableWithPartitionedIndexFormat4: {
|
||||
table_options.format_version = 4;
|
||||
// Format 3 changes the binary index format. Since partitioned index is a
|
||||
// Format 4 changes the binary index format. Since partitioned index is a
|
||||
// super-set of simple indexes, we are also using kTwoLevelIndexSearch to
|
||||
// test this format.
|
||||
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
|
||||
// The top-level index in partition filters is also affected by format 3.
// The top-level index in partition filters is also affected by format 4.
|
||||
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
||||
table_options.partition_filters = true;
|
||||
table_options.index_block_restart_interval = 8;
|
||||
break;
|
||||
}
|
||||
case kBlockBasedTableWithIndexRestartInterval: {
|
||||
|
@ -109,8 +109,6 @@ struct OptionsOverride {
|
||||
// These will be used only if filter_policy is set
|
||||
bool partition_filters = false;
|
||||
uint64_t metadata_block_size = 1024;
|
||||
BlockBasedTableOptions::IndexType index_type =
|
||||
BlockBasedTableOptions::IndexType::kBinarySearch;
|
||||
|
||||
// Used as a bit mask of individual enums in which to skip an XF test point
|
||||
int skip_policy = 0;
|
||||
|
@ -417,7 +417,9 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
|
||||
for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
|
||||
++i) {
|
||||
Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
|
||||
db_->Flush(FlushOptions(), handles_[1]);
|
||||
FlushOptions fo;
|
||||
fo.allow_write_stall = true;
|
||||
db_->Flush(fo, handles_[1]);
|
||||
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
|
||||
}
|
||||
ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
|
||||
|
@ -344,7 +344,7 @@ class CollapsedRangeDelMap : public RangeDelMap {
|
||||
}
|
||||
}
|
||||
|
||||
size_t Size() const override { return rep_.size() - 1; }
|
||||
size_t Size() const override { return rep_.empty() ? 0 : rep_.size() - 1; }
|
||||
|
||||
void InvalidatePosition() override { iter_ = rep_.end(); }
|
||||
|
||||
@ -493,7 +493,8 @@ Status RangeDelAggregator::AddTombstones(
|
||||
tombstone.end_key_ = largest->user_key();
|
||||
}
|
||||
}
|
||||
GetRangeDelMap(tombstone.seq_).AddTombstone(std::move(tombstone));
|
||||
auto seq = tombstone.seq_;
|
||||
GetRangeDelMap(seq).AddTombstone(std::move(tombstone));
|
||||
input->Next();
|
||||
}
|
||||
if (!first_iter) {
|
||||
|
@ -2032,7 +2032,7 @@ bool VersionStorageInfo::OverlapInLevel(int level,
|
||||
void VersionStorageInfo::GetOverlappingInputs(
|
||||
int level, const InternalKey* begin, const InternalKey* end,
|
||||
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
|
||||
bool expand_range) const {
|
||||
bool expand_range, InternalKey** next_smallest) const {
|
||||
if (level >= num_non_empty_levels_) {
|
||||
// this level is empty, no overlapping inputs
|
||||
return;
|
||||
@ -2051,11 +2051,17 @@ void VersionStorageInfo::GetOverlappingInputs(
|
||||
}
|
||||
const Comparator* user_cmp = user_comparator_;
|
||||
if (level > 0) {
|
||||
GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
|
||||
hint_index, file_index);
|
||||
GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
|
||||
file_index, false, next_smallest);
|
||||
return;
|
||||
}
|
||||
|
||||
if (next_smallest) {
|
||||
// next_smallest key only makes sense for non-level 0, where files are
|
||||
// non-overlapping
|
||||
*next_smallest = nullptr;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < level_files_brief_[level].num_files; ) {
|
||||
FdWithKeyRange* f = &(level_files_brief_[level].files[i++]);
|
||||
const Slice file_start = ExtractUserKey(f->smallest_key);
|
||||
@ -2183,7 +2189,7 @@ int sstableKeyCompare(const Comparator* user_cmp,
|
||||
void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
|
||||
int level, const InternalKey* begin, const InternalKey* end,
|
||||
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
|
||||
bool within_interval) const {
|
||||
bool within_interval, InternalKey** next_smallest) const {
|
||||
assert(level > 0);
|
||||
int min = 0;
|
||||
int mid = 0;
|
||||
@ -2219,6 +2225,9 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
|
||||
|
||||
// If there were no overlapping files, return immediately.
|
||||
if (!foundOverlap) {
|
||||
if (next_smallest) {
|
||||
*next_smallest = nullptr;
|
||||
}
|
||||
return;
|
||||
}
|
||||
// returns the index where an overlap is found
|
||||
@ -2239,6 +2248,15 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
|
||||
for (int i = start_index; i <= end_index; i++) {
|
||||
inputs->push_back(files_[level][i]);
|
||||
}
|
||||
|
||||
if (next_smallest != nullptr) {
|
||||
// Provide the next key outside the range covered by inputs
|
||||
if (++end_index < static_cast<int>(files_[level].size())) {
|
||||
**next_smallest = files_[level][end_index]->smallest;
|
||||
} else {
|
||||
*next_smallest = nullptr;
|
||||
}
|
||||
}
|
||||
}
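The new `next_smallest` out-parameter gives callers the smallest key of the first file left out of `inputs`, which can serve, for example, as the stop key when a manual compaction has to be truncated. A hedged caller-side sketch (names such as `vstorage`, `level`, `begin` and `end` are assumed, not part of this diff):
InternalKey next_file_smallest;
InternalKey* next_smallest = &next_file_smallest;
std::vector<FileMetaData*> inputs;
vstorage->GetOverlappingInputs(level, &begin, &end, &inputs,
                               /*hint_index=*/-1, /*file_index=*/nullptr,
                               /*expand_range=*/true, &next_smallest);
if (next_smallest != nullptr) {
  // next_smallest now points at the smallest key of the first excluded file;
  // use it as the exclusive upper bound for this compaction round.
}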
|
||||
|
||||
// Store in *start_index and *end_index the range of all files in
|
||||
|
@ -188,9 +188,11 @@ class VersionStorageInfo {
|
||||
std::vector<FileMetaData*>* inputs,
|
||||
int hint_index = -1, // index of overlap file
|
||||
int* file_index = nullptr, // return index of overlap file
|
||||
bool expand_range = true) // if set, returns files which overlap the
|
||||
const; // range and overlap each other. If false,
|
||||
bool expand_range = true, // if set, returns files which overlap the
|
||||
// range and overlap each other. If false,
|
||||
// then just files intersecting the range
|
||||
InternalKey** next_smallest = nullptr) // if non-null, returns the
|
||||
const; // smallest key of next file not included
|
||||
void GetCleanInputsWithinInterval(
|
||||
int level, const InternalKey* begin, // nullptr means before all keys
|
||||
const InternalKey* end, // nullptr means after all keys
|
||||
@ -200,14 +202,15 @@ class VersionStorageInfo {
|
||||
const;
|
||||
|
||||
void GetOverlappingInputsRangeBinarySearch(
|
||||
int level, // level > 0
|
||||
int level, // level > 0
|
||||
const InternalKey* begin, // nullptr means before all keys
|
||||
const InternalKey* end, // nullptr means after all keys
|
||||
std::vector<FileMetaData*>* inputs,
|
||||
int hint_index, // index of overlap file
|
||||
int* file_index, // return index of overlap file
|
||||
bool within_interval = false) // if set, force the inputs within interval
|
||||
const;
|
||||
bool within_interval = false, // if set, force the inputs within interval
|
||||
InternalKey** next_smallest = nullptr) // if non-null, returns the
|
||||
const; // smallest key of next file not included
|
||||
|
||||
void ExtendFileRangeOverlappingInterval(
|
||||
int level,
|
||||
|
@ -24,7 +24,10 @@ WriteThread::WriteThread(const ImmutableDBOptions& db_options)
|
||||
enable_pipelined_write_(db_options.enable_pipelined_write),
|
||||
newest_writer_(nullptr),
|
||||
newest_memtable_writer_(nullptr),
|
||||
last_sequence_(0) {}
|
||||
last_sequence_(0),
|
||||
write_stall_dummy_(),
|
||||
stall_mu_(),
|
||||
stall_cv_(&stall_mu_) {}
|
||||
|
||||
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
|
||||
// We're going to block. Lazily create the mutex. We guarantee
|
||||
@ -219,6 +222,28 @@ bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
|
||||
assert(w->state == STATE_INIT);
|
||||
Writer* writers = newest_writer->load(std::memory_order_relaxed);
|
||||
while (true) {
|
||||
// If a write stall is in effect and w->no_slowdown is not true,
// block here until the stall is cleared. If it is true, then return
// immediately
|
||||
if (writers == &write_stall_dummy_) {
|
||||
if (w->no_slowdown) {
|
||||
w->status = Status::Incomplete("Write stall");
|
||||
SetState(w, STATE_COMPLETED);
|
||||
return false;
|
||||
}
|
||||
// Since no_slowdown is false, wait here to be notified of the write
|
||||
// stall clearing
|
||||
{
|
||||
MutexLock lock(&stall_mu_);
|
||||
writers = newest_writer->load(std::memory_order_relaxed);
|
||||
if (writers == &write_stall_dummy_) {
|
||||
stall_cv_.Wait();
|
||||
// Load newest_writers_ again since it may have changed
|
||||
writers = newest_writer->load(std::memory_order_relaxed);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
w->link_older = writers;
|
||||
if (newest_writer->compare_exchange_weak(writers, w)) {
|
||||
return (writers == nullptr);
|
||||
@ -303,12 +328,44 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
|
||||
SetState(w, STATE_COMPLETED);
|
||||
}
|
||||
|
||||
void WriteThread::BeginWriteStall() {
|
||||
LinkOne(&write_stall_dummy_, &newest_writer_);
|
||||
|
||||
// Walk writer list until w->write_group != nullptr. The current write group
|
||||
// will not have a mix of slowdown/no_slowdown, so it's ok to stop at that
|
||||
// point
|
||||
Writer* w = write_stall_dummy_.link_older;
|
||||
Writer* prev = &write_stall_dummy_;
|
||||
while (w != nullptr && w->write_group == nullptr) {
|
||||
if (w->no_slowdown) {
|
||||
prev->link_older = w->link_older;
|
||||
w->status = Status::Incomplete("Write stall");
|
||||
SetState(w, STATE_COMPLETED);
|
||||
w = prev->link_older;
|
||||
} else {
|
||||
prev = w;
|
||||
w = w->link_older;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WriteThread::EndWriteStall() {
|
||||
MutexLock lock(&stall_mu_);
|
||||
|
||||
assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
|
||||
newest_writer_.exchange(write_stall_dummy_.link_older);
|
||||
|
||||
// Wake up writers
|
||||
stall_cv_.SignalAll();
|
||||
}
|
||||
|
||||
static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
|
||||
void WriteThread::JoinBatchGroup(Writer* w) {
|
||||
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
|
||||
assert(w->batch != nullptr);
|
||||
|
||||
bool linked_as_leader = LinkOne(w, &newest_writer_);
|
||||
|
||||
if (linked_as_leader) {
|
||||
SetState(w, STATE_GROUP_LEADER);
|
||||
}
|
||||
|
@ -342,6 +342,13 @@ class WriteThread {
|
||||
return last_sequence_;
|
||||
}
|
||||
|
||||
// Insert a dummy writer at the tail of the write queue to indicate a write
|
||||
// stall, and fail any writers in the queue with no_slowdown set to true
|
||||
void BeginWriteStall();
|
||||
|
||||
// Remove the dummy writer and wake up waiting writers
|
||||
void EndWriteStall();
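A hedged sketch of the user-visible effect of the stall queue above (assuming an already-open `rocksdb::DB* db`): while the dummy stall writer is queued, a writer that sets `no_slowdown` fails fast with `Status::Incomplete` instead of blocking.
rocksdb::WriteOptions wo;
wo.no_slowdown = true;
rocksdb::Status s = db->Put(wo, "key", "value");
if (s.IsIncomplete()) {
  // A write stall was in effect; retry later or fall back to no_slowdown=false.
}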
|
||||
|
||||
private:
|
||||
// See AwaitState.
|
||||
const uint64_t max_yield_usec_;
|
||||
@ -365,6 +372,17 @@ class WriteThread {
|
||||
// is not necessarily visible to reads because the writer can be ongoing.
|
||||
SequenceNumber last_sequence_;
|
||||
|
||||
// A dummy writer to indicate a write stall condition. This will be inserted
|
||||
// at the tail of the writer queue by the leader, so newer writers can just
|
||||
// check for this and bail
|
||||
Writer write_stall_dummy_;
|
||||
|
||||
// Mutex and condvar for writers to block on a write stall. During a write
|
||||
// stall, writers with no_slowdown set to false will wait on this rather
// than on the writer queue
|
||||
port::Mutex stall_mu_;
|
||||
port::CondVar stall_cv_;
|
||||
|
||||
// Waits for w->state & goal_mask using w->StateMutex(). Returns
|
||||
// the state that satisfies goal_mask.
|
||||
uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);
|
||||
|
@ -1183,8 +1183,13 @@ struct FlushOptions {
|
||||
// If true, the flush will wait until the flush is done.
// Default: true
bool wait;

FlushOptions() : wait(true) {}
// If true, the flush would proceed immediately even if it means writes will
// stall for the duration of the flush; if false the operation will wait
// until it's possible to do the flush without causing a stall, or until the
// required flush is performed by someone else (foreground call or background
// thread).
// Default: false
bool allow_write_stall;
FlushOptions() : wait(true), allow_write_stall(false) {}
};
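A minimal usage sketch of the new option (assuming an already-open `rocksdb::DB* db`); setting `allow_write_stall` makes the flush start right away even if foreground writes must stall:
rocksdb::FlushOptions fo;
fo.wait = true;               // block until the flush finishes
fo.allow_write_stall = true;  // do not wait for a stall-free window
rocksdb::Status s = db->Flush(fo);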
|
||||
|
||||
// Create a Logger from provided DBOptions
|
||||
@ -1196,6 +1201,9 @@ extern Status CreateLoggerFromOptions(const std::string& dbname,
|
||||
struct CompactionOptions {
|
||||
// Compaction output compression type
|
||||
// Default: snappy
|
||||
// If set to `kDisableCompressionOption`, RocksDB will choose compression type
|
||||
// according to the `ColumnFamilyOptions`, taking into account the output
|
||||
// level if `compression_per_level` is specified.
|
||||
CompressionType compression;
|
||||
// Compaction will create files of size `output_file_size_limit`.
|
||||
// Default: MAX, which means that compaction will create a single file
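A hedged sketch of how the clarified default is used (assuming an open `db` and a vector of input SST file names); with `kDisableCompressionOption`, CompactFiles defers to the column-family compression settings instead of failing:
rocksdb::CompactionOptions co;
co.compression = rocksdb::kDisableCompressionOption;  // defer to CF-wide options
rocksdb::Status s =
    db->CompactFiles(co, input_file_names, /*output_level=*/1);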
|
||||
|
@ -6,7 +6,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 5
|
||||
#define ROCKSDB_MINOR 16
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 6
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -612,7 +612,11 @@ void txn_write_kv_parts_helper(JNIEnv* env,
|
||||
const jint& jkey_parts_len,
|
||||
const jobjectArray& jvalue_parts,
|
||||
const jint& jvalue_parts_len) {
|
||||
#ifndef DEBUG
|
||||
(void) jvalue_parts_len;
|
||||
#else
|
||||
assert(jkey_parts_len == jvalue_parts_len);
|
||||
#endif
|
||||
|
||||
auto key_parts = std::vector<rocksdb::Slice>();
|
||||
auto value_parts = std::vector<rocksdb::Slice>();
|
||||
|
@ -306,7 +306,11 @@ rocksdb::Status WriteBatchHandlerJniCallback::PutBlobIndexCF(uint32_t column_fam
|
||||
}
|
||||
|
||||
rocksdb::Status WriteBatchHandlerJniCallback::MarkBeginPrepare(bool unprepare) {
|
||||
#ifndef DEBUG
|
||||
(void) unprepare;
|
||||
#else
|
||||
assert(!unprepare);
|
||||
#endif
|
||||
m_env->CallVoidMethod(m_jcallback_obj, m_jMarkBeginPrepareMethodId);
|
||||
|
||||
// check for Exception, in-particular RocksDBException
|
||||
|
src.mk
@ -122,6 +122,7 @@ LIB_SOURCES = \
|
||||
table/plain_table_reader.cc \
|
||||
table/sst_file_writer.cc \
|
||||
table/table_properties.cc \
|
||||
tools/trace_analyzer_tool.cc \
|
||||
table/two_level_iterator.cc \
|
||||
tools/dump/db_dump_tool.cc \
|
||||
util/arena.cc \
|
||||
@ -161,6 +162,7 @@ LIB_SOURCES = \
|
||||
utilities/blob_db/blob_compaction_filter.cc \
|
||||
utilities/blob_db/blob_db.cc \
|
||||
utilities/blob_db/blob_db_impl.cc \
|
||||
utilities/blob_db/blob_db_impl_filesnapshot.cc \
|
||||
utilities/blob_db/blob_file.cc \
|
||||
utilities/blob_db/blob_log_format.cc \
|
||||
utilities/blob_db/blob_log_reader.cc \
|
||||
|
@ -235,7 +235,7 @@ void DataBlockIter::Seek(const Slice& target) {
|
||||
//
|
||||
// If the return value is FALSE, iter location is undefined, and it means:
|
||||
// 1) there is no key in this block falling into the range:
|
||||
// ["seek_user_key @ type | seqno", "seek_user_key @ type | 0"],
|
||||
// ["seek_user_key @ type | seqno", "seek_user_key @ kTypeDeletion | 0"],
|
||||
// inclusive; AND
|
||||
// 2) the last key of this block has a greater user_key than seek_user_key
|
||||
//
|
||||
@ -243,13 +243,21 @@ void DataBlockIter::Seek(const Slice& target) {
|
||||
// 1) If iter is valid, it is set to a location as if set by BinarySeek. In
|
||||
// this case, it points to the first key_ with a larger user_key or a
|
||||
// matching user_key with a seqno no greater than the seeking seqno.
|
||||
// 2) If the iter is invalid, it means either the block has no such user_key,
|
||||
// or the block ends with a matching user_key but with a larger seqno.
|
||||
// 2) If the iter is invalid, it means that either all the user_key is less
|
||||
// than the seek_user_key, or the block ends with a matching user_key but
|
||||
// with a smaller [ type | seqno ] (i.e. a larger seqno, or the same seqno
|
||||
// but larger type).
|
||||
bool DataBlockIter::SeekForGetImpl(const Slice& target) {
|
||||
Slice user_key = ExtractUserKey(target);
|
||||
uint32_t map_offset = restarts_ + num_restarts_ * sizeof(uint32_t);
|
||||
uint8_t entry = data_block_hash_index_->Lookup(data_, map_offset, user_key);
|
||||
|
||||
if (entry == kCollision) {
|
||||
// HashSeek not effective, falling back
|
||||
Seek(target);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (entry == kNoEntry) {
|
||||
// Even if we cannot find the user_key in this block, the result may
|
||||
// exist in the next block. Consider this example:
|
||||
@ -260,16 +268,13 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
|
||||
//
|
||||
// If seek_key = axy@60, the search will start from Block N.
|
||||
// Even if the user_key is not found in the hash map, the caller still
|
||||
// has to continue searching the next block. So we invalidate the
|
||||
// iterator to tell the caller to go on.
|
||||
current_ = restarts_; // Invalidate the iter
|
||||
return true;
|
||||
}
|
||||
|
||||
if (entry == kCollision) {
|
||||
// HashSeek not effective, falling back
|
||||
Seek(target);
|
||||
return true;
|
||||
// has to continue searching the next block.
|
||||
//
|
||||
// In this case, we pretend the key is in the last restart interval.
|
||||
// The while-loop below will search the last restart interval for the
|
||||
// key. It will stop at the first key that is larger than the seek_key,
|
||||
// or to the end of the block if no one is larger.
|
||||
entry = static_cast<uint8_t>(num_restarts_ - 1);
|
||||
}
|
||||
|
||||
uint32_t restart_index = entry;
|
||||
@ -299,24 +304,26 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
|
||||
}
|
||||
|
||||
if (current_ == restarts_) {
|
||||
// Search reaches the end of the block. There are two possibilities;
// Search reaches the end of the block. There are three possibilities:
// 1) there is only one user_key match in the block (otherwise collision).
// the matching user_key resides in the last restart interval.
// it is the last key of the restart interval and of the block too.
// ParseNextDataKey() skipped it as its seqno is newer.
// the matching user_key resides in the last restart interval, and it
// is the last key of the restart interval and of the block as well.
// ParseNextDataKey() skipped it as its [ type | seqno ] is smaller.
//
// 2) The seek_key is a false positive and got hashed to the last restart
// interval.
// All existing keys in the restart interval are less than seek_key.
// 2) The seek_key is not found in the HashIndex Lookup(), i.e. kNoEntry,
// AND all existing user_keys in the restart interval are smaller than
// seek_user_key.
//
// The result may exist in the next block in either case, so may_exist is
// returned as true.
// 3) The seek_key is a false positive and happens to be hashed to the
// last restart interval, AND all existing user_keys in the restart
// interval are smaller than seek_user_key.
//
// The result may exist in the next block in either case, so we return true.
|
||||
return true;
|
||||
}
|
||||
|
||||
if (user_comparator_->Compare(key_.GetUserKey(), user_key) != 0) {
|
||||
// the key is not in this block and cannot be at the next block either.
|
||||
// return false to tell the caller to break from the top-level for-loop
|
||||
return false;
|
||||
}
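The hash-index fast path above only exists for tables built with the data-block hash index; a hedged configuration sketch using the corresponding BlockBasedTableOptions fields:
rocksdb::BlockBasedTableOptions table_options;
table_options.data_block_index_type =
    rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
table_options.data_block_hash_table_util_ratio = 0.75;  // hash table fill ratio
rocksdb::Options options;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));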
|
||||
|
||||
@ -349,10 +356,10 @@ void IndexBlockIter::Seek(const Slice& target) {
|
||||
ok = PrefixSeek(target, &index);
|
||||
} else if (value_delta_encoded_) {
|
||||
ok = BinarySeek<DecodeKeyV4>(seek_key, 0, num_restarts_ - 1, &index,
|
||||
active_comparator_);
|
||||
comparator_);
|
||||
} else {
|
||||
ok = BinarySeek<DecodeKey>(seek_key, 0, num_restarts_ - 1, &index,
|
||||
active_comparator_);
|
||||
comparator_);
|
||||
}
|
||||
|
||||
if (!ok) {
|
||||
@ -725,13 +732,16 @@ uint32_t Block::NumRestarts() const {
|
||||
uint32_t block_footer = DecodeFixed32(data_ + size_ - sizeof(uint32_t));
|
||||
uint32_t num_restarts = block_footer;
|
||||
if (size_ > kMaxBlockSizeSupportedByHashIndex) {
|
||||
// We ensure a block with HashIndex is less than 64KiB in BlockBuilder.
// Therefore the footer cannot be encoded as a packed index type and
// In BlockBuilder, we have ensured a block with HashIndex is less than
// kMaxBlockSizeSupportedByHashIndex (64KiB).
//
// Therefore, if we encounter a block with a size > 64KiB, the block
// cannot have HashIndex. So the footer will be directly interpreted as
// num_restarts.
// Such a check can ensure a legacy block with a very large num_restarts
// i.e. >= 0x10000000 can be interpreted correctly as no HashIndex.
// If a legacy block has a num_restarts >= 0x10000000, size_ will be
// much larger than 64KiB.
//
// Such a check is for backward compatibility. We can ensure a legacy block
// with a very large num_restarts i.e. >= 0x80000000 can be interpreted
// correctly as no HashIndex even if the MSB of num_restarts is set.
|
||||
return num_restarts;
|
||||
}
|
||||
BlockBasedTableOptions::DataBlockIndexType index_type;
|
||||
@ -752,7 +762,11 @@ BlockBasedTableOptions::DataBlockIndexType Block::IndexType() const {
|
||||
return index_type;
|
||||
}
|
||||
|
||||
Block::~Block() { TEST_SYNC_POINT("Block::~Block"); }
|
||||
Block::~Block() {
|
||||
// This sync point can be re-enabled if RocksDB can control the
|
||||
// initialization order of any/all static options created by the user.
|
||||
// TEST_SYNC_POINT("Block::~Block");
|
||||
}
|
||||
|
||||
Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
|
||||
size_t read_amp_bytes_per_bit, Statistics* statistics)
|
||||
|
@ -468,10 +468,10 @@ class IndexBlockIter final : public BlockIter<BlockHandle> {
|
||||
BlockPrefixIndex* prefix_index, bool key_includes_seq,
|
||||
bool value_is_full, bool block_contents_pinned,
|
||||
DataBlockHashIndex* /*data_block_hash_index*/) {
|
||||
InitializeBase(comparator, data, restarts, num_restarts,
|
||||
kDisableGlobalSequenceNumber, block_contents_pinned);
|
||||
InitializeBase(key_includes_seq ? comparator : user_comparator, data,
|
||||
restarts, num_restarts, kDisableGlobalSequenceNumber,
|
||||
block_contents_pinned);
|
||||
key_includes_seq_ = key_includes_seq;
|
||||
active_comparator_ = key_includes_seq_ ? comparator_ : user_comparator;
|
||||
key_.SetIsUserKey(!key_includes_seq_);
|
||||
prefix_index_ = prefix_index;
|
||||
value_delta_encoded_ = !value_is_full;
|
||||
@ -517,8 +517,6 @@ class IndexBlockIter final : public BlockIter<BlockHandle> {
|
||||
// Key is in InternalKey format
|
||||
bool key_includes_seq_;
|
||||
bool value_delta_encoded_;
|
||||
// key_includes_seq_ ? comparator_ : user_comparator_
|
||||
const Comparator* active_comparator_;
|
||||
BlockPrefixIndex* prefix_index_;
|
||||
// Whether the value is delta encoded. In that case the value is assumed to be
|
||||
// BlockHandle. The first value in each restart interval is the full encoded
|
||||
@ -535,11 +533,11 @@ class IndexBlockIter final : public BlockIter<BlockHandle> {
|
||||
inline int CompareBlockKey(uint32_t block_index, const Slice& target);
|
||||
|
||||
inline int Compare(const Slice& a, const Slice& b) const {
|
||||
return active_comparator_->Compare(a, b);
|
||||
return comparator_->Compare(a, b);
|
||||
}
|
||||
|
||||
inline int Compare(const IterKey& ikey, const Slice& b) const {
|
||||
return active_comparator_->Compare(ikey.GetKey(), b);
|
||||
return comparator_->Compare(ikey.GetKey(), b);
|
||||
}
|
||||
|
||||
inline bool ParseNextIndexKey();
|
||||
|
@ -54,13 +54,6 @@ void BlockHandle::EncodeTo(std::string* dst) const {
|
||||
PutVarint64Varint64(dst, offset_, size_);
|
||||
}
|
||||
|
||||
void BlockHandle::EncodeSizeTo(std::string* dst) const {
|
||||
// Sanity check that all fields have been set
|
||||
assert(offset_ != ~static_cast<uint64_t>(0));
|
||||
assert(size_ != ~static_cast<uint64_t>(0));
|
||||
PutVarint64(dst, size_);
|
||||
}
|
||||
|
||||
Status BlockHandle::DecodeFrom(Slice* input) {
|
||||
if (GetVarint64(input, &offset_) &&
|
||||
GetVarint64(input, &size_)) {
|
||||
|
@ -55,7 +55,6 @@ class BlockHandle {
|
||||
void EncodeTo(std::string* dst) const;
|
||||
Status DecodeFrom(Slice* input);
|
||||
Status DecodeSizeFrom(uint64_t offset, Slice* input);
|
||||
void EncodeSizeTo(std::string* dst) const;
|
||||
|
||||
// Return a string that contains the copy of handle.
|
||||
std::string ToString(bool hex = true) const;
|
||||
|
@ -79,7 +79,10 @@ Slice PartitionedFilterBlockBuilder::Finish(
|
||||
std::string handle_encoding;
|
||||
last_partition_block_handle.EncodeTo(&handle_encoding);
|
||||
std::string handle_delta_encoding;
|
||||
last_partition_block_handle.EncodeSizeTo(&handle_delta_encoding);
|
||||
PutVarsignedint64(
|
||||
&handle_delta_encoding,
|
||||
last_partition_block_handle.size() - last_encoded_handle_.size());
|
||||
last_encoded_handle_ = last_partition_block_handle;
|
||||
const Slice handle_delta_encoding_slice(handle_delta_encoding);
|
||||
index_on_filter_block_builder_.Add(last_entry.key, handle_encoding,
|
||||
&handle_delta_encoding_slice);
|
||||
|
@ -66,6 +66,7 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
|
||||
uint32_t filters_in_partition_;
|
||||
// Number of keys added
|
||||
size_t num_added_;
|
||||
BlockHandle last_encoded_handle_;
|
||||
};
|
||||
|
||||
class PartitionedFilterBlockReader : public FilterBlockReader,
|
||||
|
@ -50,7 +50,9 @@ class MockedBlockBasedTable : public BlockBasedTable {
|
||||
}
|
||||
};
|
||||
|
||||
class PartitionedFilterBlockTest : public testing::Test {
|
||||
class PartitionedFilterBlockTest
|
||||
: public testing::Test,
|
||||
virtual public ::testing::WithParamInterface<uint32_t> {
|
||||
public:
|
||||
BlockBasedTableOptions table_options_;
|
||||
InternalKeyComparator icomp = InternalKeyComparator(BytewiseComparator());
|
||||
@ -60,6 +62,8 @@ class PartitionedFilterBlockTest : public testing::Test {
|
||||
table_options_.no_block_cache = true; // Otherwise BlockBasedTable::Close
|
||||
// will access variable that are not
|
||||
// initialized in our mocked version
|
||||
table_options_.format_version = GetParam();
|
||||
table_options_.index_block_restart_interval = 3;
|
||||
}
|
||||
|
||||
std::shared_ptr<Cache> cache_;
|
||||
@ -279,14 +283,19 @@ class PartitionedFilterBlockTest : public testing::Test {
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(PartitionedFilterBlockTest, EmptyBuilder) {
|
||||
INSTANTIATE_TEST_CASE_P(FormatDef, PartitionedFilterBlockTest,
|
||||
testing::Values(test::kDefaultFormatVersion));
|
||||
INSTANTIATE_TEST_CASE_P(FormatLatest, PartitionedFilterBlockTest,
|
||||
testing::Values(test::kLatestFormatVersion));
|
||||
|
||||
TEST_P(PartitionedFilterBlockTest, EmptyBuilder) {
|
||||
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
|
||||
std::unique_ptr<PartitionedFilterBlockBuilder> builder(NewBuilder(pib.get()));
|
||||
const bool empty = true;
|
||||
VerifyReader(builder.get(), pib.get(), empty);
|
||||
}
|
||||
|
||||
TEST_F(PartitionedFilterBlockTest, OneBlock) {
|
||||
TEST_P(PartitionedFilterBlockTest, OneBlock) {
|
||||
uint64_t max_index_size = MaxIndexSize();
|
||||
for (uint64_t i = 1; i < max_index_size + 1; i++) {
|
||||
table_options_.metadata_block_size = i;
|
||||
@ -294,7 +303,7 @@ TEST_F(PartitionedFilterBlockTest, OneBlock) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(PartitionedFilterBlockTest, TwoBlocksPerKey) {
|
||||
TEST_P(PartitionedFilterBlockTest, TwoBlocksPerKey) {
|
||||
uint64_t max_index_size = MaxIndexSize();
|
||||
for (uint64_t i = 1; i < max_index_size + 1; i++) {
|
||||
table_options_.metadata_block_size = i;
|
||||
@ -304,7 +313,7 @@ TEST_F(PartitionedFilterBlockTest, TwoBlocksPerKey) {
|
||||
|
||||
// This reproduces the bug that a prefix is the same among multiple consecutive
|
||||
// blocks but the bug would add it only to the first block.
|
||||
TEST_F(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
||||
TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
||||
// some small number to cause partition cuts
|
||||
table_options_.metadata_block_size = 1;
|
||||
std::unique_ptr<const SliceTransform> prefix_extractor
|
||||
@ -330,7 +339,7 @@ TEST_F(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(PartitionedFilterBlockTest, OneBlockPerKey) {
|
||||
TEST_P(PartitionedFilterBlockTest, OneBlockPerKey) {
|
||||
uint64_t max_index_size = MaxIndexSize();
|
||||
for (uint64_t i = 1; i < max_index_size + 1; i++) {
|
||||
table_options_.metadata_block_size = i;
|
||||
@ -338,7 +347,7 @@ TEST_F(PartitionedFilterBlockTest, OneBlockPerKey) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(PartitionedFilterBlockTest, PartitionCount) {
|
||||
TEST_P(PartitionedFilterBlockTest, PartitionCount) {
|
||||
int num_keys = sizeof(keys) / sizeof(*keys);
|
||||
table_options_.metadata_block_size =
|
||||
std::max(MaxIndexSize(), MaxFilterSize());
|
||||
|
@ -197,6 +197,15 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
|
||||
return s;
|
||||
}
|
||||
|
||||
Status FaultInjectionTestEnv::NewRandomAccessFile(
|
||||
const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
|
||||
const EnvOptions& soptions) {
|
||||
if (!IsFilesystemActive()) {
|
||||
return GetError();
|
||||
}
|
||||
return target()->NewRandomAccessFile(fname, result, soptions);
|
||||
}
|
||||
|
||||
Status FaultInjectionTestEnv::DeleteFile(const std::string& f) {
|
||||
if (!IsFilesystemActive()) {
|
||||
return GetError();
|
||||
|
@ -111,6 +111,10 @@ class FaultInjectionTestEnv : public EnvWrapper {
|
||||
unique_ptr<WritableFile>* result,
|
||||
const EnvOptions& soptions) override;
|
||||
|
||||
Status NewRandomAccessFile(const std::string& fname,
|
||||
std::unique_ptr<RandomAccessFile>* result,
|
||||
const EnvOptions& soptions) override;
|
||||
|
||||
virtual Status DeleteFile(const std::string& f) override;
|
||||
|
||||
virtual Status RenameFile(const std::string& s,
|
||||
|
@ -304,13 +304,17 @@ void BlobDBImpl::CloseRandomAccessLocked(
|
||||
open_file_count_--;
|
||||
}
|
||||
|
||||
std::shared_ptr<RandomAccessFileReader> BlobDBImpl::GetOrOpenRandomAccessReader(
|
||||
const std::shared_ptr<BlobFile>& bfile, Env* env,
|
||||
const EnvOptions& env_options) {
|
||||
Status BlobDBImpl::GetBlobFileReader(
|
||||
const std::shared_ptr<BlobFile>& blob_file,
|
||||
std::shared_ptr<RandomAccessFileReader>* reader) {
|
||||
assert(reader != nullptr);
|
||||
bool fresh_open = false;
|
||||
auto rar = bfile->GetOrOpenRandomAccessReader(env, env_options, &fresh_open);
|
||||
if (fresh_open) open_file_count_++;
|
||||
return rar;
|
||||
Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
|
||||
if (s.ok() && fresh_open) {
|
||||
assert(*reader != nullptr);
|
||||
open_file_count_++;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
|
||||
@ -621,39 +625,6 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
|
||||
return db_->Write(options, blob_inserter.batch());
|
||||
}
|
||||
|
||||
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
|
||||
uint64_t* manifest_file_size,
|
||||
bool flush_memtable) {
|
||||
// Hold a lock in the beginning to avoid updates to base DB during the call
|
||||
ReadLock rl(&mutex_);
|
||||
Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
ret.reserve(ret.size() + blob_files_.size());
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
ret.emplace_back(blob_file->PathName());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
// Hold a lock in the beginning to avoid updates to base DB during the call
|
||||
ReadLock rl(&mutex_);
|
||||
db_->GetLiveFilesMetaData(metadata);
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
LiveFileMetaData filemetadata;
|
||||
filemetadata.size = blob_file->GetFileSize();
|
||||
filemetadata.name = blob_file->PathName();
|
||||
auto cfh =
|
||||
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
||||
filemetadata.column_family_name = cfh->GetName();
|
||||
metadata->emplace_back(filemetadata);
|
||||
}
|
||||
}
|
||||
|
||||
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value) {
|
||||
return PutUntil(options, key, value, kNoExpiration);
|
||||
@ -1031,8 +1002,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
||||
}
|
||||
|
||||
// takes locks when called
|
||||
std::shared_ptr<RandomAccessFileReader> reader =
|
||||
GetOrOpenRandomAccessReader(bfile, env_, env_options_);
|
||||
std::shared_ptr<RandomAccessFileReader> reader;
|
||||
s = GetBlobFileReader(bfile, &reader);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
assert(blob_index.offset() > key.size() + sizeof(uint32_t));
|
||||
uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
|
||||
@ -1680,16 +1654,21 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
||||
}
|
||||
|
||||
std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
|
||||
if (aborted) return std::make_pair(false, -1);
|
||||
if (aborted) {
|
||||
return std::make_pair(false, -1);
|
||||
}
|
||||
|
||||
{
|
||||
ReadLock rl(&mutex_);
|
||||
if (obsolete_files_.empty()) return std::make_pair(true, -1);
|
||||
MutexLock delete_file_lock(&delete_file_mutex_);
|
||||
if (disable_file_deletions_ > 0) {
|
||||
return std::make_pair(true, -1);
|
||||
}
|
||||
|
||||
std::list<std::shared_ptr<BlobFile>> tobsolete;
|
||||
{
|
||||
WriteLock wl(&mutex_);
|
||||
if (obsolete_files_.empty()) {
|
||||
return std::make_pair(true, -1);
|
||||
}
|
||||
tobsolete.swap(obsolete_files_);
|
||||
}
|
||||
|
||||
|
@ -155,12 +155,6 @@ class BlobDBImpl : public BlobDB {
|
||||
|
||||
virtual Status Close() override;
|
||||
|
||||
virtual Status GetLiveFiles(std::vector<std::string>&,
|
||||
uint64_t* manifest_file_size,
|
||||
bool flush_memtable = true) override;
|
||||
virtual void GetLiveFilesMetaData(
|
||||
std::vector<LiveFileMetaData>* ) override;
|
||||
|
||||
using BlobDB::PutWithTTL;
|
||||
Status PutWithTTL(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value, uint64_t ttl) override;
|
||||
@ -175,6 +169,15 @@ class BlobDBImpl : public BlobDB {
|
||||
const DBOptions& db_options,
|
||||
const ColumnFamilyOptions& cf_options);
|
||||
|
||||
virtual Status DisableFileDeletions() override;
|
||||
|
||||
virtual Status EnableFileDeletions(bool force) override;
|
||||
|
||||
virtual Status GetLiveFiles(std::vector<std::string>&,
|
||||
uint64_t* manifest_file_size,
|
||||
bool flush_memtable = true) override;
|
||||
virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
|
||||
|
||||
~BlobDBImpl();
|
||||
|
||||
Status Open(std::vector<ColumnFamilyHandle*>* handles);
|
||||
@ -293,11 +296,8 @@ class BlobDBImpl : public BlobDB {
|
||||
// Open all blob files found in blob_dir.
|
||||
Status OpenAllBlobFiles();
|
||||
|
||||
// hold write mutex on file and call
|
||||
// creates a Random Access reader for GET call
|
||||
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
|
||||
const std::shared_ptr<BlobFile>& bfile, Env* env,
|
||||
const EnvOptions& env_options);
|
||||
Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
|
||||
std::shared_ptr<RandomAccessFileReader>* reader);
|
||||
|
||||
// hold write mutex on file and call.
|
||||
// Close the above Random Access reader
|
||||
@ -408,6 +408,26 @@ class BlobDBImpl : public BlobDB {
|
||||
|
||||
std::list<std::shared_ptr<BlobFile>> obsolete_files_;
|
||||
|
||||
// DeleteObsoleteFiles, DisableFileDeletions and EnableFileDeletions block
// on the mutex to avoid contention.
//
// While DeleteObsoleteFiles holds both mutex_ and delete_file_mutex_, note
// the difference. mutex_ only needs to be held when accessing the
// data-structure, and delete_file_mutex_ needs to be held the whole time
// during DeleteObsoleteFiles to avoid being run simultaneously with
// DisableFileDeletions.
//
// If both mutex_ and delete_file_mutex_ need to be held, it is advised
// to hold delete_file_mutex_ first to avoid deadlock.
mutable port::Mutex delete_file_mutex_;

// Each call of DisableFileDeletions will increase disable_file_deletions_
// by 1. EnableFileDeletions will either decrease the count by 1 or reset
// it to zero, depending on the force flag.
//
// REQUIRES: access with delete_file_mutex_ held.
int disable_file_deletions_ = 0;
|
||||
|
||||
uint32_t debug_level_;
|
||||
};
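A hedged sketch of the counting semantics described above (assuming an open `BlobDB* blob_db`):
blob_db->DisableFileDeletions();      // count = 1
blob_db->DisableFileDeletions();      // count = 2
blob_db->EnableFileDeletions(false);  // count = 1, deletions still disabled
blob_db->EnableFileDeletions(true);   // force: count reset to 0, deletions enabled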
|
||||
|
||||
|
utilities/blob_db/blob_db_impl_filesnapshot.cc (new file, 97 lines)
@ -0,0 +1,97 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "utilities/blob_db/blob_db_impl.h"
|
||||
|
||||
#include "util/logging.h"
|
||||
#include "util/mutexlock.h"
|
||||
|
||||
// BlobDBImpl methods to get snapshot of files, e.g. for replication.
|
||||
|
||||
namespace rocksdb {
|
||||
namespace blob_db {
|
||||
|
||||
Status BlobDBImpl::DisableFileDeletions() {
|
||||
// Disable base DB file deletions.
|
||||
Status s = db_impl_->DisableFileDeletions();
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
{
|
||||
// Hold delete_file_mutex_ to make sure no DeleteObsoleteFiles job
|
||||
// is running.
|
||||
MutexLock l(&delete_file_mutex_);
|
||||
count = ++disable_file_deletions_;
|
||||
}
|
||||
|
||||
ROCKS_LOG_INFO(db_options_.info_log,
|
||||
"Disalbed blob file deletions. count: %d", count);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlobDBImpl::EnableFileDeletions(bool force) {
|
||||
// Enable base DB file deletions.
|
||||
Status s = db_impl_->EnableFileDeletions(force);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
{
|
||||
MutexLock l(&delete_file_mutex_);
|
||||
if (force) {
|
||||
disable_file_deletions_ = 0;
|
||||
} else if (disable_file_deletions_ > 0) {
|
||||
count = --disable_file_deletions_;
|
||||
}
|
||||
assert(count >= 0);
|
||||
}
|
||||
|
||||
ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d",
|
||||
count);
|
||||
// Consider triggering DeleteObsoleteFiles once after deletions are re-enabled,
// if we are to make the DeleteObsoleteFiles re-run interval configurable.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
|
||||
uint64_t* manifest_file_size,
|
||||
bool flush_memtable) {
|
||||
// Hold a lock in the beginning to avoid updates to base DB during the call
|
||||
ReadLock rl(&mutex_);
|
||||
Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
ret.reserve(ret.size() + blob_files_.size());
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
ret.emplace_back(blob_file->PathName());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
// Hold a lock in the beginning to avoid updates to base DB during the call
|
||||
ReadLock rl(&mutex_);
|
||||
db_->GetLiveFilesMetaData(metadata);
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
LiveFileMetaData filemetadata;
|
||||
filemetadata.size = blob_file->GetFileSize();
|
||||
filemetadata.name = blob_file->PathName();
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
||||
filemetadata.column_family_name = cfh->GetName();
|
||||
metadata->emplace_back(filemetadata);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace blob_db
|
||||
} // namespace rocksdb
|
||||
#endif // !ROCKSDB_LITE
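A hedged sketch of the file-snapshot workflow these methods enable (e.g. for backup or replication; `blob_db` is assumed to be an open BlobDB*):
std::vector<std::string> files;
uint64_t manifest_size = 0;
blob_db->DisableFileDeletions();  // pin SST, manifest and blob files
blob_db->GetLiveFiles(files, &manifest_size, /*flush_memtable=*/true);
// ... copy `files` (which now include the blob files) somewhere safe ...
blob_db->EnableFileDeletions(/*force=*/false);  // balance the earlier call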
|
@ -16,6 +16,7 @@
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/utilities/debug.h"
|
||||
#include "util/cast_util.h"
|
||||
#include "util/fault_injection_test_env.h"
|
||||
#include "util/random.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/sync_point.h"
|
||||
@ -40,6 +41,7 @@ class BlobDBTest : public testing::Test {
|
||||
BlobDBTest()
|
||||
: dbname_(test::PerThreadDBPath("blob_db_test")),
|
||||
mock_env_(new MockTimeEnv(Env::Default())),
|
||||
fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
|
||||
blob_db_(nullptr) {
|
||||
Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
|
||||
assert(s.ok());
|
||||
@ -236,6 +238,7 @@ class BlobDBTest : public testing::Test {
|
||||
|
||||
const std::string dbname_;
|
||||
std::unique_ptr<MockTimeEnv> mock_env_;
|
||||
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
|
||||
BlobDB *blob_db_;
|
||||
}; // class BlobDBTest
|
||||
|
||||
@ -354,6 +357,23 @@ TEST_F(BlobDBTest, GetExpiration) {
|
||||
ASSERT_EQ(300 /* = 100 + 200 */, expiration);
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, GetIOError) {
|
||||
Options options;
|
||||
options.env = fault_injection_env_.get();
|
||||
BlobDBOptions bdb_options;
|
||||
bdb_options.min_blob_size = 0;  // Make sure the value is written to a blob file
|
||||
bdb_options.disable_background_tasks = true;
|
||||
Open(bdb_options, options);
|
||||
ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
|
||||
PinnableSlice value;
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
fault_injection_env_->SetFilesystemActive(false, Status::IOError());
|
||||
Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
|
||||
ASSERT_TRUE(s.IsIOError());
|
||||
// Reactivate file system to allow test to close DB.
|
||||
fault_injection_env_->SetFilesystemActive(true);
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, WriteBatch) {
|
||||
Random rnd(301);
|
||||
BlobDBOptions bdb_options;
|
||||
@ -461,7 +481,6 @@ TEST_F(BlobDBTest, DecompressAfterReopen) {
|
||||
Reopen(bdb_options);
|
||||
VerifyDB(data);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
TEST_F(BlobDBTest, MultipleWriters) {
|
||||
@ -1415,6 +1434,47 @@ TEST_F(BlobDBTest, EvictExpiredFile) {
|
||||
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, DisableFileDeletions) {
|
||||
BlobDBOptions bdb_options;
|
||||
bdb_options.disable_background_tasks = true;
|
||||
Open(bdb_options);
|
||||
std::map<std::string, std::string> data;
|
||||
for (bool force : {true, false}) {
|
||||
ASSERT_OK(Put("foo", "v", &data));
|
||||
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
|
||||
ASSERT_EQ(1, blob_files.size());
|
||||
auto blob_file = blob_files[0];
|
||||
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
|
||||
blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
// Call DisableFileDeletions twice.
|
||||
ASSERT_OK(blob_db_->DisableFileDeletions());
|
||||
ASSERT_OK(blob_db_->DisableFileDeletions());
|
||||
// File deletions should be disabled.
|
||||
blob_db_impl()->TEST_DeleteObsoleteFiles();
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
VerifyDB(data);
|
||||
// Enable file deletions once. If force=true, file deletion is enabled.
// Otherwise it needs to be enabled a second time.
|
||||
ASSERT_OK(blob_db_->EnableFileDeletions(force));
|
||||
blob_db_impl()->TEST_DeleteObsoleteFiles();
|
||||
if (!force) {
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
VerifyDB(data);
|
||||
// Call EnableFileDeletions a second time.
|
||||
ASSERT_OK(blob_db_->EnableFileDeletions(false));
|
||||
blob_db_impl()->TEST_DeleteObsoleteFiles();
|
||||
}
|
||||
// Regardless of value of `force`, file should be deleted by now.
|
||||
ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
VerifyDB({});
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace blob_db
|
||||
} // namespace rocksdb
|
||||
|
||||
|
@ -191,36 +191,48 @@ void BlobFile::CloseRandomAccessLocked() {
|
||||
last_access_ = -1;
|
||||
}
|
||||
|
||||
std::shared_ptr<RandomAccessFileReader> BlobFile::GetOrOpenRandomAccessReader(
|
||||
Env* env, const EnvOptions& env_options, bool* fresh_open) {
|
||||
Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
|
||||
std::shared_ptr<RandomAccessFileReader>* reader,
|
||||
bool* fresh_open) {
|
||||
assert(reader != nullptr);
|
||||
assert(fresh_open != nullptr);
|
||||
*fresh_open = false;
|
||||
int64_t current_time = 0;
|
||||
env->GetCurrentTime(¤t_time);
|
||||
last_access_.store(current_time);
|
||||
Status s;
|
||||
|
||||
{
|
||||
ReadLock lockbfile_r(&mutex_);
|
||||
if (ra_file_reader_) return ra_file_reader_;
|
||||
if (ra_file_reader_) {
|
||||
*reader = ra_file_reader_;
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
WriteLock lockbfile_w(&mutex_);
|
||||
if (ra_file_reader_) return ra_file_reader_;
|
||||
// Double check.
|
||||
if (ra_file_reader_) {
|
||||
*reader = ra_file_reader_;
|
||||
return s;
|
||||
}
|
||||
|
||||
std::unique_ptr<RandomAccessFile> rfile;
|
||||
Status s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
|
||||
s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_ERROR(info_log_,
|
||||
"Failed to open blob file for random-read: %s status: '%s'"
|
||||
" exists: '%s'",
|
||||
PathName().c_str(), s.ToString().c_str(),
|
||||
env->FileExists(PathName()).ToString().c_str());
|
||||
return nullptr;
|
||||
return s;
|
||||
}
|
||||
|
||||
ra_file_reader_ = std::make_shared<RandomAccessFileReader>(std::move(rfile),
|
||||
PathName());
|
||||
*reader = ra_file_reader_;
|
||||
*fresh_open = true;
|
||||
return ra_file_reader_;
|
||||
return s;
|
||||
}
|
||||
|
||||
Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
|
||||
|
@ -181,6 +181,10 @@ class BlobFile {
|
||||
// footer_valid_ to false and return Status::OK.
|
||||
Status ReadMetadata(Env* env, const EnvOptions& env_options);
|
||||
|
||||
Status GetReader(Env* env, const EnvOptions& env_options,
|
||||
std::shared_ptr<RandomAccessFileReader>* reader,
|
||||
bool* fresh_open);
|
||||
|
||||
private:
|
||||
std::shared_ptr<Reader> OpenRandomAccessReader(
|
||||
Env* env, const DBOptions& db_options,
|
||||
@ -190,9 +194,6 @@ class BlobFile {
|
||||
|
||||
Status WriteFooterAndCloseLocked();
|
||||
|
||||
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
|
||||
Env* env, const EnvOptions& env_options, bool* fresh_open);
|
||||
|
||||
void CloseRandomAccessLocked();
|
||||
|
||||
// this is used, when you are reading only the footer of a
|
||||
|
@ -1717,7 +1717,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
|
||||
}
|
||||
|
||||
// flush only cfa memtable
|
||||
s = db_impl->TEST_FlushMemTable(true, cfa);
|
||||
s = db_impl->TEST_FlushMemTable(true, false, cfa);
|
||||
ASSERT_OK(s);
|
||||
|
||||
switch (txn_db_options.write_policy) {
|
||||
@ -1736,7 +1736,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
|
||||
}
|
||||
|
||||
// flush only cfb memtable
|
||||
s = db_impl->TEST_FlushMemTable(true, cfb);
|
||||
s = db_impl->TEST_FlushMemTable(true, false, cfb);
|
||||
ASSERT_OK(s);
|
||||
|
||||
// should show not dependency on logs
|
||||
@ -3786,7 +3786,7 @@ TEST_P(TransactionTest, SavepointTest3) {
|
||||
|
||||
s = txn1->Put("A", "");
|
||||
ASSERT_OK(s);
|
||||
|
||||
|
||||
s = txn1->PopSavePoint(); // Still no SavePoint present
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
|
||||
@ -3796,21 +3796,21 @@ TEST_P(TransactionTest, SavepointTest3) {
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn1->PopSavePoint(); // Remove 1
|
||||
ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
|
||||
ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
|
||||
|
||||
// Verify that "A" is still locked
|
||||
// Verify that "A" is still locked
|
||||
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn2);
|
||||
|
||||
s = txn2->Put("A", "a2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
delete txn2;
|
||||
|
||||
|
||||
txn1->SetSavePoint(); // 2
|
||||
|
||||
s = txn1->Put("B", "b");
|
||||
ASSERT_OK(s);
|
||||
|
||||
|
||||
txn1->SetSavePoint(); // 3
|
||||
|
||||
s = txn1->Put("B", "b2");
|
||||
@ -3820,7 +3820,7 @@ TEST_P(TransactionTest, SavepointTest3) {
|
||||
|
||||
s = txn1->PopSavePoint();
|
||||
ASSERT_OK(s);
|
||||
|
||||
|
||||
s = txn1->PopSavePoint();
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
|
||||
@ -3834,12 +3834,12 @@ TEST_P(TransactionTest, SavepointTest3) {
|
||||
s = db->Get(read_options, "A", &value);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("a", value);
|
||||
|
||||
|
||||
// txn1 should have set "B" to just "b"
|
||||
s = db->Get(read_options, "B", &value);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("b", value);
|
||||
|
||||
|
||||
s = db->Get(read_options, "C", &value);
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
}
|
||||
@ -5511,7 +5511,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
|
||||
db->FlushWAL(true);
|
||||
// Flush only cf 1
|
||||
reinterpret_cast<DBImpl*>(db->GetRootDB())
|
||||
->TEST_FlushMemTable(true, handles[1]);
|
||||
->TEST_FlushMemTable(true, false, handles[1]);
|
||||
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
|
||||
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
|
||||
txn0 = db->GetTransactionByName("xid");
|
||||
@ -5549,7 +5549,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
|
||||
ASSERT_OK(db->FlushWAL(true));
|
||||
// Flush only cf 1
|
||||
reinterpret_cast<DBImpl*>(db->GetRootDB())
|
||||
->TEST_FlushMemTable(true, handles[1]);
|
||||
->TEST_FlushMemTable(true, false, handles[1]);
|
||||
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
|
||||
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
|
||||
txn0 = db->GetTransactionByName("xid");
|
||||
@ -5582,7 +5582,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
|
||||
ASSERT_OK(db->FlushWAL(true));
|
||||
// Flush only cf 1
|
||||
reinterpret_cast<DBImpl*>(db->GetRootDB())
|
||||
->TEST_FlushMemTable(true, handles[1]);
|
||||
->TEST_FlushMemTable(true, false, handles[1]);
|
||||
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
|
||||
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
|
||||
txn0 = db->GetTransactionByName("xid");
|
||||
@ -5609,7 +5609,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
|
||||
ASSERT_OK(db->FlushWAL(true));
|
||||
// Flush only cf 1
|
||||
reinterpret_cast<DBImpl*>(db->GetRootDB())
|
||||
->TEST_FlushMemTable(true, handles[1]);
|
||||
->TEST_FlushMemTable(true, false, handles[1]);
|
||||
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
|
||||
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
|
||||
txn0 = db->GetTransactionByName("xid");
|
||||
@ -5636,7 +5636,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
|
||||
ASSERT_OK(db->FlushWAL(true));
|
||||
// Flush only cf 1
|
||||
reinterpret_cast<DBImpl*>(db->GetRootDB())
|
||||
->TEST_FlushMemTable(true, handles[1]);
|
||||
->TEST_FlushMemTable(true, false, handles[1]);
|
||||
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
|
||||
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
|
||||
txn0 = db->GetTransactionByName("xid");
|
||||
|
@ -352,14 +352,16 @@ class WBWIIteratorImpl : public WBWIIterator {
  }

  virtual void SeekToFirst() override {
    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
                                      column_family_id_, 0, 0);
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

  virtual void SeekToLast() override {
    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
                                      column_family_id_ + 1, 0, 0);
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_ + 1,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
    if (!skip_list_iter_.Valid()) {
      skip_list_iter_.SeekToLast();
@ -369,12 +371,16 @@ class WBWIIteratorImpl : public WBWIIterator {
  }

  virtual void Seek(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_);
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      true /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

  virtual void SeekForPrev(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_);
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      false /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.SeekForPrev(&search_entry);
  }

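A minimal sketch of the behavior these direction-aware dummy entries are meant to enable, using only the public WriteBatchWithIndex API; the keys, values, and function name below are illustrative, not taken from the test suite:

    #include <cassert>
    #include <memory>

    #include "rocksdb/comparator.h"
    #include "rocksdb/utilities/write_batch_with_index.h"

    // Sketch: with overwrite_key == false the batch keeps duplicate entries for
    // the same key, and SeekForPrev("B") is expected to land on an entry for
    // "B" rather than stepping past all of them.
    void SeekForPrevDuplicateKeySketch() {
      rocksdb::WriteBatchWithIndex batch(rocksdb::BytewiseComparator(),
                                         0 /* reserved_bytes */,
                                         false /* overwrite_key */);
      batch.Put("B", "b1");
      batch.Put("B", "b2");  // duplicate key, kept because overwrite_key is false
      std::unique_ptr<rocksdb::WBWIIterator> it(batch.NewIterator());
      it->SeekForPrev("B");
      assert(it->Valid());
      assert(it->Entry().key == rocksdb::Slice("B"));
    }

Passing the direction into the dummy entry, rather than always building it the same way, is what lets the reverse seek stop on the last duplicate instead of skipping the key entirely.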
@ -85,6 +85,20 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
  return Status::OK();
}

// If both `entry1` and `entry2` point to real entries in the write batch, we
// compare the entries as follows:
// 1. First compare the column family; the one with the larger CF is larger.
// 2. Within the same CF, decode each entry to find its key; the entry with
//    the larger key is larger.
// 3. If two entries have the same CF and key, the one with the larger offset
//    is larger.
// Sometimes either `entry1` or `entry2` is a dummy entry, which is actually
// a search key. In this case, in step 2, we don't decode the entry but use
// the value in WriteBatchIndexEntry::search_key instead.
// One special case is WriteBatchIndexEntry::key_size being kFlagMinInCf,
// which indicates that we are seeking to the first entry of the column
// family. Such an entry compares smaller than all real entries of that
// column family.
int WriteBatchEntryComparator::operator()(
    const WriteBatchIndexEntry* entry1,
    const WriteBatchIndexEntry* entry2) const {
@ -94,9 +108,10 @@ int WriteBatchEntryComparator::operator()(
    return -1;
  }

  if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
  // Deal with the special case of seeking to the beginning of a column family
  if (entry1->is_min_in_cf()) {
    return -1;
  } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
  } else if (entry2->is_min_in_cf()) {
    return 1;
  }

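For illustration only, the ordering described in the comment above can be restated as a small standalone sketch. It assumes both entries are real (no dummy search keys, no kFlagMinInCf special case) and substitutes a plain std::string key for the key decoded from the batch:

    #include <cstddef>
    #include <cstdint>
    #include <string>

    // Simplified restatement of the three ordering rules: column family first,
    // then key, then offset within the write batch buffer.
    struct SimpleEntry {
      uint32_t column_family;
      std::string key;  // stands in for the key decoded from the batch
      size_t offset;    // position of the entry in the batch buffer
    };

    int CompareSimpleEntries(const SimpleEntry& a, const SimpleEntry& b) {
      if (a.column_family != b.column_family) {  // rule 1: larger CF is larger
        return a.column_family < b.column_family ? -1 : 1;
      }
      if (int cmp = a.key.compare(b.key)) {      // rule 2: larger key is larger
        return cmp < 0 ? -1 : 1;
      }
      if (a.offset != b.offset) {                // rule 3: larger offset is larger
        return a.offset < b.offset ? -1 : 1;
      }
      return 0;
    }
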
@ -31,21 +31,52 @@ struct WriteBatchIndexEntry {
        key_offset(ko),
        key_size(ksz),
        search_key(nullptr) {}
  WriteBatchIndexEntry(const Slice* sk, uint32_t c)
      : offset(0),
        column_family(c),
  // Create a dummy entry as the search key. This index entry won't be backed
  // by an entry from the write batch, but by a pointer to the search key. Or
  // a special flag can indicate that we are seeking to the first entry of
  // the column family.
  // @_search_key: the search key
  // @_column_family: column family
  // @is_forward_direction: true for Seek(), false for SeekForPrev()
  // @is_seek_to_first: true if we seek to the beginning of the column family;
  //                    _search_key should be null in this case.
  WriteBatchIndexEntry(const Slice* _search_key, uint32_t _column_family,
                       bool is_forward_direction, bool is_seek_to_first)
      // For SeekForPrev(), we need to make the dummy entry larger than any
      // entry that has the same search key. Otherwise, we'll miss those
      // entries.
      : offset(is_forward_direction ? 0 : port::kMaxSizet),
        column_family(_column_family),
        key_offset(0),
        key_size(0),
        search_key(sk) {}
        key_size(is_seek_to_first ? kFlagMinInCf : 0),
        search_key(_search_key) {
    assert(_search_key != nullptr || is_seek_to_first);
  }

  // If this flag appears in the offset, it indicates a key that is smaller
  // than any other entry for the same column family.
  static const size_t kFlagMin = port::kMaxSizet;
  // If this flag appears in key_size, it indicates a key that is smaller
  // than any other entry for the same column family.
  static const size_t kFlagMinInCf = port::kMaxSizet;

  size_t offset;           // offset of an entry in write batch's string buffer.
  uint32_t column_family;  // column family of the entry.
  bool is_min_in_cf() const {
    assert(key_size != kFlagMinInCf ||
           (key_offset == 0 && search_key == nullptr));
    return key_size == kFlagMinInCf;
  }

  // Offset of an entry in the write batch's string buffer. If this is a dummy
  // lookup key (search_key != nullptr), offset is set to either 0 or max,
  // purely for comparison purposes. When entries have the same key, the entry
  // with the larger offset is larger, so offset = 0 makes a seek key compare
  // smaller than or equal to all entries with that key and Seek() will find
  // all of them; similarly, offset = MAX makes the dummy entry just larger
  // than all entries with the search key, so SeekForPrev() will see all
  // entries with the same key.
  size_t offset;
  uint32_t column_family;  // column family of the entry.
  size_t key_offset;  // offset of the key in write batch's string buffer.
  size_t key_size;    // size of the key.
  size_t key_size;    // size of the key. kFlagMinInCf indicates that this is
                      // a dummy lookup entry for SeekToFirst() to the
                      // beginning of the column family. We use the flag here
                      // to save a boolean in the struct.

  const Slice* search_key;  // if not null, instead of reading keys from
                            // write batch, use it to compare. This is used
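A short self-contained sketch of the offset arithmetic described above; the concrete offsets are invented for illustration, and CompareByOffset merely stands in for the final tie-breaking step of the real comparator:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    // Among entries with equal column family and key, ordering falls through to
    // the offset, so a dummy with offset 0 sorts before every real duplicate
    // (Seek) and a dummy with offset SIZE_MAX sorts after every real duplicate
    // (SeekForPrev).
    int CompareByOffset(size_t a, size_t b) {
      return a < b ? -1 : (a > b ? 1 : 0);
    }

    void DummyOffsetSketch() {
      const size_t real_entry_offsets[] = {16, 48, 112};  // duplicates of one key
      const size_t seek_dummy = 0;                  // forward seek: before them all
      const size_t seek_for_prev_dummy = SIZE_MAX;  // reverse seek: after them all
      for (size_t real : real_entry_offsets) {
        assert(CompareByOffset(seek_dummy, real) <= 0);
        assert(CompareByOffset(seek_for_prev_dummy, real) > 0);
      }
    }
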
@ -621,7 +621,7 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
  for (int i = 0; i < 128; i++) {
    // Random walk and make sure iter and result_iter return the
    // same key and value
    int type = rnd.Uniform(5);
    int type = rnd.Uniform(6);
    ASSERT_OK(iter->status());
    switch (type) {
      case 0:
@ -642,7 +642,15 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
        result_iter->Seek(key);
        break;
      }
      case 3:
      case 3: {
        // SeekForPrev to random key
        auto key_idx = rnd.Uniform(static_cast<int>(source_strings.size()));
        auto key = source_strings[key_idx];
        iter->SeekForPrev(key);
        result_iter->SeekForPrev(key);
        break;
      }
      case 4:
        // Next
        if (is_valid) {
          iter->Next();
@ -652,7 +660,7 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
        }
        break;
      default:
        assert(type == 4);
        assert(type == 5);
        // Prev
        if (is_valid) {
          iter->Prev();
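The random walk above is a differential test: both iterators receive the same randomly chosen operation each step and must agree afterwards. Below is a minimal sketch of that harness shape using only standard containers; it is not the RocksDB test, just an illustration of the pattern:

    #include <cassert>
    #include <cstdlib>
    #include <set>
    #include <string>
    #include <vector>

    // Drive an iterator under test (std::set) and a reference view (index into
    // a sorted vector) with identical random operations, then check agreement.
    void RandomWalkSketch() {
      const std::vector<std::string> keys = {"a", "b", "c", "d"};
      std::set<std::string> under_test(keys.begin(), keys.end());
      auto it = under_test.begin();
      size_t ref = 0;  // index into the sorted reference vector
      for (int step = 0; step < 128; ++step) {
        switch (std::rand() % 3) {
          case 0: {  // Seek to a random key (lower bound)
            const std::string& k = keys[std::rand() % keys.size()];
            it = under_test.lower_bound(k);
            ref = 0;
            while (ref < keys.size() && keys[ref] < k) ++ref;
            break;
          }
          case 1:  // Next
            if (it != under_test.end()) { ++it; ++ref; }
            break;
          default:  // Prev
            if (it != under_test.begin()) { --it; --ref; }
            break;
        }
        // The two views must agree on validity and on the current key.
        bool valid = (it != under_test.end());
        assert(valid == (ref < keys.size()));
        if (valid) assert(*it == keys[ref]);
      }
    }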