Compare commits
38 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
a180b60555 | ||
|
cb9d151aa8 | ||
|
d5c0e30b62 | ||
|
07e4776c85 | ||
|
16def5c15f | ||
|
ed391665c8 | ||
|
e448f5465f | ||
|
b2bbca8b60 | ||
|
e1d66b3f82 | ||
|
2984ee8ab8 | ||
|
cd827175a0 | ||
|
d4d5acbf87 | ||
|
af8fdd77ea | ||
|
14e9876808 | ||
|
a9a6a2f434 | ||
|
be5cb800ef | ||
|
0fb70a800e | ||
|
8dbf5a39bd | ||
|
0798a49a28 | ||
|
555f3be85b | ||
|
e29a73b89e | ||
|
0b7dd9131f | ||
|
8dcaa16b54 | ||
|
1d2d22e738 | ||
|
fbd60cf3c4 | ||
|
c5024c03b6 | ||
|
074154582a | ||
|
3967a4f0d1 | ||
|
9a4d86b3da | ||
|
0833b75104 | ||
|
664acfa087 | ||
|
66a007f832 | ||
|
5559488e92 | ||
|
d0ce28bb27 | ||
|
248802038b | ||
|
af06d1a871 | ||
|
5266ed3cc5 | ||
|
7584091c7f |
23
.travis.yml
23
.travis.yml
@ -9,17 +9,32 @@ matrix:
|
||||
addons:
|
||||
apt:
|
||||
sources: ['ubuntu-toolchain-r-test', 'llvm-toolchain-precise-3.6']
|
||||
packages: ['clang-3.6', 'zlib1g-dev', 'libbz2-dev', 'libsnappy-dev']
|
||||
packages: ['clang-3.6', 'clang-format-3.6', 'zlib1g-dev', 'libbz2-dev', 'libsnappy-dev', 'curl']
|
||||
- os: osx
|
||||
compiler: clang
|
||||
|
||||
install:
|
||||
# Build gflags
|
||||
# TODO(noetzli): Remove when gflags available through Travis
|
||||
- pushd /tmp/ && curl -L https://github.com/gflags/gflags/archive/v2.1.2.tar.gz -o gflags.tar.gz && tar xfz gflags.tar.gz && cd gflags-2.1.2 && cmake . && make && popd
|
||||
# Download clang-format-diff.py to check source code formatting
|
||||
- pushd /tmp/ && curl -L http://llvm.org/svn/llvm-project/cfe/trunk/tools/clang-format/clang-format-diff.py -o clang-format-diff.py && chmod +x clang-format-diff.py && popd
|
||||
|
||||
before_script:
|
||||
- if [ -n "${COMPILER}" ]; then CXX=${COMPILER}; fi
|
||||
- if [[ $(uname -s) == 'Darwin' ]]; then brew install gflags snappy; fi
|
||||
# Add gflags to include/library paths
|
||||
# TODO(noetzli): Remove when gflags available through Travis
|
||||
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/tmp/gflags-2.1.2/lib"
|
||||
- export LIBRARY_PATH="$LIBRARY_PATH:/tmp/gflags-2.1.2/lib"
|
||||
- export CPLUS_INCLUDE_PATH="$CPLUS_INCLUDE_PATH:/tmp/gflags-2.1.2/include"
|
||||
- if [ -n "${COMPILER}" ]; then CXX=${COMPILER}; fi
|
||||
- if [[ "${TRAVIS_OS_NAME}" == 'osx' ]]; then brew install gflags snappy; fi
|
||||
- ulimit -n 2000 || true
|
||||
|
||||
# Lousy hack to disable use and testing of fallocate, which doesn't behave quite
|
||||
# as EnvPosixTest::AllocateTest expects within the Travis OpenVZ environment.
|
||||
script: OPT=-DTRAVIS V=1 make -j4 check && OPT=-DTRAVIS V=1 make clean jclean rocksdbjava jtest
|
||||
script:
|
||||
- if [[ "${TRAVIS_OS_NAME}" == 'linux' ]]; then OPT=-DTRAVIS CLANG_FORMAT_DIFF=/tmp/clang-format-diff.py make format; fi
|
||||
- OPT=-DTRAVIS V=1 make -j4 check && OPT=-DTRAVIS V=1 make clean jclean rocksdbjava jtest
|
||||
|
||||
notifications:
|
||||
email:
|
||||
|
41
Makefile
41
Makefile
@ -63,6 +63,14 @@ ifeq ($(MAKECMDGOALS),rocksdbjavastatic)
|
||||
DEBUG_LEVEL=0
|
||||
endif
|
||||
|
||||
ifeq ($(MAKECMDGOALS),rocksdbjavastaticrelease)
|
||||
DEBUG_LEVEL=0
|
||||
endif
|
||||
|
||||
ifeq ($(MAKECMDGOALS),rocksdbjavastaticpublish)
|
||||
DEBUG_LEVEL=0
|
||||
endif
|
||||
|
||||
# compile with -O2 if debug level is not 2
|
||||
ifneq ($(DEBUG_LEVEL), 2)
|
||||
OPT += -O2 -fno-omit-frame-pointer
|
||||
@ -167,10 +175,6 @@ default: all
|
||||
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
|
||||
-Wno-unused-parameter
|
||||
|
||||
ifndef DISABLE_WARNING_AS_ERROR
|
||||
WARNING_FLAGS += -Werror
|
||||
endif
|
||||
|
||||
CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
|
||||
CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
|
||||
|
||||
@ -1021,21 +1025,23 @@ liblz4.a:
|
||||
cd lz4-r127/lib && make CFLAGS='-fPIC' all
|
||||
cp lz4-r127/lib/liblz4.a .
|
||||
|
||||
# A version of each $(LIBOBJECTS) compiled with -fPIC
|
||||
java_libobjects = $(patsubst %,jl/%,$(LIBOBJECTS))
|
||||
CLEAN_FILES += jl
|
||||
# A version of each $(LIBOBJECTS) compiled with -fPIC and a fixed set of static compression libraries
|
||||
java_static_libobjects = $(patsubst %,jls/%,$(LIBOBJECTS))
|
||||
CLEAN_FILES += jls
|
||||
|
||||
$(java_libobjects): jl/%.o: %.cc
|
||||
$(AM_V_CC)mkdir -p $(@D)
|
||||
@$(CXX) $(CXXFLAGS) -fPIC -c $< -o $@ $(COVERAGEFLAGS)
|
||||
JAVA_STATIC_FLAGS = -DZLIB -DBZIP2 -DSNAPPY -DLZ4
|
||||
JAVA_STATIC_INCLUDES = -I./zlib-1.2.8 -I./bzip2-1.0.6 -I./snappy-1.1.1 -I./lz4-r127/lib
|
||||
|
||||
rocksdbjavastatic: $(java_libobjects) libz.a libbz2.a libsnappy.a liblz4.a
|
||||
$(java_static_libobjects): jls/%.o: %.cc libz.a libbz2.a libsnappy.a liblz4.a
|
||||
$(AM_V_CC)mkdir -p $(@D) && $(CXX) $(CXXFLAGS) $(JAVA_STATIC_FLAGS) $(JAVA_STATIC_INCLUDES) -fPIC -c $< -o $@ $(COVERAGEFLAGS)
|
||||
|
||||
rocksdbjavastatic: $(java_static_libobjects)
|
||||
cd java;$(MAKE) javalib;
|
||||
rm -f ./java/target/$(ROCKSDBJNILIB)
|
||||
$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC \
|
||||
-o ./java/target/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) \
|
||||
$(java_libobjects) $(COVERAGEFLAGS) \
|
||||
libz.a libbz2.a libsnappy.a liblz4.a $(LDFLAGS)
|
||||
$(java_static_libobjects) $(COVERAGEFLAGS) \
|
||||
libz.a libbz2.a libsnappy.a liblz4.a $(JAVA_STATIC_LDFLAGS)
|
||||
cd java/target;strip -S -x $(ROCKSDBJNILIB)
|
||||
cd java;jar -cf target/$(ROCKSDB_JAR) HISTORY*.md
|
||||
cd java/target;jar -uf $(ROCKSDB_JAR) $(ROCKSDBJNILIB)
|
||||
@ -1046,7 +1052,7 @@ rocksdbjavastatic: $(java_libobjects) libz.a libbz2.a libsnappy.a liblz4.a
|
||||
rocksdbjavastaticrelease: rocksdbjavastatic
|
||||
cd java/crossbuild && vagrant destroy -f && vagrant up linux32 && vagrant halt linux32 && vagrant up linux64 && vagrant halt linux64
|
||||
cd java;jar -cf target/$(ROCKSDB_JAR_ALL) HISTORY*.md
|
||||
cd java;jar -uf target/$(ROCKSDB_JAR_ALL) librocksdbjni-*.so librocksdbjni-*.jnilib
|
||||
cd java/target;jar -uf $(ROCKSDB_JAR_ALL) librocksdbjni-*.so librocksdbjni-*.jnilib
|
||||
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR_ALL) org/rocksdb/*.class org/rocksdb/util/*.class
|
||||
|
||||
rocksdbjavastaticpublish: rocksdbjavastaticrelease
|
||||
@ -1057,6 +1063,13 @@ rocksdbjavastaticpublish: rocksdbjavastaticrelease
|
||||
mvn gpg:sign-and-deploy-file -Durl=https://oss.sonatype.org/service/local/staging/deploy/maven2/ -DrepositoryId=sonatype-nexus-staging -DpomFile=java/rocksjni.pom -Dfile=java/target/rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH)-osx.jar -Dclassifier=osx
|
||||
mvn gpg:sign-and-deploy-file -Durl=https://oss.sonatype.org/service/local/staging/deploy/maven2/ -DrepositoryId=sonatype-nexus-staging -DpomFile=java/rocksjni.pom -Dfile=java/target/rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH).jar
|
||||
|
||||
# A version of each $(LIBOBJECTS) compiled with -fPIC
|
||||
java_libobjects = $(patsubst %,jl/%,$(LIBOBJECTS))
|
||||
CLEAN_FILES += jl
|
||||
|
||||
$(java_libobjects): jl/%.o: %.cc
|
||||
$(AM_V_CC)mkdir -p $(@D) && $(CXX) $(CXXFLAGS) -fPIC -c $< -o $@ $(COVERAGEFLAGS)
|
||||
|
||||
rocksdbjava: $(java_libobjects)
|
||||
cd java;$(MAKE) javalib;
|
||||
rm -f ./java/target/$(ROCKSDBJNILIB)
|
||||
|
@ -8,6 +8,7 @@
|
||||
# CXX C++ Compiler path
|
||||
# PLATFORM_LDFLAGS Linker flags
|
||||
# JAVA_LDFLAGS Linker flags for RocksDBJava
|
||||
# JAVA_STATIC_LDFLAGS Linker flags for RocksDBJava static build
|
||||
# PLATFORM_SHARED_EXT Extension for shared libraries
|
||||
# PLATFORM_SHARED_LDFLAGS Flags for building shared library
|
||||
# PLATFORM_SHARED_CFLAGS Flags for compiling objects for shared library
|
||||
@ -51,12 +52,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
|
||||
FBCODE_BUILD="true"
|
||||
# If we're compiling with TSAN we need pic build
|
||||
PIC_BUILD=$COMPILE_WITH_TSAN
|
||||
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
else
|
||||
# we need this to build with MySQL. Don't use for other purposes.
|
||||
source "$PWD/build_tools/fbcode_config4.8.1.sh"
|
||||
fi
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
fi
|
||||
|
||||
# Delete existing output, if it exists
|
||||
@ -181,6 +177,7 @@ esac
|
||||
|
||||
PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS ${CXXFLAGS}"
|
||||
JAVA_LDFLAGS="$PLATFORM_LDFLAGS"
|
||||
JAVA_STATIC_LDFLAGS="$PLATFORM_LDFLAGS"
|
||||
|
||||
if [ "$CROSS_COMPILE" = "true" -o "$FBCODE_BUILD" = "true" ]; then
|
||||
# Cross-compiling; do not try any compilation tests.
|
||||
@ -374,6 +371,7 @@ echo "CXX=$CXX" >> "$OUTPUT"
|
||||
echo "PLATFORM=$PLATFORM" >> "$OUTPUT"
|
||||
echo "PLATFORM_LDFLAGS=$PLATFORM_LDFLAGS" >> "$OUTPUT"
|
||||
echo "JAVA_LDFLAGS=$JAVA_LDFLAGS" >> "$OUTPUT"
|
||||
echo "JAVA_STATIC_LDFLAGS=$JAVA_STATIC_LDFLAGS" >> "$OUTPUT"
|
||||
echo "VALGRIND_VER=$VALGRIND_VER" >> "$OUTPUT"
|
||||
echo "PLATFORM_CCFLAGS=$PLATFORM_CCFLAGS" >> "$OUTPUT"
|
||||
echo "PLATFORM_CXXFLAGS=$PLATFORM_CXXFLAGS" >> "$OUTPUT"
|
||||
|
19
build_tools/dependencies.sh
Normal file
19
build_tools/dependencies.sh
Normal file
@ -0,0 +1,19 @@
|
||||
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
|
||||
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
|
||||
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
|
||||
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
|
||||
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
|
||||
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
|
||||
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
|
||||
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
|
||||
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
|
||||
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
|
||||
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
|
||||
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832
|
@ -6,128 +6,143 @@
|
||||
# Environment variables that change the behavior of this script:
|
||||
# PIC_BUILD -- if true, it will only take pic versions of libraries from fbcode. libraries that don't have pic variant will not be included
|
||||
|
||||
|
||||
BASEDIR=`dirname $BASH_SOURCE`
|
||||
source "$BASEDIR/dependencies.sh"
|
||||
|
||||
CFLAGS=""
|
||||
|
||||
# location of libgcc
|
||||
LIBGCC_BASE="/mnt/gvfs/third-party2/libgcc/0473c80518a10d6efcbe24c5eeca3fb4ec9b519c/4.9.x/gcc-4.9-glibc-2.20/e1a7e4e"
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
|
||||
LIBGCC_LIBS=" -L $LIBGCC_BASE/libs"
|
||||
# libgcc
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
|
||||
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
|
||||
|
||||
# location of glibc
|
||||
GLIBC_REV=7397bed99280af5d9543439cdb7d018af7542720
|
||||
GLIBC_INCLUDE="/mnt/gvfs/third-party2/glibc/$GLIBC_REV/2.20/gcc-4.9-glibc-2.20/99df8fc/include"
|
||||
GLIBC_LIBS=" -L /mnt/gvfs/third-party2/glibc/$GLIBC_REV/2.20/gcc-4.9-glibc-2.20/99df8fc/lib"
|
||||
|
||||
SNAPPY_INCLUDE=" -I /mnt/gvfs/third-party2/snappy/b0f269b3ca47770121aa159b99e1d8d2ab260e1f/1.0.3/gcc-4.9-glibc-2.20/c32916f/include/"
|
||||
# glibc
|
||||
GLIBC_INCLUDE="$GLIBC_BASE/include"
|
||||
GLIBC_LIBS=" -L $GLIBC_BASE/lib"
|
||||
|
||||
# snappy
|
||||
SNAPPY_INCLUDE=" -I $SNAPPY_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
SNAPPY_LIBS=" /mnt/gvfs/third-party2/snappy/b0f269b3ca47770121aa159b99e1d8d2ab260e1f/1.0.3/gcc-4.9-glibc-2.20/c32916f/lib/libsnappy.a"
|
||||
SNAPPY_LIBS=" $SNAPPY_BASE/lib/libsnappy.a"
|
||||
else
|
||||
SNAPPY_LIBS=" /mnt/gvfs/third-party2/snappy/b0f269b3ca47770121aa159b99e1d8d2ab260e1f/1.0.3/gcc-4.9-glibc-2.20/c32916f/lib/libsnappy_pic.a"
|
||||
SNAPPY_LIBS=" $SNAPPY_BASE/lib/libsnappy_pic.a"
|
||||
fi
|
||||
|
||||
CFLAGS+=" -DSNAPPY"
|
||||
|
||||
if test -z $PIC_BUILD; then
|
||||
# location of zlib headers and libraries
|
||||
ZLIB_INCLUDE=" -I /mnt/gvfs/third-party2/zlib/feb983d9667f4cf5e9da07ce75abc824764b67a1/1.2.8/gcc-4.9-glibc-2.20/4230243/include/"
|
||||
ZLIB_LIBS=" /mnt/gvfs/third-party2/zlib/feb983d9667f4cf5e9da07ce75abc824764b67a1/1.2.8/gcc-4.9-glibc-2.20/4230243/lib/libz.a"
|
||||
ZLIB_INCLUDE=" -I $ZLIB_BASE/include/"
|
||||
ZLIB_LIBS=" $ZLIB_BASE/lib/libz.a"
|
||||
CFLAGS+=" -DZLIB"
|
||||
|
||||
# location of bzip headers and libraries
|
||||
BZIP_INCLUDE=" -I /mnt/gvfs/third-party2/bzip2/af004cceebb2dfd173ca29933ea5915e727aad2f/1.0.6/gcc-4.9-glibc-2.20/4230243/include/"
|
||||
BZIP_LIBS=" /mnt/gvfs/third-party2/bzip2/af004cceebb2dfd173ca29933ea5915e727aad2f/1.0.6/gcc-4.9-glibc-2.20/4230243/lib/libbz2.a"
|
||||
BZIP_INCLUDE=" -I $BZIP2_BASE/include/"
|
||||
BZIP_LIBS=" $BZIP2_BASE/lib/libbz2.a"
|
||||
CFLAGS+=" -DBZIP2"
|
||||
|
||||
LZ4_INCLUDE=" -I /mnt/gvfs/third-party2/lz4/79d2943e2dd7208a3e0b06cf95e9f85f05fe9e1b/r124/gcc-4.9-glibc-2.20/4230243/include/"
|
||||
LZ4_LIBS=" /mnt/gvfs/third-party2/lz4/79d2943e2dd7208a3e0b06cf95e9f85f05fe9e1b/r124/gcc-4.9-glibc-2.20/4230243/lib/liblz4.a"
|
||||
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
|
||||
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
|
||||
CFLAGS+=" -DLZ4"
|
||||
|
||||
ZSTD_REV=8df2d01673ae6afcc8c8d16fec862b2d67ecc1e9
|
||||
ZSTD_INCLUDE=" -I /mnt/gvfs/third-party2/zstd/$ZSTD_REV/0.1.1/gcc-4.8.1-glibc-2.17/c3f970a/include"
|
||||
ZSTD_LIBS=" /mnt/gvfs/third-party2/zstd/$ZSTD_REV/0.1.1/gcc-4.8.1-glibc-2.17/c3f970a/lib/libzstd.a"
|
||||
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
|
||||
CFLAGS+=" -DZSTD"
|
||||
fi
|
||||
|
||||
# location of gflags headers and libraries
|
||||
GFLAGS_INCLUDE=" -I /mnt/gvfs/third-party2/gflags/0fa60e2b88de3e469db6c482d6e6dac72f5d65f9/1.6/gcc-4.9-glibc-2.20/4230243/include/"
|
||||
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
GFLAGS_LIBS=" /mnt/gvfs/third-party2/gflags/0fa60e2b88de3e469db6c482d6e6dac72f5d65f9/1.6/gcc-4.9-glibc-2.20/4230243/lib/libgflags.a"
|
||||
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags.a"
|
||||
else
|
||||
GFLAGS_LIBS=" /mnt/gvfs/third-party2/gflags/0fa60e2b88de3e469db6c482d6e6dac72f5d65f9/1.6/gcc-4.9-glibc-2.20/4230243/lib/libgflags_pic.a"
|
||||
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DGFLAGS=google"
|
||||
CFLAGS+=" -DGFLAGS=gflags"
|
||||
|
||||
# location of jemalloc
|
||||
JEMALLOC_INCLUDE=" -I /mnt/gvfs/third-party2/jemalloc/bcd68e5e419efa4e61b9486d6854564d6d75a0b5/3.6.0/gcc-4.9-glibc-2.20/2aafc78/include/"
|
||||
JEMALLOC_LIB=" /mnt/gvfs/third-party2/jemalloc/bcd68e5e419efa4e61b9486d6854564d6d75a0b5/3.6.0/gcc-4.9-glibc-2.20/2aafc78/lib/libjemalloc.a"
|
||||
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
|
||||
JEMALLOC_LIB=" $JEMALLOC_BASE/lib/libjemalloc.a"
|
||||
|
||||
if test -z $PIC_BUILD; then
|
||||
# location of numa
|
||||
NUMA_INCLUDE=" -I /mnt/gvfs/third-party2/numa/bbefc39ecbf31d0ca184168eb613ef8d397790ee/2.0.8/gcc-4.9-glibc-2.20/4230243/include/"
|
||||
NUMA_LIB=" /mnt/gvfs/third-party2/numa/bbefc39ecbf31d0ca184168eb613ef8d397790ee/2.0.8/gcc-4.9-glibc-2.20/4230243/lib/libnuma.a"
|
||||
NUMA_INCLUDE=" -I $NUMA_BASE/include/"
|
||||
NUMA_LIB=" $NUMA_BASE/lib/libnuma.a"
|
||||
CFLAGS+=" -DNUMA"
|
||||
|
||||
# location of libunwind
|
||||
LIBUNWIND="/mnt/gvfs/third-party2/libunwind/1de3b75e0afedfe5585b231bbb340ec7a1542335/1.1/gcc-4.9-glibc-2.20/34235e8/lib/libunwind.a"
|
||||
LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
|
||||
fi
|
||||
|
||||
# location of TBB
|
||||
TBB_INCLUDE=" -isystem $TBB_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
TBB_LIBS="$TBB_BASE/lib/libtbb.a"
|
||||
else
|
||||
TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DTBB"
|
||||
|
||||
# use Intel SSE support for checksum calculations
|
||||
export USE_SSE=1
|
||||
|
||||
BINUTILS="/mnt/gvfs/third-party2/binutils/0b6ad0c88ddd903333a48ae8bff134efac468e4a/2.25/centos6-native/da39a3e/bin"
|
||||
BINUTILS="$BINUTILS_BASE/bin"
|
||||
AR="$BINUTILS/ar"
|
||||
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE"
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
|
||||
|
||||
GCC_BASE="/mnt/gvfs/third-party2/gcc/1c67a0b88f64d4d9ced0382d141c76aaa7d62fba/4.9.x/centos6-native/1317bc4"
|
||||
STDLIBS="-L $GCC_BASE/lib64"
|
||||
|
||||
CLANG_BASE="/mnt/gvfs/third-party2/clang/d81444dd214df3d2466734de45bb264a0486acc3/dev"
|
||||
CLANG_BIN="$CLANG_BASE/centos6-native/af4b1a0/bin"
|
||||
CLANG_BIN="$CLANG_BASE/bin"
|
||||
CLANG_LIB="$CLANG_BASE/lib"
|
||||
CLANG_SRC="$CLANG_BASE/../../src"
|
||||
|
||||
CLANG_ANALYZER="$CLANG_BIN/clang++"
|
||||
CLANG_SCAN_BUILD="$CLANG_BASE/src/clang/tools/scan-build/scan-build"
|
||||
CLANG_SCAN_BUILD="$CLANG_SRC/llvm/tools/clang/tools/scan-build/bin/scan-build"
|
||||
|
||||
if [ -z "$USE_CLANG" ]; then
|
||||
# gcc
|
||||
CC="$GCC_BASE/bin/gcc"
|
||||
CXX="$GCC_BASE/bin/g++"
|
||||
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
JEMALLOC=1
|
||||
else
|
||||
# clang
|
||||
CLANG_INCLUDE="$CLANG_BASE/gcc-4.9-glibc-2.20/74c386f/lib/clang/dev/include/"
|
||||
# clang
|
||||
CLANG_INCLUDE="$CLANG_LIB/clang/stable/include"
|
||||
CC="$CLANG_BIN/clang"
|
||||
CXX="$CLANG_BIN/clang++"
|
||||
|
||||
KERNEL_HEADERS_INCLUDE="/mnt/gvfs/third-party2/kernel-headers/ffd14f660a43c4b92717986b1bba66722ef089d0/3.2.18_70_fbk11_00129_gc8882d0/gcc-4.9-glibc-2.20/da39a3e/include"
|
||||
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $CLANG_INCLUDE"
|
||||
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE/linux "
|
||||
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE "
|
||||
CFLAGS+=" -Wno-expansion-to-defined "
|
||||
CXXFLAGS="-nostdinc++"
|
||||
fi
|
||||
|
||||
CFLAGS+=" $DEPS_INCLUDE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
|
||||
CXXFLAGS+=" $CFLAGS"
|
||||
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
|
||||
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
|
||||
EXEC_LDFLAGS+=" $LIBUNWIND"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
|
||||
# required by libtbb
|
||||
EXEC_LDFLAGS+=" -ldl"
|
||||
|
||||
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
|
||||
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS"
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
|
||||
|
||||
VALGRIND_VER="/mnt/gvfs/third-party2/valgrind/6c45ef049cbf11c2df593addb712cd891049e737/3.10.0/gcc-4.9-glibc-2.20/4230243/bin/"
|
||||
VALGRIND_VER="$VALGRIND_BASE/bin/"
|
||||
|
||||
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD
|
||||
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD
|
||||
|
@ -79,6 +79,12 @@ echo -e "Detect lines that doesn't follow the format rules:\r"
|
||||
echo "$diffs" |
|
||||
sed -e "s/\(^-.*$\)/`echo -e \"$COLOR_RED\1$COLOR_END\"`/" |
|
||||
sed -e "s/\(^+.*$\)/`echo -e \"$COLOR_GREEN\1$COLOR_END\"`/"
|
||||
|
||||
if [[ "$OPT" == *"-DTRAVIS"* ]]
|
||||
then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo -e "Would you like to fix the format automatically (y/n): \c"
|
||||
|
||||
# Make sure under any mode, we can read user input.
|
||||
|
@ -103,9 +103,8 @@ Status BuildTable(
|
||||
// TODO(noetzli): Update stats after flush, too.
|
||||
if (io_priority == Env::IO_HIGH &&
|
||||
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
|
||||
ThreadStatusUtil::IncreaseThreadOperationProperty(
|
||||
ThreadStatusUtil::SetThreadOperationProperty(
|
||||
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
|
||||
IOSTATS_RESET(bytes_written);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <algorithm>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include "db/db_impl.h"
|
||||
#include "rocksdb/db.h"
|
||||
@ -19,6 +20,7 @@
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/sync_point.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -1196,6 +1198,67 @@ TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
|
||||
db_options_.create_missing_column_families = true;
|
||||
Open({"default", "one"});
|
||||
ColumnFamilyOptions options;
|
||||
options.level0_file_num_compaction_trigger = 100;
|
||||
options.level0_slowdown_writes_trigger = 200;
|
||||
options.level0_stop_writes_trigger = 200;
|
||||
options.max_write_buffer_number = 20;
|
||||
options.write_buffer_size = 100000; // small write buffer size
|
||||
Reopen({options, options});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"VersionSet::LogAndApply::ColumnFamilyDrop:1"
|
||||
"FlushJob::InstallResults"},
|
||||
{"FlushJob::InstallResults",
|
||||
"VersionSet::LogAndApply::ColumnFamilyDrop:2", }});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
test::SleepingBackgroundTask sleeping_task;
|
||||
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
|
||||
Env::Priority::HIGH);
|
||||
|
||||
// 1MB should create ~10 files for each CF
|
||||
int kKeysNum = 10000;
|
||||
PutRandomData(1, kKeysNum, 100);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); });
|
||||
|
||||
sleeping_task.WakeUp();
|
||||
sleeping_task.WaitUntilDone();
|
||||
sleeping_task.Reset();
|
||||
// now we sleep again. this is just so we're certain that flush job finished
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
|
||||
Env::Priority::HIGH);
|
||||
sleeping_task.WakeUp();
|
||||
sleeping_task.WaitUntilDone();
|
||||
|
||||
{
|
||||
// Since we didn't delete CF handle, RocksDB's contract guarantees that
|
||||
// we're still able to read dropped CF
|
||||
std::unique_ptr<Iterator> iterator(
|
||||
db_->NewIterator(ReadOptions(), handles_[1]));
|
||||
int count = 0;
|
||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
||||
ASSERT_OK(iterator->status());
|
||||
++count;
|
||||
}
|
||||
ASSERT_OK(iterator->status());
|
||||
ASSERT_EQ(count, kKeysNum);
|
||||
}
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
Close();
|
||||
Destroy();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -128,7 +128,7 @@ void CompactionIterator::NextFromInput() {
|
||||
current_user_key_snapshot_ = 0;
|
||||
// apply the compaction filter to the first occurrence of the user key
|
||||
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
|
||||
(visible_at_tip_ || latest_snapshot_)) {
|
||||
(visible_at_tip_ || ikey_.sequence > latest_snapshot_)) {
|
||||
// If the user has specified a compaction filter and the sequence
|
||||
// number is greater than any external snapshot, then invoke the
|
||||
// filter. If the return value of the compaction filter is true,
|
||||
|
@ -1014,8 +1014,8 @@ TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
|
||||
&rnd, start_key, start_key + key_base - 1,
|
||||
kKeySize, kValueSize, key_interval,
|
||||
compression_ratio, 1);
|
||||
reinterpret_cast<DBImpl*>(db_)->TEST_WaitForCompact();
|
||||
}
|
||||
reinterpret_cast<DBImpl*>(db_)->TEST_WaitForCompact();
|
||||
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
|
||||
}
|
||||
|
||||
|
@ -537,6 +537,42 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
|
||||
}
|
||||
}
|
||||
|
||||
// Compaction filters should only be applied to records that are newer than the
|
||||
// latest snapshot. This test inserts records and applies a delete filter.
|
||||
TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) {
|
||||
Options options;
|
||||
options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
|
||||
options.disable_auto_compactions = true;
|
||||
options.create_if_missing = true;
|
||||
options = CurrentOptions(options);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
// Put some data.
|
||||
const Snapshot* snapshot = nullptr;
|
||||
for (int table = 0; table < 4; ++table) {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
Put(ToString(table * 100 + i), "val");
|
||||
}
|
||||
Flush();
|
||||
|
||||
if (table == 0) {
|
||||
snapshot = db_->GetSnapshot();
|
||||
}
|
||||
}
|
||||
assert(snapshot != nullptr);
|
||||
|
||||
cfilter_count = 0;
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||
// The filter should delete 10 records.
|
||||
ASSERT_EQ(30U, cfilter_count);
|
||||
|
||||
// Release the snapshot and compact again -> now all records should be
|
||||
// removed.
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||
ASSERT_EQ(0U, CountLiveFiles());
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -538,6 +538,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
|
||||
// don't delete files that might be currently written to from compaction
|
||||
// threads
|
||||
// Since job_context->min_pending_output is set, until file scan finishes,
|
||||
// mutex_ cannot be released. Otherwise, we might see no min_pending_output
|
||||
// here but later find newer generated unfinalized files while scannint.
|
||||
if (!pending_outputs_.empty()) {
|
||||
job_context->min_pending_output = *pending_outputs_.begin();
|
||||
} else {
|
||||
@ -550,37 +553,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
|
||||
job_context->min_pending_output);
|
||||
|
||||
if (!alive_log_files_.empty()) {
|
||||
uint64_t min_log_number = versions_->MinLogNumber();
|
||||
// find newly obsoleted log files
|
||||
while (alive_log_files_.begin()->number < min_log_number) {
|
||||
auto& earliest = *alive_log_files_.begin();
|
||||
job_context->log_delete_files.push_back(earliest.number);
|
||||
total_log_size_ -= earliest.size;
|
||||
alive_log_files_.pop_front();
|
||||
// Current log should always stay alive since it can't have
|
||||
// number < MinLogNumber().
|
||||
assert(alive_log_files_.size());
|
||||
}
|
||||
while (!logs_.empty() && logs_.front().number < min_log_number) {
|
||||
auto& log = logs_.front();
|
||||
if (log.getting_synced) {
|
||||
log_sync_cv_.Wait();
|
||||
// logs_ could have changed while we were waiting.
|
||||
continue;
|
||||
}
|
||||
logs_to_free_.push_back(log.ReleaseWriter());
|
||||
logs_.pop_front();
|
||||
}
|
||||
// Current log cannot be obsolete.
|
||||
assert(!logs_.empty());
|
||||
}
|
||||
|
||||
// We're just cleaning up for DB::Write().
|
||||
assert(job_context->logs_to_free.empty());
|
||||
job_context->logs_to_free = logs_to_free_;
|
||||
logs_to_free_.clear();
|
||||
|
||||
// store the current filenum, lognum, etc
|
||||
job_context->manifest_file_number = versions_->manifest_file_number();
|
||||
job_context->pending_manifest_file_number =
|
||||
@ -622,6 +594,37 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!alive_log_files_.empty()) {
|
||||
uint64_t min_log_number = versions_->MinLogNumber();
|
||||
// find newly obsoleted log files
|
||||
while (alive_log_files_.begin()->number < min_log_number) {
|
||||
auto& earliest = *alive_log_files_.begin();
|
||||
job_context->log_delete_files.push_back(earliest.number);
|
||||
total_log_size_ -= earliest.size;
|
||||
alive_log_files_.pop_front();
|
||||
// Current log should always stay alive since it can't have
|
||||
// number < MinLogNumber().
|
||||
assert(alive_log_files_.size());
|
||||
}
|
||||
while (!logs_.empty() && logs_.front().number < min_log_number) {
|
||||
auto& log = logs_.front();
|
||||
if (log.getting_synced) {
|
||||
log_sync_cv_.Wait();
|
||||
// logs_ could have changed while we were waiting.
|
||||
continue;
|
||||
}
|
||||
logs_to_free_.push_back(log.ReleaseWriter());
|
||||
logs_.pop_front();
|
||||
}
|
||||
// Current log cannot be obsolete.
|
||||
assert(!logs_.empty());
|
||||
}
|
||||
|
||||
// We're just cleaning up for DB::Write().
|
||||
assert(job_context->logs_to_free.empty());
|
||||
job_context->logs_to_free = logs_to_free_;
|
||||
logs_to_free_.clear();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
182
db/db_test.cc
182
db/db_test.cc
@ -2219,61 +2219,16 @@ TEST_F(DBTest, NumImmutableMemTable) {
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
class SleepingBackgroundTask {
|
||||
public:
|
||||
SleepingBackgroundTask()
|
||||
: bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {}
|
||||
void DoSleep() {
|
||||
MutexLock l(&mutex_);
|
||||
while (should_sleep_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
done_with_sleep_ = true;
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
void WakeUp() {
|
||||
MutexLock l(&mutex_);
|
||||
should_sleep_ = false;
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
void WaitUntilDone() {
|
||||
MutexLock l(&mutex_);
|
||||
while (!done_with_sleep_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
}
|
||||
bool WokenUp() {
|
||||
MutexLock l(&mutex_);
|
||||
return should_sleep_ == false;
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
MutexLock l(&mutex_);
|
||||
should_sleep_ = true;
|
||||
done_with_sleep_ = false;
|
||||
}
|
||||
|
||||
static void DoSleepTask(void* arg) {
|
||||
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
|
||||
}
|
||||
|
||||
private:
|
||||
port::Mutex mutex_;
|
||||
port::CondVar bg_cv_; // Signalled when background work finishes
|
||||
bool should_sleep_;
|
||||
bool done_with_sleep_;
|
||||
};
|
||||
|
||||
TEST_F(DBTest, FlushEmptyColumnFamily) {
|
||||
// Block flush thread and disable compaction thread
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
|
||||
Env::Priority::HIGH);
|
||||
test::SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||
&sleeping_task_high, Env::Priority::HIGH);
|
||||
|
||||
Options options = CurrentOptions();
|
||||
// disable compaction
|
||||
@ -2312,12 +2267,12 @@ TEST_F(DBTest, GetProperty) {
|
||||
// Set sizes to both background thread pool to be 1 and block them.
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
|
||||
Env::Priority::HIGH);
|
||||
test::SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||
&sleeping_task_high, Env::Priority::HIGH);
|
||||
|
||||
Options options = CurrentOptions();
|
||||
WriteOptions writeOpt = WriteOptions();
|
||||
@ -2566,8 +2521,8 @@ TEST_F(DBTest, EstimatePendingCompBytes) {
|
||||
// Set sizes to both background thread pool to be 1 and block them.
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
|
||||
Options options = CurrentOptions();
|
||||
@ -6179,7 +6134,7 @@ TEST_F(DBTest, TableOptionsSanitizeTest) {
|
||||
TEST_F(DBTest, SanitizeNumThreads) {
|
||||
for (int attempt = 0; attempt < 2; attempt++) {
|
||||
const size_t kTotalTasks = 8;
|
||||
SleepingBackgroundTask sleeping_tasks[kTotalTasks];
|
||||
test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
|
||||
|
||||
Options options = CurrentOptions();
|
||||
if (attempt == 0) {
|
||||
@ -6191,7 +6146,8 @@ TEST_F(DBTest, SanitizeNumThreads) {
|
||||
|
||||
for (size_t i = 0; i < kTotalTasks; i++) {
|
||||
// Insert 5 tasks to low priority queue and 5 tasks to high priority queue
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i],
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||
&sleeping_tasks[i],
|
||||
(i < 4) ? Env::Priority::LOW : Env::Priority::HIGH);
|
||||
}
|
||||
|
||||
@ -6483,8 +6439,8 @@ TEST_F(DBTest, DynamicMemtableOptions) {
|
||||
// max_background_flushes == 0, so flushes are getting executed by the
|
||||
// compaction thread
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
// Start from scratch and disable compaction/flush. Flush can only happen
|
||||
// during compaction but trigger is pretty high
|
||||
@ -6519,7 +6475,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
|
||||
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
||||
|
||||
sleeping_task_low.Reset();
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
count = 0;
|
||||
while (!sleeping_task_low.WokenUp() && count < 1024) {
|
||||
@ -6542,7 +6498,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
|
||||
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
||||
|
||||
sleeping_task_low.Reset();
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
|
||||
count = 0;
|
||||
@ -7349,8 +7305,8 @@ TEST_F(DBTest, DynamicCompactionOptions) {
|
||||
// since level0_stop_writes_trigger = 8
|
||||
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
||||
// Block compaction
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
@ -7386,7 +7342,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
|
||||
|
||||
// Block compaction again
|
||||
sleeping_task_low.Reset();
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
count = 0;
|
||||
while (count < 64) {
|
||||
@ -7824,7 +7780,7 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) {
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("0,0,1", FilesPerLevel(0));
|
||||
|
||||
SleepingBackgroundTask blocking_thread;
|
||||
test::SleepingBackgroundTask blocking_thread;
|
||||
port::Mutex mutex_;
|
||||
bool already_blocked(false);
|
||||
|
||||
@ -7891,12 +7847,12 @@ TEST_F(DBTest, CloseSpeedup) {
|
||||
// Block background threads
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
|
||||
Env::Priority::HIGH);
|
||||
test::SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||
&sleeping_task_high, Env::Priority::HIGH);
|
||||
|
||||
std::vector<std::string> filenames;
|
||||
env_->GetChildren(dbname_, &filenames);
|
||||
@ -7993,6 +7949,9 @@ TEST_F(DBTest, MergeTestTime) {
|
||||
|
||||
ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 6000000);
|
||||
ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 3200000);
|
||||
#if ROCKSDB_USING_THREAD_STATUS
|
||||
ASSERT_GT(TestGetTickerCount(options, FLUSH_WRITE_BYTES), 0);
|
||||
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||
}
|
||||
|
||||
TEST_P(DBTestWithParam, MergeCompactionTimeTest) {
|
||||
@ -9143,6 +9102,85 @@ TEST_F(DBTest, GetTotalSstFilesSizeVersionsFilesShared) {
|
||||
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
|
||||
::testing::Values(1, 4));
|
||||
|
||||
class SliceTransformLimitedDomain : public SliceTransform {
|
||||
const char* Name() const override { return "SliceTransformLimitedDomain"; }
|
||||
|
||||
Slice Transform(const Slice& src) const override {
|
||||
return Slice(src.data(), 5);
|
||||
}
|
||||
|
||||
bool InDomain(const Slice& src) const override {
|
||||
// prefix will be x????
|
||||
return src.size() >= 5 && src[0] == 'x';
|
||||
}
|
||||
|
||||
bool InRange(const Slice& dst) const override {
|
||||
// prefix will be x????
|
||||
return dst.size() == 5 && dst[0] == 'x';
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(DBTest, PrefixExtractorFullFilter) {
|
||||
BlockBasedTableOptions bbto;
|
||||
// Full Filter Block
|
||||
bbto.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
|
||||
bbto.whole_key_filtering = false;
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.prefix_extractor = std::make_shared<SliceTransformLimitedDomain>();
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
|
||||
DestroyAndReopen(options);
|
||||
|
||||
ASSERT_OK(Put("x1111_AAAA", "val1"));
|
||||
ASSERT_OK(Put("x1112_AAAA", "val2"));
|
||||
ASSERT_OK(Put("x1113_AAAA", "val3"));
|
||||
ASSERT_OK(Put("x1114_AAAA", "val4"));
|
||||
// Not in domain, wont be added to filter
|
||||
ASSERT_OK(Put("zzzzz_AAAA", "val5"));
|
||||
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
ASSERT_EQ(Get("x1111_AAAA"), "val1");
|
||||
ASSERT_EQ(Get("x1112_AAAA"), "val2");
|
||||
ASSERT_EQ(Get("x1113_AAAA"), "val3");
|
||||
ASSERT_EQ(Get("x1114_AAAA"), "val4");
|
||||
// Was not added to filter but rocksdb will try to read it from the filter
|
||||
ASSERT_EQ(Get("zzzzz_AAAA"), "val5");
|
||||
}
|
||||
|
||||
TEST_F(DBTest, PrefixExtractorBlockFilter) {
|
||||
BlockBasedTableOptions bbto;
|
||||
// Block Filter Block
|
||||
bbto.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true));
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.prefix_extractor = std::make_shared<SliceTransformLimitedDomain>();
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
|
||||
DestroyAndReopen(options);
|
||||
|
||||
ASSERT_OK(Put("x1113_AAAA", "val3"));
|
||||
ASSERT_OK(Put("x1114_AAAA", "val4"));
|
||||
// Not in domain, wont be added to filter
|
||||
ASSERT_OK(Put("zzzzz_AAAA", "val1"));
|
||||
ASSERT_OK(Put("zzzzz_AAAB", "val2"));
|
||||
ASSERT_OK(Put("zzzzz_AAAC", "val3"));
|
||||
ASSERT_OK(Put("zzzzz_AAAD", "val4"));
|
||||
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
std::vector<std::string> iter_res;
|
||||
auto iter = db_->NewIterator(ReadOptions());
|
||||
// Seek to a key that was not in Domain
|
||||
for (iter->Seek("zzzzz_AAAA"); iter->Valid(); iter->Next()) {
|
||||
iter_res.emplace_back(iter->value().ToString());
|
||||
}
|
||||
|
||||
std::vector<std::string> expected_res = {"val1", "val2", "val3", "val4"};
|
||||
ASSERT_EQ(iter_res, expected_res);
|
||||
delete iter;
|
||||
}
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif
|
||||
|
@ -111,9 +111,8 @@ void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
|
||||
}
|
||||
|
||||
void FlushJob::RecordFlushIOStats() {
|
||||
ThreadStatusUtil::IncreaseThreadOperationProperty(
|
||||
ThreadStatusUtil::SetThreadOperationProperty(
|
||||
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
|
||||
IOSTATS_RESET(bytes_written);
|
||||
}
|
||||
|
||||
Status FlushJob::Run(FileMetaData* file_meta) {
|
||||
@ -154,6 +153,7 @@ Status FlushJob::Run(FileMetaData* file_meta) {
|
||||
if (!s.ok()) {
|
||||
cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber());
|
||||
} else {
|
||||
TEST_SYNC_POINT("FlushJob::InstallResults");
|
||||
// Replace immutable memtable with the generated Table
|
||||
s = cfd_->imm()->InstallMemtableFlushResults(
|
||||
cfd_, mutable_cf_options_, mems, versions_, db_mutex_,
|
||||
|
@ -132,8 +132,6 @@ DEFINE_int64(seed, 0,
|
||||
"Seed base for random number generators. "
|
||||
"When 0 it is deterministic.");
|
||||
|
||||
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
namespace {
|
||||
|
@ -1979,7 +1979,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
if (!manifest_writers_.empty()) {
|
||||
manifest_writers_.front()->cv.Signal();
|
||||
}
|
||||
return Status::OK();
|
||||
// we steal this code to also inform about cf-drop
|
||||
return Status::ShutdownInProgress();
|
||||
}
|
||||
|
||||
std::vector<VersionEdit*> batch_edits;
|
||||
@ -2141,6 +2142,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
new_manifest_file_size = descriptor_log_->file()->GetFileSize();
|
||||
}
|
||||
|
||||
if (edit->is_column_family_drop_) {
|
||||
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
|
||||
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
|
||||
}
|
||||
|
||||
LogFlush(db_options_->info_log);
|
||||
mu->Lock();
|
||||
}
|
||||
|
@ -99,7 +99,9 @@ class Transaction {
|
||||
virtual Status RollbackToSavePoint() = 0;
|
||||
|
||||
// This function is similar to DB::Get() except it will also read pending
|
||||
// changes in this transaction.
|
||||
// changes in this transaction. Currently, this function will return
|
||||
// Status::MergeInProgress if the most recent write to the queried key in
|
||||
// this batch is a Merge.
|
||||
//
|
||||
// If read_options.snapshot is not set, the current version of the key will
|
||||
// be read. Calling SetSnapshot() does not affect the version of the data
|
||||
@ -131,6 +133,9 @@ class Transaction {
|
||||
// snapshot is set in this transaction). The transaction behavior is the
|
||||
// same regardless of whether the key exists or not.
|
||||
//
|
||||
// Note: Currently, this function will return Status::MergeInProgress
|
||||
// if the most recent write to the queried key in this batch is a Merge.
|
||||
//
|
||||
// The values returned by this function are similar to Transaction::Get().
|
||||
// If value==nullptr, then this function will not read any data, but will
|
||||
// still ensure that this key cannot be written to by outside of this
|
||||
@ -146,6 +151,7 @@ class Transaction {
|
||||
// Status::TimedOut() if a lock could not be acquired,
|
||||
// Status::TryAgain() if the memtable history size is not large enough
|
||||
// (See max_write_buffer_number_to_maintain)
|
||||
// Status::MergeInProgress() if merge operations cannot be resolved.
|
||||
// or other errors if this key could not be read.
|
||||
virtual Status GetForUpdate(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
|
@ -120,7 +120,14 @@ class WriteBatchWithIndex : public WriteBatchBase {
|
||||
WBWIIterator* NewIterator();
|
||||
|
||||
// Will create a new Iterator that will use WBWIIterator as a delta and
|
||||
// base_iterator as base
|
||||
// base_iterator as base.
|
||||
//
|
||||
// This function is only supported if the WriteBatchWithIndex was
|
||||
// constructed with overwrite_key=true.
|
||||
//
|
||||
// The returned iterator should be deleted by the caller.
|
||||
// The base_iterator is now 'owned' by the returned iterator. Deleting the
|
||||
// returned iterator will also delete the base_iterator.
|
||||
Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family,
|
||||
Iterator* base_iterator);
|
||||
// default column family
|
||||
@ -135,7 +142,7 @@ class WriteBatchWithIndex : public WriteBatchBase {
|
||||
|
||||
// Similar to previous function but does not require a column_family.
|
||||
// Note: An InvalidArgument status will be returned if there are any Merge
|
||||
// operators for this key.
|
||||
// operators for this key. Use previous method instead.
|
||||
Status GetFromBatch(const DBOptions& options, const Slice& key,
|
||||
std::string* value) {
|
||||
return GetFromBatch(nullptr, options, key, value);
|
||||
|
@ -6,7 +6,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 4
|
||||
#define ROCKSDB_MINOR 0
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 1
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -15,7 +15,7 @@ Once you have these items, run this make command from RocksDB's root source dire
|
||||
|
||||
This command will build RocksDB natively on OSX, and will then spin up two Vagrant Virtualbox Ubuntu images to build RocksDB for both 32-bit and 64-bit Linux.
|
||||
|
||||
You can find all native binaries and JARs in the java directory upon completion:
|
||||
You can find all native binaries and JARs in the java/target directory upon completion:
|
||||
|
||||
librocksdbjni-linux32.so
|
||||
librocksdbjni-linux64.so
|
||||
|
2
java/crossbuild/Vagrantfile
vendored
2
java/crossbuild/Vagrantfile
vendored
@ -20,7 +20,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
|
||||
end
|
||||
|
||||
config.vm.provision :shell, path: "build-linux-centos.sh"
|
||||
config.vm.synced_folder "../", "/rocksdb-build"
|
||||
config.vm.synced_folder "../target", "/rocksdb-build"
|
||||
config.vm.synced_folder "../..", "/rocksdb", type: "rsync"
|
||||
config.vm.boot_timeout = 1200
|
||||
end
|
||||
|
@ -1,15 +1,15 @@
|
||||
#!/usr/bin/env bash
|
||||
# install all required packages for rocksdb that are available through yum
|
||||
ARCH=$(uname -i)
|
||||
sudo yum -y install openssl java-1.7.0-openjdk-devel.$ARCH zlib zlib-devel bzip2 bzip2-devel
|
||||
sudo yum -y install openssl java-1.7.0-openjdk-devel.$ARCH
|
||||
|
||||
# install gcc/g++ 4.8.2 via CERN (http://linux.web.cern.ch/linux/devtoolset/)
|
||||
sudo wget -O /etc/yum.repos.d/slc5-devtoolset.repo http://linuxsoft.cern.ch/cern/devtoolset/slc5-devtoolset.repo
|
||||
sudo wget -O /etc/pki/rpm-gpg/RPM-GPG-KEY-cern http://ftp.mirrorservice.org/sites/ftp.scientificlinux.org/linux/scientific/51/i386/RPM-GPG-KEYs/RPM-GPG-KEY-cern
|
||||
sudo yum -y install devtoolset-2
|
||||
|
||||
wget http://gflags.googlecode.com/files/gflags-1.6.tar.gz
|
||||
tar xvfz gflags-1.6.tar.gz; cd gflags-1.6; scl enable devtoolset-2 ./configure; scl enable devtoolset-2 make; sudo make install
|
||||
wget http://gflags.googlecode.com/files/gflags-2.0-no-svn-files.tar.gz
|
||||
tar xvfz gflags-2.0-no-svn-files.tar.gz; cd gflags-2.0; scl enable devtoolset-2 ./configure; scl enable devtoolset-2 make; sudo make install
|
||||
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
|
||||
|
||||
# set java home so we can build rocksdb jars
|
||||
@ -18,7 +18,7 @@ export JAVA_HOME=/usr/lib/jvm/java-1.7.0
|
||||
# build rocksdb
|
||||
cd /rocksdb
|
||||
scl enable devtoolset-2 'make jclean clean'
|
||||
scl enable devtoolset-2 'make rocksdbjavastatic'
|
||||
scl enable devtoolset-2 'PORTABLE=1 make rocksdbjavastatic'
|
||||
cp /rocksdb/java/target/librocksdbjni-* /rocksdb-build
|
||||
cp /rocksdb/java/target/rocksdbjni-* /rocksdb-build
|
||||
|
||||
|
@ -617,13 +617,13 @@ void BlockBasedTable::SetupForCompaction() {
|
||||
case Options::NONE:
|
||||
break;
|
||||
case Options::NORMAL:
|
||||
rep_->file->Hint(RandomAccessFile::NORMAL);
|
||||
rep_->file->file()->Hint(RandomAccessFile::NORMAL);
|
||||
break;
|
||||
case Options::SEQUENTIAL:
|
||||
rep_->file->Hint(RandomAccessFile::SEQUENTIAL);
|
||||
rep_->file->file()->Hint(RandomAccessFile::SEQUENTIAL);
|
||||
break;
|
||||
case Options::WILLNEED:
|
||||
rep_->file->Hint(RandomAccessFile::WILLNEED);
|
||||
rep_->file->file()->Hint(RandomAccessFile::WILLNEED);
|
||||
break;
|
||||
default:
|
||||
assert(false);
|
||||
@ -1107,9 +1107,12 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
|
||||
}
|
||||
|
||||
assert(rep_->ioptions.prefix_extractor != nullptr);
|
||||
auto prefix = rep_->ioptions.prefix_extractor->Transform(
|
||||
ExtractUserKey(internal_key));
|
||||
InternalKey internal_key_prefix(prefix, 0, kTypeValue);
|
||||
auto user_key = ExtractUserKey(internal_key);
|
||||
if (!rep_->ioptions.prefix_extractor->InDomain(user_key)) {
|
||||
return true;
|
||||
}
|
||||
auto prefix = rep_->ioptions.prefix_extractor->Transform(user_key);
|
||||
InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue);
|
||||
auto internal_prefix = internal_key_prefix.Encode();
|
||||
|
||||
bool may_match = true;
|
||||
@ -1192,6 +1195,7 @@ bool BlockBasedTable::FullFilterKeyMayMatch(FilterBlockReader* filter,
|
||||
return false;
|
||||
}
|
||||
if (rep_->ioptions.prefix_extractor &&
|
||||
rep_->ioptions.prefix_extractor->InDomain(user_key) &&
|
||||
!filter->PrefixMayMatch(
|
||||
rep_->ioptions.prefix_extractor->Transform(user_key))) {
|
||||
return false;
|
||||
|
@ -1216,6 +1216,41 @@ TEST_F(BlockBasedTableTest, TotalOrderSeekOnHashIndex) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(BlockBasedTableTest, NoopTransformSeek) {
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
|
||||
|
||||
Options options;
|
||||
options.comparator = BytewiseComparator();
|
||||
options.table_factory.reset(new BlockBasedTableFactory(table_options));
|
||||
options.prefix_extractor.reset(NewNoopTransform());
|
||||
|
||||
TableConstructor c(options.comparator);
|
||||
// To tickle the PrefixMayMatch bug it is important that the
|
||||
// user-key is a single byte so that the index key exactly matches
|
||||
// the user-key.
|
||||
InternalKey key("a", 1, kTypeValue);
|
||||
c.Add(key.Encode().ToString(), "b");
|
||||
std::vector<std::string> keys;
|
||||
stl_wrappers::KVMap kvmap;
|
||||
const ImmutableCFOptions ioptions(options);
|
||||
const InternalKeyComparator internal_comparator(options.comparator);
|
||||
c.Finish(options, ioptions, table_options, internal_comparator, &keys,
|
||||
&kvmap);
|
||||
|
||||
auto* reader = c.GetTableReader();
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
ReadOptions ro;
|
||||
ro.total_order_seek = (i == 0);
|
||||
std::unique_ptr<Iterator> iter(reader->NewIterator(ro));
|
||||
|
||||
iter->Seek(key.Encode());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ("a", ExtractUserKey(iter->key()).ToString());
|
||||
}
|
||||
}
|
||||
|
||||
static std::string RandomString(Random* rnd, int len) {
|
||||
std::string r;
|
||||
test::RandomString(rnd, len, &r);
|
||||
|
@ -223,7 +223,7 @@ DEFINE_int32(set_options_one_in, 0,
|
||||
DEFINE_int32(set_in_place_one_in, 0,
|
||||
"With a chance of 1/N, toggle in place support option");
|
||||
|
||||
DEFINE_int64(cache_size, 2 * KB * KB * KB,
|
||||
DEFINE_int64(cache_size, 2LL * KB * KB * KB,
|
||||
"Number of bytes to use as a cache of uncompressed data.");
|
||||
|
||||
DEFINE_uint64(subcompactions, 1,
|
||||
@ -1637,10 +1637,10 @@ class StressTest {
|
||||
void VerifyDb(ThreadState* thread) const {
|
||||
ReadOptions options(FLAGS_verify_checksum, true);
|
||||
auto shared = thread->shared;
|
||||
static const long max_key = shared->GetMaxKey();
|
||||
static const long keys_per_thread = max_key / shared->GetNumThreads();
|
||||
long start = keys_per_thread * thread->tid;
|
||||
long end = start + keys_per_thread;
|
||||
const int64_t max_key = shared->GetMaxKey();
|
||||
const int64_t keys_per_thread = max_key / shared->GetNumThreads();
|
||||
int64_t start = keys_per_thread * thread->tid;
|
||||
int64_t end = start + keys_per_thread;
|
||||
if (thread->tid == shared->GetNumThreads() - 1) {
|
||||
end = max_key;
|
||||
}
|
||||
|
@ -21,6 +21,14 @@ class ArenaTest : public testing::Test {};
|
||||
TEST_F(ArenaTest, Empty) { Arena arena0; }
|
||||
|
||||
namespace {
|
||||
bool CheckMemoryAllocated(size_t allocated, size_t expected) {
|
||||
// The value returned by Arena::MemoryAllocatedBytes() may be greater than
|
||||
// the requested memory. We choose a somewhat arbitrary upper bound of
|
||||
// max_expected = expected * 1.1 to detect critical overallocation.
|
||||
size_t max_expected = expected * 1.1;
|
||||
return allocated >= expected && allocated <= max_expected;
|
||||
}
|
||||
|
||||
void MemoryAllocatedBytesTest(size_t huge_page_size) {
|
||||
const int N = 17;
|
||||
size_t req_sz; // requested size
|
||||
@ -36,7 +44,8 @@ void MemoryAllocatedBytesTest(size_t huge_page_size) {
|
||||
arena.Allocate(req_sz);
|
||||
}
|
||||
expected_memory_allocated = req_sz * N + Arena::kInlineSize;
|
||||
ASSERT_EQ(arena.MemoryAllocatedBytes(), expected_memory_allocated);
|
||||
ASSERT_PRED2(CheckMemoryAllocated, arena.MemoryAllocatedBytes(),
|
||||
expected_memory_allocated);
|
||||
|
||||
arena.Allocate(Arena::kInlineSize - 1);
|
||||
|
||||
@ -49,13 +58,15 @@ void MemoryAllocatedBytesTest(size_t huge_page_size) {
|
||||
arena.Allocate(req_sz);
|
||||
}
|
||||
if (huge_page_size) {
|
||||
ASSERT_TRUE(arena.MemoryAllocatedBytes() ==
|
||||
expected_memory_allocated + bsz ||
|
||||
arena.MemoryAllocatedBytes() ==
|
||||
expected_memory_allocated + huge_page_size);
|
||||
ASSERT_TRUE(
|
||||
CheckMemoryAllocated(arena.MemoryAllocatedBytes(),
|
||||
expected_memory_allocated + bsz) ||
|
||||
CheckMemoryAllocated(arena.MemoryAllocatedBytes(),
|
||||
expected_memory_allocated + huge_page_size));
|
||||
} else {
|
||||
expected_memory_allocated += bsz;
|
||||
ASSERT_EQ(arena.MemoryAllocatedBytes(), expected_memory_allocated);
|
||||
ASSERT_PRED2(CheckMemoryAllocated, arena.MemoryAllocatedBytes(),
|
||||
expected_memory_allocated);
|
||||
}
|
||||
|
||||
// requested size > size of a block:
|
||||
@ -66,7 +77,8 @@ void MemoryAllocatedBytesTest(size_t huge_page_size) {
|
||||
arena.Allocate(req_sz);
|
||||
}
|
||||
expected_memory_allocated += req_sz * N;
|
||||
ASSERT_EQ(arena.MemoryAllocatedBytes(), expected_memory_allocated);
|
||||
ASSERT_PRED2(CheckMemoryAllocated, arena.MemoryAllocatedBytes(),
|
||||
expected_memory_allocated);
|
||||
}
|
||||
|
||||
// Make sure we didn't count the allocate but not used memory space in
|
||||
@ -83,7 +95,8 @@ static void ApproximateMemoryUsageTest(size_t huge_page_size) {
|
||||
arena.AllocateAligned(Arena::kInlineSize / 2 - 16);
|
||||
arena.AllocateAligned(Arena::kInlineSize / 2);
|
||||
ASSERT_EQ(arena.ApproximateMemoryUsage(), Arena::kInlineSize - 8);
|
||||
ASSERT_EQ(arena.MemoryAllocatedBytes(), Arena::kInlineSize);
|
||||
ASSERT_PRED2(CheckMemoryAllocated, arena.MemoryAllocatedBytes(),
|
||||
Arena::kInlineSize);
|
||||
|
||||
auto num_blocks = kBlockSize / kEntrySize;
|
||||
|
||||
@ -91,10 +104,12 @@ static void ApproximateMemoryUsageTest(size_t huge_page_size) {
|
||||
arena.AllocateAligned(kEntrySize);
|
||||
auto mem_usage = arena.MemoryAllocatedBytes();
|
||||
if (huge_page_size) {
|
||||
ASSERT_TRUE(mem_usage == kBlockSize + Arena::kInlineSize ||
|
||||
mem_usage == huge_page_size + Arena::kInlineSize);
|
||||
ASSERT_TRUE(
|
||||
CheckMemoryAllocated(mem_usage, kBlockSize + Arena::kInlineSize) ||
|
||||
CheckMemoryAllocated(mem_usage, huge_page_size + Arena::kInlineSize));
|
||||
} else {
|
||||
ASSERT_EQ(mem_usage, kBlockSize + Arena::kInlineSize);
|
||||
ASSERT_PRED2(CheckMemoryAllocated, mem_usage,
|
||||
kBlockSize + Arena::kInlineSize);
|
||||
}
|
||||
auto usage = arena.ApproximateMemoryUsage();
|
||||
ASSERT_LT(usage, mem_usage);
|
||||
|
@ -73,6 +73,8 @@ void CompactionJobStats::Add(const CompactionJobStats& stats) {
|
||||
|
||||
void CompactionJobStats::Reset() {}
|
||||
|
||||
void CompactionJobStats::Add(const CompactionJobStats& stats) {}
|
||||
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -620,7 +620,7 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input,
|
||||
size_t compressBound = ZSTD_compressBound(length);
|
||||
output->resize(static_cast<size_t>(output_header_len + compressBound));
|
||||
size_t outlen = ZSTD_compress(&(*output)[output_header_len], compressBound,
|
||||
input, length);
|
||||
input, length, 1 /* level */);
|
||||
if (outlen == 0) {
|
||||
return false;
|
||||
}
|
||||
|
@ -83,8 +83,8 @@ class AtomicCounter {
|
||||
uint64_t start = env_->NowMicros();
|
||||
while (count_ < count) {
|
||||
uint64_t now = env_->NowMicros();
|
||||
cond_count_.TimedWait(now + /*1s*/ 1 * 000 * 000);
|
||||
if (env_->NowMicros() - start > /*10s*/ 10 * 000 * 000) {
|
||||
cond_count_.TimedWait(now + /*1s*/ 1 * 1000 * 1000);
|
||||
if (env_->NowMicros() - start > /*10s*/ 10 * 1000 * 1000) {
|
||||
return false;
|
||||
}
|
||||
if (count_ < count) {
|
||||
|
@ -99,14 +99,15 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
|
||||
"DeleteSchedulerImpl::BackgroundEmptyTrash:Wait",
|
||||
[&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
int num_files = 100; // 100 files
|
||||
uint64_t file_size = 1024; // every file is 1 kb
|
||||
std::vector<uint64_t> delete_kbs_per_sec = {512, 200, 100, 50, 25};
|
||||
|
||||
for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
|
||||
penalties.clear();
|
||||
rocksdb::SyncPoint::GetInstance()->ClearTrace();
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
DestroyAndCreateDir(dummy_files_dir_);
|
||||
rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
|
||||
delete_scheduler_.reset(
|
||||
@ -130,6 +131,9 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
|
||||
delete_scheduler_->WaitForEmptyTrash();
|
||||
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
|
||||
|
||||
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
|
||||
ASSERT_EQ(bg_errors.size(), 0);
|
||||
|
||||
uint64_t total_files_size = 0;
|
||||
uint64_t expected_penlty = 0;
|
||||
ASSERT_EQ(penalties.size(), num_files);
|
||||
@ -141,10 +145,8 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
|
||||
ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
|
||||
|
||||
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
|
||||
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
|
||||
ASSERT_EQ(bg_errors.size(), 0);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
// Same as the BasicRateLimiting test but delete files in multiple threads.
|
||||
@ -165,7 +167,6 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
|
||||
"DeleteSchedulerImpl::BackgroundEmptyTrash:Wait",
|
||||
[&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
int thread_cnt = 10;
|
||||
int num_files = 10; // 10 files per thread
|
||||
uint64_t file_size = 1024; // every file is 1 kb
|
||||
@ -173,6 +174,9 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
|
||||
std::vector<uint64_t> delete_kbs_per_sec = {512, 200, 100, 50, 25};
|
||||
for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
|
||||
penalties.clear();
|
||||
rocksdb::SyncPoint::GetInstance()->ClearTrace();
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
DestroyAndCreateDir(dummy_files_dir_);
|
||||
rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
|
||||
delete_scheduler_.reset(
|
||||
@ -210,6 +214,9 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
|
||||
delete_scheduler_->WaitForEmptyTrash();
|
||||
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
|
||||
|
||||
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
|
||||
ASSERT_EQ(bg_errors.size(), 0);
|
||||
|
||||
uint64_t total_files_size = 0;
|
||||
uint64_t expected_penlty = 0;
|
||||
ASSERT_EQ(penalties.size(), num_files * thread_cnt);
|
||||
@ -222,10 +229,8 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
|
||||
|
||||
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
|
||||
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
|
||||
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
|
||||
ASSERT_EQ(bg_errors.size(), 0);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
// Disable rate limiting by setting rate_bytes_per_sec_ to 0 and make sure
|
||||
|
@ -31,7 +31,7 @@ class SequentialFileReader {
|
||||
SequentialFile* file() { return file_.get(); }
|
||||
};
|
||||
|
||||
class RandomAccessFileReader : public RandomAccessFile {
|
||||
class RandomAccessFileReader {
|
||||
private:
|
||||
std::unique_ptr<RandomAccessFile> file_;
|
||||
Env* env_;
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#include <assert.h>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <unordered_set>
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "util/mutexlock.h"
|
||||
#include "util/random.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -272,5 +273,50 @@ class NullLogger : public Logger {
|
||||
// Corrupts key by changing the type
|
||||
extern void CorruptKeyType(InternalKey* ikey);
|
||||
|
||||
class SleepingBackgroundTask {
|
||||
public:
|
||||
SleepingBackgroundTask()
|
||||
: bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {}
|
||||
void DoSleep() {
|
||||
MutexLock l(&mutex_);
|
||||
while (should_sleep_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
done_with_sleep_ = true;
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
void WakeUp() {
|
||||
MutexLock l(&mutex_);
|
||||
should_sleep_ = false;
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
void WaitUntilDone() {
|
||||
MutexLock l(&mutex_);
|
||||
while (!done_with_sleep_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
}
|
||||
bool WokenUp() {
|
||||
MutexLock l(&mutex_);
|
||||
return should_sleep_ == false;
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
MutexLock l(&mutex_);
|
||||
should_sleep_ = true;
|
||||
done_with_sleep_ = false;
|
||||
}
|
||||
|
||||
static void DoSleepTask(void* arg) {
|
||||
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
|
||||
}
|
||||
|
||||
private:
|
||||
port::Mutex mutex_;
|
||||
port::CondVar bg_cv_; // Signalled when background work finishes
|
||||
bool should_sleep_;
|
||||
bool done_with_sleep_;
|
||||
};
|
||||
|
||||
} // namespace test
|
||||
} // namespace rocksdb
|
||||
|
@ -10,6 +10,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/transaction_log.h"
|
||||
#include "util/file_util.h"
|
||||
#include "port/port.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -131,7 +132,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
(type == kDescriptorFile) ? manifest_file_size : 0);
|
||||
}
|
||||
}
|
||||
Log(db_->GetOptions().info_log, "Number of log files %ld",
|
||||
Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt,
|
||||
live_wal_files.size());
|
||||
|
||||
// Link WAL files. Copy exact size of last one because it is the only one
|
||||
|
@ -38,7 +38,6 @@ OptimisticTransactionImpl::~OptimisticTransactionImpl() {
|
||||
|
||||
void OptimisticTransactionImpl::Clear() {
|
||||
TransactionBaseImpl::Clear();
|
||||
tracked_keys_.clear();
|
||||
}
|
||||
|
||||
Status OptimisticTransactionImpl::Commit() {
|
||||
@ -83,18 +82,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||
|
||||
std::string key_str = key.ToString();
|
||||
|
||||
auto iter = tracked_keys_[cfh_id].find(key_str);
|
||||
if (iter == tracked_keys_[cfh_id].end()) {
|
||||
// key not yet seen, store it.
|
||||
tracked_keys_[cfh_id].insert({std::move(key_str), seq});
|
||||
} else {
|
||||
SequenceNumber old_seq = iter->second;
|
||||
if (seq < old_seq) {
|
||||
// Snapshot has changed since we last saw this key, need to
|
||||
// store the earliest seen sequence number.
|
||||
tracked_keys_[cfh_id][key_str] = seq;
|
||||
}
|
||||
}
|
||||
TrackKey(cfh_id, key_str, seq);
|
||||
|
||||
// Always return OK. Confilct checking will happen at commit time.
|
||||
return Status::OK();
|
||||
@ -113,19 +101,7 @@ Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
|
||||
assert(dynamic_cast<DBImpl*>(db) != nullptr);
|
||||
auto db_impl = reinterpret_cast<DBImpl*>(db);
|
||||
|
||||
return TransactionUtil::CheckKeysForConflicts(db_impl, &tracked_keys_);
|
||||
}
|
||||
|
||||
uint64_t OptimisticTransactionImpl::GetNumKeys() const {
|
||||
uint64_t count = 0;
|
||||
|
||||
// sum up locked keys in all column families
|
||||
for (const auto& key_map_iter : tracked_keys_) {
|
||||
const auto& keys = key_map_iter.second;
|
||||
count += keys.size();
|
||||
}
|
||||
|
||||
return count;
|
||||
return TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys());
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -38,10 +38,6 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
|
||||
|
||||
void Rollback() override;
|
||||
|
||||
uint64_t GetNumKeys() const override;
|
||||
|
||||
const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; }
|
||||
|
||||
protected:
|
||||
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
|
||||
bool untracked = false) override;
|
||||
@ -49,12 +45,6 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
|
||||
private:
|
||||
OptimisticTransactionDB* const txn_db_;
|
||||
|
||||
// Map of Column Family IDs to keys and corresponding sequence numbers.
|
||||
// The sequence number stored for a key will be used during commit to make
|
||||
// sure this key has
|
||||
// not changed since this sequence number.
|
||||
TransactionKeyMap tracked_keys_;
|
||||
|
||||
friend class OptimisticTransactionCallback;
|
||||
|
||||
// Returns OK if it is safe to commit this transaction. Returns Status::Busy
|
||||
|
@ -28,6 +28,7 @@ TransactionBaseImpl::~TransactionBaseImpl() {}
|
||||
void TransactionBaseImpl::Clear() {
|
||||
save_points_.reset(nullptr);
|
||||
write_batch_->Clear();
|
||||
tracked_keys_.clear();
|
||||
num_puts_ = 0;
|
||||
num_deletes_ = 0;
|
||||
num_merges_ = 0;
|
||||
@ -71,12 +72,25 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
|
||||
num_deletes_ = save_point.num_deletes_;
|
||||
num_merges_ = save_point.num_merges_;
|
||||
|
||||
save_points_->pop();
|
||||
|
||||
// Rollback batch
|
||||
Status s = write_batch_->RollbackToSavePoint();
|
||||
assert(s.ok());
|
||||
|
||||
// Rollback any keys that were tracked since the last savepoint
|
||||
const TransactionKeyMap* key_map = GetTrackedKeysSinceSavePoint();
|
||||
assert(key_map);
|
||||
for (auto& key_map_iter : *key_map) {
|
||||
uint32_t column_family_id = key_map_iter.first;
|
||||
auto& keys = key_map_iter.second;
|
||||
|
||||
for (auto& key_iter : keys) {
|
||||
const std::string& key = key_iter.first;
|
||||
tracked_keys_[column_family_id].erase(key);
|
||||
}
|
||||
}
|
||||
|
||||
save_points_->pop();
|
||||
|
||||
return s;
|
||||
} else {
|
||||
assert(write_batch_->RollbackToSavePoint().IsNotFound());
|
||||
@ -306,6 +320,42 @@ uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
|
||||
|
||||
uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
|
||||
|
||||
uint64_t TransactionBaseImpl::GetNumKeys() const {
|
||||
uint64_t count = 0;
|
||||
|
||||
// sum up locked keys in all column families
|
||||
for (const auto& key_map_iter : tracked_keys_) {
|
||||
const auto& keys = key_map_iter.second;
|
||||
count += keys.size();
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
|
||||
SequenceNumber seq) {
|
||||
auto iter = tracked_keys_[cfh_id].find(key);
|
||||
if (iter == tracked_keys_[cfh_id].end()) {
|
||||
tracked_keys_[cfh_id].insert({key, seq});
|
||||
|
||||
if (save_points_ != nullptr && !save_points_->empty()) {
|
||||
// Aren't tracking this key, add it.
|
||||
save_points_->top().new_keys_[cfh_id][key] = seq;
|
||||
}
|
||||
} else if (seq < iter->second) {
|
||||
// Now tracking this key with an earlier sequence number
|
||||
iter->second = seq;
|
||||
}
|
||||
}
|
||||
|
||||
const TransactionKeyMap* TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
|
||||
if (save_points_ != nullptr && !save_points_->empty()) {
|
||||
return &save_points_->top().new_keys_;
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "rocksdb/utilities/transaction.h"
|
||||
#include "rocksdb/utilities/transaction_db.h"
|
||||
#include "rocksdb/utilities/write_batch_with_index.h"
|
||||
#include "utilities/transactions/transaction_util.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -166,7 +167,19 @@ class TransactionBaseImpl : public Transaction {
|
||||
|
||||
uint64_t GetNumMerges() const override;
|
||||
|
||||
uint64_t GetNumKeys() const override;
|
||||
|
||||
// Get list of keys in this transaction that must not have any conflicts
|
||||
// with writes in other transactions.
|
||||
const TransactionKeyMap& GetTrackedKeys() const { return tracked_keys_; }
|
||||
|
||||
protected:
|
||||
// Add a key to the list of tracked keys.
|
||||
// seqno is the earliest seqno this key was involved with this transaction.
|
||||
void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno);
|
||||
|
||||
const TransactionKeyMap* GetTrackedKeysSinceSavePoint();
|
||||
|
||||
DB* const db_;
|
||||
|
||||
const WriteOptions write_options_;
|
||||
@ -194,6 +207,9 @@ class TransactionBaseImpl : public Transaction {
|
||||
uint64_t num_deletes_;
|
||||
uint64_t num_merges_;
|
||||
|
||||
// Record all keys tracked since the last savepoint
|
||||
TransactionKeyMap new_keys_;
|
||||
|
||||
SavePoint(std::shared_ptr<ManagedSnapshot> snapshot, uint64_t num_puts,
|
||||
uint64_t num_deletes, uint64_t num_merges)
|
||||
: snapshot_(snapshot),
|
||||
@ -202,11 +218,18 @@ class TransactionBaseImpl : public Transaction {
|
||||
num_merges_(num_merges) {}
|
||||
};
|
||||
|
||||
private:
|
||||
// Stack of the Snapshot saved at each save point. Saved snapshots may be
|
||||
// nullptr if there was no snapshot at the time SetSavePoint() was called.
|
||||
std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint>> save_points_;
|
||||
|
||||
private:
|
||||
// Map from column_family_id to map of keys that are involved in this
|
||||
// transaction.
|
||||
// Pessimistic Transactions will do conflict checking before adding a key
|
||||
// by calling TrackKey().
|
||||
// Optimistic Transactions will wait till commit time to do conflict checking.
|
||||
TransactionKeyMap tracked_keys_;
|
||||
|
||||
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
|
||||
bool untracked = false);
|
||||
};
|
||||
|
@ -141,7 +141,8 @@ Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id,
|
||||
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv());
|
||||
}
|
||||
|
||||
void TransactionDBImpl::UnLock(TransactionImpl* txn, TransactionKeyMap* keys) {
|
||||
void TransactionDBImpl::UnLock(TransactionImpl* txn,
|
||||
const TransactionKeyMap* keys) {
|
||||
lock_mgr_.UnLock(txn, keys, GetEnv());
|
||||
}
|
||||
|
||||
|
@ -54,7 +54,7 @@ class TransactionDBImpl : public TransactionDB {
|
||||
|
||||
Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key);
|
||||
|
||||
void UnLock(TransactionImpl* txn, TransactionKeyMap* keys);
|
||||
void UnLock(TransactionImpl* txn, const TransactionKeyMap* keys);
|
||||
void UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key);
|
||||
|
||||
void AddColumnFamily(const ColumnFamilyHandle* handle);
|
||||
|
@ -58,14 +58,12 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
|
||||
}
|
||||
|
||||
TransactionImpl::~TransactionImpl() {
|
||||
txn_db_impl_->UnLock(this, &tracked_keys_);
|
||||
txn_db_impl_->UnLock(this, &GetTrackedKeys());
|
||||
}
|
||||
|
||||
void TransactionImpl::Clear() {
|
||||
txn_db_impl_->UnLock(this, &GetTrackedKeys());
|
||||
TransactionBaseImpl::Clear();
|
||||
|
||||
txn_db_impl_->UnLock(this, &tracked_keys_);
|
||||
tracked_keys_.clear();
|
||||
}
|
||||
|
||||
bool TransactionImpl::IsExpired() const {
|
||||
@ -126,6 +124,16 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) {
|
||||
|
||||
void TransactionImpl::Rollback() { Clear(); }
|
||||
|
||||
Status TransactionImpl::RollbackToSavePoint() {
|
||||
// Unlock any keys locked since last transaction
|
||||
const TransactionKeyMap* keys = GetTrackedKeysSinceSavePoint();
|
||||
if (keys) {
|
||||
txn_db_impl_->UnLock(this, keys);
|
||||
}
|
||||
|
||||
return TransactionBaseImpl::RollbackToSavePoint();
|
||||
}
|
||||
|
||||
// Lock all keys in this batch.
|
||||
// On success, caller should unlock keys_to_unlock
|
||||
Status TransactionImpl::LockBatch(WriteBatch* batch,
|
||||
@ -219,21 +227,26 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||
// TODO(agiardullo): could optimize by supporting shared txn locks in the
|
||||
// future
|
||||
bool check_snapshot = !untracked;
|
||||
SequenceNumber tracked_seqno = kMaxSequenceNumber;
|
||||
|
||||
// Lookup whether this key has already been locked by this transaction
|
||||
const auto& tracked_keys = GetTrackedKeys();
|
||||
const auto tracked_keys_cf = tracked_keys.find(cfh_id);
|
||||
if (tracked_keys_cf == tracked_keys.end()) {
|
||||
previously_locked = false;
|
||||
} else {
|
||||
auto iter = tracked_keys_cf->second.find(key_str);
|
||||
if (iter == tracked_keys_cf->second.end()) {
|
||||
previously_locked = false;
|
||||
} else {
|
||||
previously_locked = true;
|
||||
tracked_seqno = iter->second;
|
||||
}
|
||||
}
|
||||
|
||||
// lock this key if this transactions hasn't already locked it
|
||||
auto iter = tracked_keys_[cfh_id].find(key_str);
|
||||
if (iter == tracked_keys_[cfh_id].end()) {
|
||||
previously_locked = false;
|
||||
|
||||
if (!previously_locked) {
|
||||
s = txn_db_impl_->TryLock(this, cfh_id, key_str);
|
||||
|
||||
if (s.ok()) {
|
||||
// Record that we've locked this key
|
||||
auto result = tracked_keys_[cfh_id].insert({key_str, kMaxSequenceNumber});
|
||||
iter = result.first;
|
||||
}
|
||||
} else {
|
||||
previously_locked = true;
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
@ -244,17 +257,17 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||
// key has not been modified after. This is useful if this same
|
||||
// transaction
|
||||
// later tries to lock this key again.
|
||||
if (iter->second == kMaxSequenceNumber) {
|
||||
if (tracked_seqno == kMaxSequenceNumber) {
|
||||
// Since we haven't checked a snapshot, we only know this key has not
|
||||
// been modified since after we locked it.
|
||||
iter->second = db_->GetLatestSequenceNumber();
|
||||
tracked_seqno = db_->GetLatestSequenceNumber();
|
||||
}
|
||||
} else {
|
||||
// If the key has been previous validated at a sequence number earlier
|
||||
// than the curent snapshot's sequence number, we already know it has not
|
||||
// been modified.
|
||||
SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber();
|
||||
bool already_validated = iter->second <= seq;
|
||||
bool already_validated = tracked_seqno <= seq;
|
||||
|
||||
if (!already_validated) {
|
||||
s = CheckKeySequence(column_family, key);
|
||||
@ -262,19 +275,23 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||
if (s.ok()) {
|
||||
// Record that there have been no writes to this key after this
|
||||
// sequence.
|
||||
iter->second = seq;
|
||||
tracked_seqno = seq;
|
||||
} else {
|
||||
// Failed to validate key
|
||||
if (!previously_locked) {
|
||||
// Unlock key we just locked
|
||||
txn_db_impl_->UnLock(this, cfh_id, key.ToString());
|
||||
tracked_keys_[cfh_id].erase(iter);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
// Let base class know we've conflict checked this key.
|
||||
TrackKey(cfh_id, key_str, tracked_seqno);
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
@ -298,18 +315,6 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family,
|
||||
return result;
|
||||
}
|
||||
|
||||
uint64_t TransactionImpl::GetNumKeys() const {
|
||||
uint64_t count = 0;
|
||||
|
||||
// sum up locked keys in all column families
|
||||
for (const auto& key_map_iter : tracked_keys_) {
|
||||
const auto& keys = key_map_iter.second;
|
||||
count += keys.size();
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -44,7 +44,7 @@ class TransactionImpl : public TransactionBaseImpl {
|
||||
|
||||
void Rollback() override;
|
||||
|
||||
uint64_t GetNumKeys() const override;
|
||||
Status RollbackToSavePoint() override;
|
||||
|
||||
// Generate a new unique transaction identifier
|
||||
static TransactionID GenTxnID();
|
||||
@ -86,12 +86,6 @@ class TransactionImpl : public TransactionBaseImpl {
|
||||
// Timeout in microseconds when locking a key or -1 if there is no timeout.
|
||||
int64_t lock_timeout_;
|
||||
|
||||
// Map from column_family_id to map of keys to Sequence Numbers. Stores keys
|
||||
// that have been locked.
|
||||
// The key is known to not have been modified after the Sequence Number
|
||||
// stored.
|
||||
TransactionKeyMap tracked_keys_;
|
||||
|
||||
void Clear() override;
|
||||
|
||||
Status CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key);
|
||||
|
@ -12,6 +12,8 @@
|
||||
#include "rocksdb/utilities/transaction_db.h"
|
||||
#include "util/logging.h"
|
||||
#include "util/testharness.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
#include "utilities/merge_operators/string_append/stringappend.h"
|
||||
|
||||
using std::string;
|
||||
|
||||
@ -28,6 +30,7 @@ class TransactionTest : public testing::Test {
|
||||
TransactionTest() {
|
||||
options.create_if_missing = true;
|
||||
options.max_write_buffer_number = 2;
|
||||
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
|
||||
dbname = test::TmpDir() + "/transaction_testdb";
|
||||
|
||||
DestroyDB(dbname, options);
|
||||
@ -1488,6 +1491,105 @@ TEST_F(TransactionTest, SavepointTest) {
|
||||
delete txn;
|
||||
}
|
||||
|
||||
TEST_F(TransactionTest, SavepointTest2) {
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options, snapshot_read_options;
|
||||
TransactionOptions txn_options;
|
||||
string value;
|
||||
Status s;
|
||||
|
||||
txn_options.lock_timeout = 1; // 1 ms
|
||||
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn1);
|
||||
|
||||
s = txn1->Put("A", "");
|
||||
ASSERT_OK(s);
|
||||
|
||||
txn1->SetSavePoint(); // 1
|
||||
|
||||
s = txn1->Put("A", "a");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn1->Put("C", "c");
|
||||
ASSERT_OK(s);
|
||||
|
||||
txn1->SetSavePoint(); // 2
|
||||
|
||||
s = txn1->Put("A", "a");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put("B", "b");
|
||||
ASSERT_OK(s);
|
||||
|
||||
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 2
|
||||
|
||||
// Verify that "A" and "C" is still locked while "B" is not
|
||||
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn2);
|
||||
|
||||
s = txn2->Put("A", "a2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("C", "c2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("B", "b2");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn1->Put("A", "aa");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put("B", "bb");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
|
||||
s = txn2->Commit();
|
||||
ASSERT_OK(s);
|
||||
delete txn2;
|
||||
|
||||
s = txn1->Put("A", "aaa");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put("B", "bbb");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put("C", "ccc");
|
||||
ASSERT_OK(s);
|
||||
|
||||
txn1->SetSavePoint(); // 3
|
||||
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 3
|
||||
|
||||
// Verify that "A", "B", "C" are still locked
|
||||
txn2 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn2);
|
||||
|
||||
s = txn2->Put("A", "a2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("B", "b2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("C", "c2");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
|
||||
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1
|
||||
|
||||
// Verify that only "A" is locked
|
||||
s = txn2->Put("A", "a3");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
s = txn2->Put("B", "b3");
|
||||
ASSERT_OK(s);
|
||||
s = txn2->Put("C", "c3po");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn1->Commit();
|
||||
ASSERT_OK(s);
|
||||
delete txn1;
|
||||
|
||||
// Verify "A" "C" "B" are no longer locked
|
||||
s = txn2->Put("A", "a4");
|
||||
ASSERT_OK(s);
|
||||
s = txn2->Put("B", "b4");
|
||||
ASSERT_OK(s);
|
||||
s = txn2->Put("C", "c4");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn2->Commit();
|
||||
ASSERT_OK(s);
|
||||
delete txn2;
|
||||
}
|
||||
|
||||
TEST_F(TransactionTest, TimeoutTest) {
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
@ -1623,6 +1725,62 @@ TEST_F(TransactionTest, TimeoutTest) {
|
||||
delete txn2;
|
||||
}
|
||||
|
||||
TEST_F(TransactionTest, MergeTest) {
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
string value;
|
||||
Status s;
|
||||
|
||||
Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
|
||||
ASSERT_TRUE(txn);
|
||||
|
||||
s = db->Put(write_options, "A", "a0");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn->Merge("A", "1");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn->Merge("A", "2");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn->Get(read_options, "A", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
s = txn->Put("A", "a");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn->Get(read_options, "A", &value);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("a", value);
|
||||
|
||||
s = txn->Merge("A", "3");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = txn->Get(read_options, "A", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
TransactionOptions txn_options;
|
||||
txn_options.lock_timeout = 1; // 1 ms
|
||||
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn2);
|
||||
|
||||
// verify that txn has "A" locked
|
||||
s = txn2->Merge("A", "4");
|
||||
ASSERT_TRUE(s.IsTimedOut());
|
||||
|
||||
s = txn2->Commit();
|
||||
ASSERT_OK(s);
|
||||
delete txn2;
|
||||
|
||||
s = txn->Commit();
|
||||
ASSERT_OK(s);
|
||||
delete txn;
|
||||
|
||||
s = db->Get(read_options, "A", &value);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("a,3", value);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -100,11 +100,11 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
||||
return result;
|
||||
}
|
||||
|
||||
Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
|
||||
TransactionKeyMap* key_map) {
|
||||
Status TransactionUtil::CheckKeysForConflicts(
|
||||
DBImpl* db_impl, const TransactionKeyMap& key_map) {
|
||||
Status result;
|
||||
|
||||
for (auto& key_map_iter : *key_map) {
|
||||
for (auto& key_map_iter : key_map) {
|
||||
uint32_t cf_id = key_map_iter.first;
|
||||
const auto& keys = key_map_iter.second;
|
||||
|
||||
|
@ -46,7 +46,8 @@ class TransactionUtil {
|
||||
//
|
||||
// REQUIRED: this function should only be called on the write thread or if the
|
||||
// mutex is held.
|
||||
static Status CheckKeysForConflicts(DBImpl* db_impl, TransactionKeyMap* keys);
|
||||
static Status CheckKeysForConflicts(DBImpl* db_impl,
|
||||
const TransactionKeyMap& keys);
|
||||
|
||||
private:
|
||||
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
||||
|
@ -619,9 +619,9 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
|
||||
MergeContext merge_context;
|
||||
|
||||
WriteBatchWithIndexInternal::Result result =
|
||||
WriteBatchWithIndexInternal::GetFromBatch(options, this, column_family,
|
||||
key, &merge_context,
|
||||
&rep->comparator, value, &s);
|
||||
WriteBatchWithIndexInternal::GetFromBatch(
|
||||
options, this, column_family, key, &merge_context, &rep->comparator,
|
||||
value, rep->overwrite_key, &s);
|
||||
|
||||
switch (result) {
|
||||
case WriteBatchWithIndexInternal::Result::kFound:
|
||||
@ -662,8 +662,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
std::string batch_value;
|
||||
WriteBatchWithIndexInternal::Result result =
|
||||
WriteBatchWithIndexInternal::GetFromBatch(
|
||||
options, this, column_family, key, &merge_context,
|
||||
&rep->comparator, &batch_value, &s);
|
||||
options, this, column_family, key, &merge_context, &rep->comparator,
|
||||
&batch_value, rep->overwrite_key, &s);
|
||||
|
||||
if (result == WriteBatchWithIndexInternal::Result::kFound) {
|
||||
value->assign(batch_value.data(), batch_value.size());
|
||||
@ -675,6 +675,14 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
if (result == WriteBatchWithIndexInternal::Result::kError) {
|
||||
return s;
|
||||
}
|
||||
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
|
||||
rep->overwrite_key == true) {
|
||||
// Since we've overwritten keys, we do not know what other operations are
|
||||
// in this batch for this key, so we cannot do a Merge to compute the
|
||||
// result. Instead, we will simply return MergeInProgress.
|
||||
return Status::MergeInProgress();
|
||||
}
|
||||
|
||||
assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
|
||||
result == WriteBatchWithIndexInternal::Result::kNotFound);
|
||||
|
||||
|
@ -132,7 +132,7 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
|
||||
const DBOptions& options, WriteBatchWithIndex* batch,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
MergeContext* merge_context, WriteBatchEntryComparator* cmp,
|
||||
std::string* value, Status* s) {
|
||||
std::string* value, bool overwrite_key, Status* s) {
|
||||
uint32_t cf_id = GetColumnFamilyID(column_family);
|
||||
*s = Status::OK();
|
||||
WriteBatchWithIndexInternal::Result result =
|
||||
@ -205,6 +205,13 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
|
||||
// We can stop iterating once we find a PUT or DELETE
|
||||
break;
|
||||
}
|
||||
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
|
||||
overwrite_key == true) {
|
||||
// Since we've overwritten keys, we do not know what other operations are
|
||||
// in this batch for this key, so we cannot do a Merge to compute the
|
||||
// result. Instead, we will simply return MergeInProgress.
|
||||
break;
|
||||
}
|
||||
|
||||
iter->Prev();
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ class WriteBatchWithIndexInternal {
|
||||
const DBOptions& options, WriteBatchWithIndex* batch,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
MergeContext* merge_context, WriteBatchEntryComparator* cmp,
|
||||
std::string* value, Status* s);
|
||||
std::string* value, bool overwrite_key, Status* s);
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -971,7 +971,7 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) {
|
||||
|
||||
DestroyDB(dbname, options);
|
||||
Status s = DB::Open(options, dbname, &db);
|
||||
assert(s.ok());
|
||||
ASSERT_OK(s);
|
||||
|
||||
ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
|
||||
WriteBatchWithIndex batch;
|
||||
@ -1009,6 +1009,66 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) {
|
||||
DestroyDB(dbname, options);
|
||||
}
|
||||
|
||||
TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) {
|
||||
DB* db;
|
||||
Options options;
|
||||
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
|
||||
options.create_if_missing = true;
|
||||
|
||||
std::string dbname = test::TmpDir() + "/write_batch_with_index_test";
|
||||
|
||||
DestroyDB(dbname, options);
|
||||
Status s = DB::Open(options, dbname, &db);
|
||||
ASSERT_OK(s);
|
||||
|
||||
ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
|
||||
|
||||
// Test batch with overwrite_key=true
|
||||
WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
|
||||
std::string value;
|
||||
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
|
||||
batch.Put(column_family, "X", "x");
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("x", value);
|
||||
|
||||
batch.Put(column_family, "X", "x2");
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("x2", value);
|
||||
|
||||
batch.Merge(column_family, "X", "aaa");
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
batch.Merge(column_family, "X", "bbb");
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
batch.Put(column_family, "X", "x3");
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("x3", value);
|
||||
|
||||
batch.Merge(column_family, "X", "ccc");
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
batch.Delete(column_family, "X");
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
|
||||
batch.Merge(column_family, "X", "ddd");
|
||||
s = batch.GetFromBatch(column_family, options, "X", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
delete db;
|
||||
DestroyDB(dbname, options);
|
||||
}
|
||||
|
||||
TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) {
|
||||
DB* db;
|
||||
Options options;
|
||||
@ -1017,7 +1077,7 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) {
|
||||
|
||||
DestroyDB(dbname, options);
|
||||
Status s = DB::Open(options, dbname, &db);
|
||||
assert(s.ok());
|
||||
ASSERT_OK(s);
|
||||
|
||||
WriteBatchWithIndex batch;
|
||||
ReadOptions read_options;
|
||||
@ -1185,6 +1245,54 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) {
|
||||
DestroyDB(dbname, options);
|
||||
}
|
||||
|
||||
TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) {
|
||||
DB* db;
|
||||
Options options;
|
||||
|
||||
options.create_if_missing = true;
|
||||
std::string dbname = test::TmpDir() + "/write_batch_with_index_test";
|
||||
|
||||
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
|
||||
|
||||
DestroyDB(dbname, options);
|
||||
Status s = DB::Open(options, dbname, &db);
|
||||
assert(s.ok());
|
||||
|
||||
// Test batch with overwrite_key=true
|
||||
WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
|
||||
|
||||
ReadOptions read_options;
|
||||
WriteOptions write_options;
|
||||
std::string value;
|
||||
|
||||
s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
|
||||
batch.Merge("A", "xxx");
|
||||
|
||||
s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
batch.Merge("A", "yyy");
|
||||
|
||||
s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
s = db->Put(write_options, "A", "a0");
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
|
||||
ASSERT_TRUE(s.IsMergeInProgress());
|
||||
|
||||
batch.Delete("A");
|
||||
|
||||
s = batch.GetFromBatchAndDB(db, read_options, "A", &value);
|
||||
ASSERT_TRUE(s.IsNotFound());
|
||||
|
||||
delete db;
|
||||
DestroyDB(dbname, options);
|
||||
}
|
||||
|
||||
void AssertKey(std::string key, WBWIIterator* iter) {
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(key, iter->Entry().key.ToString());
|
||||
|
Loading…
Reference in New Issue
Block a user