Compare commits
19 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
32fda17a2e | ||
|
2594970c39 | ||
|
1b4234f331 | ||
|
54ceceafb3 | ||
|
9ec602f5f7 | ||
|
6d1896f4ca | ||
|
98084910a7 | ||
|
fbd1c98255 | ||
|
c743289d81 | ||
|
2be165b291 | ||
|
0f4c98d305 | ||
|
c75f4faa9d | ||
|
56f497bb21 | ||
|
051c88dd79 | ||
|
071893eb27 | ||
|
7d7234a9c8 | ||
|
d641bb8fb0 | ||
|
e98b76dd9b | ||
|
4001e799ab |
@ -264,6 +264,7 @@ set(SOURCES
|
||||
utilities/merge_operators/put.cc
|
||||
utilities/merge_operators/max.cc
|
||||
utilities/merge_operators/uint64add.cc
|
||||
utilities/option_change_migration/option_change_migration.cc
|
||||
utilities/options/options_util.cc
|
||||
utilities/persistent_cache/persistent_cache_tier.cc
|
||||
utilities/persistent_cache/volatile_tier_impl.cc
|
||||
@ -432,6 +433,7 @@ set(TESTS
|
||||
utilities/geodb/geodb_test.cc
|
||||
utilities/memory/memory_test.cc
|
||||
utilities/merge_operators/string_append/stringappend_test.cc
|
||||
utilities/option_change_migration/option_change_migration_test.cc
|
||||
utilities/options/options_util_test.cc
|
||||
utilities/persistent_cache/hash_table_test.cc
|
||||
utilities/persistent_cache/persistent_cache_test.cc
|
||||
|
21
HISTORY.md
21
HISTORY.md
@ -1,10 +1,27 @@
|
||||
# Rocksdb Change Log
|
||||
## Unreleased
|
||||
## 4.11.2 (9/15/2016)
|
||||
### Bug fixes
|
||||
* Segfault when failing to open an SST file for read-ahead iterators.
|
||||
* WAL without data for all CFs is not deleted after recovery.
|
||||
|
||||
## 4.11.1 (8/30/2016)
|
||||
### Bug Fixes
|
||||
* Mitigate the regression bug of deadlock condition during recovery when options.max_successive_merges hits.
|
||||
* Fix data race condition related to hash index in block based table when putting indexes in the block cache.
|
||||
|
||||
## 4.11.0 (8/1/2016)
|
||||
### Public API Change
|
||||
* options.memtable_prefix_bloom_huge_page_tlb_size => memtable_huge_page_size. When it is set, RocksDB will try to allocate memory from huge page for memtable too, rather than just memtable bloom filter.
|
||||
|
||||
### New Features
|
||||
* A tool to migrate DB after options change. See include/rocksdb/utilities/option_change_migration.h.
|
||||
* Add ReadOptions.background_purge_on_iterator_cleanup. If true, we avoid file deletion when destorying iterators.
|
||||
|
||||
## 4.10.0 (7/5/2016)
|
||||
### Public API Change
|
||||
* options.memtable_prefix_bloom_bits changes to options.memtable_prefix_bloom_bits_ratio and deprecate options.memtable_prefix_bloom_probes
|
||||
* enum type CompressionType and PerfLevel changes from char to unsigned char. Value of all PerfLevel shift by one.
|
||||
* Deprecate options.filter_deletes.
|
||||
* options.memtable_prefix_bloom_huge_page_tlb_size => memtable_huge_page_size. When it is set, RocksDB will try to allocate memory from huge page for memtable too, rather than just memtable bloom filter.
|
||||
|
||||
### New Features
|
||||
* Add avoid_flush_during_recovery option.
|
||||
|
10
Makefile
10
Makefile
@ -216,10 +216,6 @@ default: all
|
||||
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
|
||||
-Wno-unused-parameter
|
||||
|
||||
ifndef DISABLE_WARNING_AS_ERROR
|
||||
WARNING_FLAGS += -Werror
|
||||
endif
|
||||
|
||||
CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
|
||||
CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
|
||||
|
||||
@ -374,6 +370,7 @@ TESTS = \
|
||||
heap_test \
|
||||
compact_on_deletion_collector_test \
|
||||
compaction_job_stats_test \
|
||||
option_change_migration_test \
|
||||
transaction_test \
|
||||
ldb_cmd_test \
|
||||
iostats_context_test \
|
||||
@ -865,7 +862,7 @@ arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
autovector_test: util/autovector_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
column_family_test: db/column_family_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
column_family_test: db/column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
table_properties_collector_test: db/table_properties_collector_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
@ -886,6 +883,9 @@ cache_test: util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
coding_test: util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
option_change_migration_test: utilities/option_change_migration/option_change_migration_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
|
@ -52,12 +52,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
|
||||
FBCODE_BUILD="true"
|
||||
# If we're compiling with TSAN we need pic build
|
||||
PIC_BUILD=$COMPILE_WITH_TSAN
|
||||
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
else
|
||||
# we need this to build with MySQL. Don't use for other purposes.
|
||||
source "$PWD/build_tools/fbcode_config4.8.1.sh"
|
||||
fi
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
fi
|
||||
|
||||
# Delete existing output, if it exists
|
||||
|
@ -1,16 +1,19 @@
|
||||
GCC_BASE=/mnt/gvfs/third-party2/gcc/ebc96bc2fb751b5a0300b8d91a95bdf24ac1d88b/4.9.x/centos6-native/108cf83
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/b91de48a4974ec839946d824402b098d43454cef/stable/centos6-native/7aaccbe
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/53e0eac8911888a105aa98b9a35fe61cf1d8b278/4.9.x/gcc-4.9-glibc-2.20/024dbc3
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/ee36ac9a72dfac4a995f1b215bb4c0fc8a0f6f91/2.20/gcc-4.9-glibc-2.20/500e281
|
||||
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
|
||||
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2b24f1e99454f9ca7b0301720f94836dae1bf71b/1.2.8/gcc-5-glibc-2.23/9bc6787
|
||||
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/af7c14c9b652cdd5ec34eadd25c3f38a9b306c5d/1.0.6/gcc-5-glibc-2.23/9bc6787
|
||||
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
|
||||
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/1408484d03b15492aa54b10356104e9dc22e1cc5/0.6.1/gcc-5-glibc-2.23/9bc6787
|
||||
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/5a5c7a6608cb32f1e1e7f814023d5bdfbd136370/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
|
||||
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/a4c2adecffcaa68d5585d06be2252e3efa52555b/master/gcc-5-glibc-2.23/1c32b4b
|
||||
NUMA_BASE=/mnt/gvfs/third-party2/numa/1abc0d3c01743b854676423cf2d3629912f34930/2.0.8/gcc-5-glibc-2.23/9bc6787
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/f24be37d170e04be6e469af487644d4d62e1c6c1/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/9d65c666b9adf8f2a989fd4b98a9a5e7d3afa233/2.26/centos6-native/da39a3e
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/9cee5a3628dc9d4b93897972c58eba865e25b270/3.10.0/gcc-4.9-glibc-2.20/e9936bf
|
||||
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
|
||||
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
|
||||
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
|
||||
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
|
||||
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
|
||||
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
|
||||
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
|
||||
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
|
||||
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
|
||||
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
|
||||
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
|
||||
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832
|
||||
|
@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
|
||||
CFLAGS=""
|
||||
|
||||
# libgcc
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
|
||||
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
|
||||
|
||||
# glibc
|
||||
@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
|
||||
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
|
||||
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
|
||||
CFLAGS+=" -DLZ4"
|
||||
|
||||
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
|
||||
CFLAGS+=" -DZSTD"
|
||||
fi
|
||||
|
||||
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
|
||||
else
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DZSTD"
|
||||
|
||||
# location of gflags headers and libraries
|
||||
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
|
||||
else
|
||||
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DGFLAGS=google"
|
||||
CFLAGS+=" -DGFLAGS=gflags"
|
||||
|
||||
# location of jemalloc
|
||||
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
|
||||
@ -72,13 +76,22 @@ if test -z $PIC_BUILD; then
|
||||
LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
|
||||
fi
|
||||
|
||||
# location of TBB
|
||||
TBB_INCLUDE=" -isystem $TBB_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
TBB_LIBS="$TBB_BASE/lib/libtbb.a"
|
||||
else
|
||||
TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DTBB"
|
||||
|
||||
# use Intel SSE support for checksum calculations
|
||||
export USE_SSE=1
|
||||
|
||||
BINUTILS="$BINUTILS_BASE/bin"
|
||||
AR="$BINUTILS/ar"
|
||||
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE"
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
|
||||
|
||||
STDLIBS="-L $GCC_BASE/lib64"
|
||||
|
||||
@ -95,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
|
||||
CXX="$GCC_BASE/bin/g++"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
JEMALLOC=1
|
||||
else
|
||||
# clang
|
||||
@ -107,8 +120,8 @@ else
|
||||
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $CLANG_INCLUDE"
|
||||
@ -119,18 +132,21 @@ else
|
||||
fi
|
||||
|
||||
CFLAGS+=" $DEPS_INCLUDE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
|
||||
CXXFLAGS+=" $CFLAGS"
|
||||
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
|
||||
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
|
||||
EXEC_LDFLAGS+=" $LIBUNWIND"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
|
||||
# required by libtbb
|
||||
EXEC_LDFLAGS+=" -ldl"
|
||||
|
||||
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
|
||||
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS"
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
|
||||
|
||||
VALGRIND_VER="$VALGRIND_BASE/bin/"
|
||||
|
||||
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD
|
||||
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD
|
||||
|
@ -558,8 +558,9 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
"(waiting for flush), max_write_buffer_number is set to %d",
|
||||
name_.c_str(), imm()->NumNotFlushed(),
|
||||
mutable_cf_options.max_write_buffer_number);
|
||||
} else if (vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_stop_writes_trigger) {
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_stop_writes_trigger) {
|
||||
write_controller_token_ = write_controller->GetStopToken();
|
||||
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
|
||||
if (compaction_picker_->IsLevel0CompactionInProgress()) {
|
||||
@ -569,7 +570,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
|
||||
"[%s] Stopping writes because we have %d level-0 files",
|
||||
name_.c_str(), vstorage->l0_delay_trigger_count());
|
||||
} else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
|
||||
compaction_needed_bytes >=
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit) {
|
||||
write_controller_token_ = write_controller->GetStopToken();
|
||||
@ -594,7 +596,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
name_.c_str(), imm()->NumNotFlushed(),
|
||||
mutable_cf_options.max_write_buffer_number,
|
||||
write_controller->delayed_write_rate());
|
||||
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
|
||||
vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_slowdown_writes_trigger) {
|
||||
write_controller_token_ =
|
||||
@ -611,7 +614,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
"rate %" PRIu64,
|
||||
name_.c_str(), vstorage->l0_delay_trigger_count(),
|
||||
write_controller->delayed_write_rate());
|
||||
} else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
|
||||
vstorage->estimated_compaction_needed_bytes() >=
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit) {
|
||||
write_controller_token_ =
|
||||
|
@ -2413,6 +2413,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
|
||||
mutable_cf_options.level0_stop_writes_trigger = 10000;
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
|
||||
mutable_cf_options.disable_auto_compactions = false;
|
||||
|
||||
vstorage->TEST_set_estimated_compaction_needed_bytes(50);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
@ -2559,16 +2560,17 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
|
||||
vstorage->set_l0_delay_trigger_count(50);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
|
||||
|
||||
vstorage->set_l0_delay_trigger_count(60);
|
||||
vstorage->TEST_set_estimated_compaction_needed_bytes(300);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
|
||||
|
||||
mutable_cf_options.disable_auto_compactions = false;
|
||||
vstorage->set_l0_delay_trigger_count(70);
|
||||
vstorage->TEST_set_estimated_compaction_needed_bytes(500);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
@ -2576,7 +2578,6 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
|
||||
|
||||
mutable_cf_options.disable_auto_compactions = false;
|
||||
vstorage->set_l0_delay_trigger_count(71);
|
||||
vstorage->TEST_set_estimated_compaction_needed_bytes(501);
|
||||
cfd->RecalculateWriteStallConditions(mutable_cf_options);
|
||||
@ -2803,6 +2804,217 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
|
||||
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
TEST_F(ColumnFamilyTest, FlushCloseWALFiles) {
|
||||
SpecialEnv env(Env::Default());
|
||||
db_options_.env = &env;
|
||||
db_options_.max_background_flushes = 1;
|
||||
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
|
||||
Open();
|
||||
CreateColumnFamilies({"one"});
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(0, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
|
||||
// Block flush jobs from running
|
||||
test::SleepingBackgroundTask sleeping_task;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
|
||||
Env::Priority::HIGH);
|
||||
|
||||
WriteOptions wo;
|
||||
wo.sync = true;
|
||||
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
|
||||
|
||||
ASSERT_EQ(2, env.num_open_wal_file_.load());
|
||||
|
||||
sleeping_task.WakeUp();
|
||||
sleeping_task.WaitUntilDone();
|
||||
WaitForFlush(1);
|
||||
ASSERT_EQ(1, env.num_open_wal_file_.load());
|
||||
|
||||
Reopen();
|
||||
ASSERT_EQ("mirko", Get(0, "fodor"));
|
||||
ASSERT_EQ("mirko", Get(1, "fodor"));
|
||||
db_options_.env = env_;
|
||||
Close();
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
TEST_F(ColumnFamilyTest, IteratorCloseWALFile1) {
|
||||
SpecialEnv env(Env::Default());
|
||||
db_options_.env = &env;
|
||||
db_options_.max_background_flushes = 1;
|
||||
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
|
||||
Open();
|
||||
CreateColumnFamilies({"one"});
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
// Create an iterator holding the current super version.
|
||||
Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]);
|
||||
// A flush will make `it` hold the last reference of its super version.
|
||||
Flush(1);
|
||||
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(0, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
|
||||
// Flush jobs will close previous WAL files after finishing. By
|
||||
// block flush jobs from running, we trigger a condition where
|
||||
// the iterator destructor should close the WAL files.
|
||||
test::SleepingBackgroundTask sleeping_task;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
|
||||
Env::Priority::HIGH);
|
||||
|
||||
WriteOptions wo;
|
||||
wo.sync = true;
|
||||
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
|
||||
|
||||
ASSERT_EQ(2, env.num_open_wal_file_.load());
|
||||
// Deleting the iterator will clear its super version, triggering
|
||||
// closing all files
|
||||
delete it;
|
||||
ASSERT_EQ(1, env.num_open_wal_file_.load());
|
||||
|
||||
sleeping_task.WakeUp();
|
||||
sleeping_task.WaitUntilDone();
|
||||
WaitForFlush(1);
|
||||
|
||||
Reopen();
|
||||
ASSERT_EQ("mirko", Get(0, "fodor"));
|
||||
ASSERT_EQ("mirko", Get(1, "fodor"));
|
||||
db_options_.env = env_;
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(ColumnFamilyTest, IteratorCloseWALFile2) {
|
||||
SpecialEnv env(Env::Default());
|
||||
// Allow both of flush and purge job to schedule.
|
||||
env.SetBackgroundThreads(2, Env::HIGH);
|
||||
db_options_.env = &env;
|
||||
db_options_.max_background_flushes = 1;
|
||||
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
|
||||
Open();
|
||||
CreateColumnFamilies({"one"});
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
// Create an iterator holding the current super version.
|
||||
ReadOptions ro;
|
||||
ro.background_purge_on_iterator_cleanup = true;
|
||||
Iterator* it = db_->NewIterator(ro, handles_[1]);
|
||||
// A flush will make `it` hold the last reference of its super version.
|
||||
Flush(1);
|
||||
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(0, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
||||
{"ColumnFamilyTest::IteratorCloseWALFile2:0",
|
||||
"DBImpl::BGWorkPurge:start"},
|
||||
{"ColumnFamilyTest::IteratorCloseWALFile2:2",
|
||||
"DBImpl::BackgroundCallFlush:start"},
|
||||
{"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
WriteOptions wo;
|
||||
wo.sync = true;
|
||||
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
|
||||
|
||||
ASSERT_EQ(2, env.num_open_wal_file_.load());
|
||||
// Deleting the iterator will clear its super version, triggering
|
||||
// closing all files
|
||||
delete it;
|
||||
ASSERT_EQ(2, env.num_open_wal_file_.load());
|
||||
|
||||
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
|
||||
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
|
||||
ASSERT_EQ(1, env.num_open_wal_file_.load());
|
||||
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
|
||||
WaitForFlush(1);
|
||||
ASSERT_EQ(1, env.num_open_wal_file_.load());
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
Reopen();
|
||||
ASSERT_EQ("mirko", Get(0, "fodor"));
|
||||
ASSERT_EQ("mirko", Get(1, "fodor"));
|
||||
db_options_.env = env_;
|
||||
Close();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE // TEST functions are not supported in lite
|
||||
TEST_F(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
|
||||
SpecialEnv env(Env::Default());
|
||||
// Allow both of flush and purge job to schedule.
|
||||
env.SetBackgroundThreads(2, Env::HIGH);
|
||||
db_options_.env = &env;
|
||||
db_options_.max_background_flushes = 1;
|
||||
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(3));
|
||||
column_family_options_.level0_file_num_compaction_trigger = 2;
|
||||
Open();
|
||||
CreateColumnFamilies({"one"});
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(1, "fodar2", "mirko"));
|
||||
Flush(1);
|
||||
|
||||
// Create an iterator holding the current super version, as well as
|
||||
// the SST file just flushed.
|
||||
ReadOptions ro;
|
||||
ro.tailing = true;
|
||||
ro.background_purge_on_iterator_cleanup = true;
|
||||
Iterator* it = db_->NewIterator(ro, handles_[1]);
|
||||
// A flush will make `it` hold the last reference of its super version.
|
||||
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(1, "fodar2", "mirko"));
|
||||
Flush(1);
|
||||
|
||||
WaitForCompaction();
|
||||
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(0, "fodor", "mirko"));
|
||||
ASSERT_OK(Put(1, "fodor", "mirko"));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
||||
{"ColumnFamilyTest::IteratorCloseWALFile2:0",
|
||||
"DBImpl::BGWorkPurge:start"},
|
||||
{"ColumnFamilyTest::IteratorCloseWALFile2:2",
|
||||
"DBImpl::BackgroundCallFlush:start"},
|
||||
{"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
WriteOptions wo;
|
||||
wo.sync = true;
|
||||
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
|
||||
|
||||
env.delete_count_.store(0);
|
||||
ASSERT_EQ(2, env.num_open_wal_file_.load());
|
||||
// Deleting the iterator will clear its super version, triggering
|
||||
// closing all files
|
||||
it->Seek("");
|
||||
ASSERT_EQ(2, env.num_open_wal_file_.load());
|
||||
ASSERT_EQ(0, env.delete_count_.load());
|
||||
|
||||
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
|
||||
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
|
||||
ASSERT_EQ(1, env.num_open_wal_file_.load());
|
||||
ASSERT_EQ(1, env.delete_count_.load());
|
||||
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
|
||||
WaitForFlush(1);
|
||||
ASSERT_EQ(1, env.num_open_wal_file_.load());
|
||||
ASSERT_EQ(1, env.delete_count_.load());
|
||||
|
||||
delete it;
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
Reopen();
|
||||
ASSERT_EQ("mirko", Get(0, "fodor"));
|
||||
ASSERT_EQ("mirko", Get(1, "fodor"));
|
||||
db_options_.env = env_;
|
||||
Close();
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
// Disable on windows because SyncWAL requires env->IsSyncThreadSafe()
|
||||
// to return true which is not so in unbuffered mode.
|
||||
#ifndef OS_WIN
|
||||
|
@ -716,6 +716,16 @@ uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
|
||||
return min_log;
|
||||
}
|
||||
|
||||
void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
|
||||
if (!job_context->logs_to_free.empty()) {
|
||||
for (auto l : job_context->logs_to_free) {
|
||||
AddToLogsToFreeQueue(l);
|
||||
}
|
||||
job_context->logs_to_free.clear();
|
||||
SchedulePurge();
|
||||
}
|
||||
}
|
||||
|
||||
// * Returns the list of live files in 'sst_live'
|
||||
// If it's doing full scan:
|
||||
// * Returns the list of all files in the filesystem in
|
||||
@ -1566,10 +1576,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
||||
// we just ignore the update.
|
||||
// That's why we set ignore missing column families to true
|
||||
bool has_valid_writes = false;
|
||||
// If we pass DB through and options.max_successive_merges is hit
|
||||
// during recovery, Get() will be issued which will try to acquire
|
||||
// DB mutex and cause deadlock, as DB mutex is already held.
|
||||
// The DB pointer is not needed unless 2PC is used.
|
||||
// TODO(sdong) fix the allow_2pc case too.
|
||||
status = WriteBatchInternal::InsertInto(
|
||||
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
|
||||
log_number, this, false /* concurrent_memtable_writes */,
|
||||
next_sequence, &has_valid_writes);
|
||||
log_number, db_options_.allow_2pc ? this : nullptr,
|
||||
false /* concurrent_memtable_writes */, next_sequence,
|
||||
&has_valid_writes);
|
||||
// If it is the first log file and there is no column family updated
|
||||
// after replaying the file, this file may be a stale file. We ignore
|
||||
// sequence IDs from the file. Otherwise, if a newer stale log file that
|
||||
@ -1685,7 +1701,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
||||
// recovered and should be ignored on next reincarnation.
|
||||
// Since we already recovered max_log_number, we want all logs
|
||||
// with numbers `<= max_log_number` (includes this one) to be ignored
|
||||
if (flushed) {
|
||||
if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
|
||||
edit->SetLogNumber(max_log_number + 1);
|
||||
}
|
||||
// we must mark the next log number as used, even though it's
|
||||
@ -2358,20 +2374,6 @@ void DBImpl::NotifyOnCompactionCompleted(
|
||||
#endif // ROCKSDB_LITE
|
||||
}
|
||||
|
||||
bool DBImpl::NeedFlushOrCompaction(const MutableCFOptions& base_options,
|
||||
const MutableCFOptions& new_options) {
|
||||
return (base_options.disable_auto_compactions &&
|
||||
!new_options.disable_auto_compactions) ||
|
||||
base_options.level0_slowdown_writes_trigger <
|
||||
new_options.level0_slowdown_writes_trigger ||
|
||||
base_options.level0_stop_writes_trigger <
|
||||
new_options.level0_stop_writes_trigger ||
|
||||
base_options.soft_pending_compaction_bytes_limit <
|
||||
new_options.soft_pending_compaction_bytes_limit ||
|
||||
base_options.hard_pending_compaction_bytes_limit <
|
||||
new_options.hard_pending_compaction_bytes_limit;
|
||||
}
|
||||
|
||||
Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
const std::unordered_map<std::string, std::string>& options_map) {
|
||||
#ifdef ROCKSDB_LITE
|
||||
@ -2385,7 +2387,6 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
return Status::InvalidArgument("empty input");
|
||||
}
|
||||
|
||||
MutableCFOptions prev_options = *cfd->GetLatestMutableCFOptions();
|
||||
MutableCFOptions new_options;
|
||||
Status s;
|
||||
Status persist_options_status;
|
||||
@ -2394,14 +2395,12 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
s = cfd->SetOptions(options_map);
|
||||
if (s.ok()) {
|
||||
new_options = *cfd->GetLatestMutableCFOptions();
|
||||
if (NeedFlushOrCompaction(prev_options, new_options)) {
|
||||
// Trigger possible flush/compactions. This has to be before we persist
|
||||
// options to file, otherwise there will be a deadlock with writer
|
||||
// thread.
|
||||
auto* old_sv =
|
||||
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
|
||||
delete old_sv;
|
||||
}
|
||||
// Trigger possible flush/compactions. This has to be before we persist
|
||||
// options to file, otherwise there will be a deadlock with writer
|
||||
// thread.
|
||||
auto* old_sv =
|
||||
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
|
||||
delete old_sv;
|
||||
|
||||
// Persist RocksDB options under the single write thread
|
||||
WriteThread::Writer w;
|
||||
@ -3005,8 +3004,9 @@ void DBImpl::BGWorkCompaction(void* arg) {
|
||||
|
||||
void DBImpl::BGWorkPurge(void* db) {
|
||||
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
|
||||
TEST_SYNC_POINT("DBImpl::BGWorkPurge");
|
||||
TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
|
||||
reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
|
||||
TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
|
||||
}
|
||||
|
||||
void DBImpl::UnscheduleCallback(void* arg) {
|
||||
@ -3021,20 +3021,32 @@ void DBImpl::UnscheduleCallback(void* arg) {
|
||||
void DBImpl::BackgroundCallPurge() {
|
||||
mutex_.Lock();
|
||||
|
||||
while (!purge_queue_.empty()) {
|
||||
auto purge_file = purge_queue_.begin();
|
||||
auto fname = purge_file->fname;
|
||||
auto type = purge_file->type;
|
||||
auto number = purge_file->number;
|
||||
auto path_id = purge_file->path_id;
|
||||
auto job_id = purge_file->job_id;
|
||||
purge_queue_.pop_front();
|
||||
// We use one single loop to clear both queues so that after existing the loop
|
||||
// both queues are empty. This is stricter than what is needed, but can make
|
||||
// it easier for us to reason the correctness.
|
||||
while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) {
|
||||
if (!purge_queue_.empty()) {
|
||||
auto purge_file = purge_queue_.begin();
|
||||
auto fname = purge_file->fname;
|
||||
auto type = purge_file->type;
|
||||
auto number = purge_file->number;
|
||||
auto path_id = purge_file->path_id;
|
||||
auto job_id = purge_file->job_id;
|
||||
purge_queue_.pop_front();
|
||||
|
||||
mutex_.Unlock();
|
||||
Status file_deletion_status;
|
||||
DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number,
|
||||
path_id);
|
||||
mutex_.Lock();
|
||||
mutex_.Unlock();
|
||||
Status file_deletion_status;
|
||||
DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number,
|
||||
path_id);
|
||||
mutex_.Lock();
|
||||
} else {
|
||||
assert(!logs_to_free_queue_.empty());
|
||||
log::Writer* log_writer = *(logs_to_free_queue_.begin());
|
||||
logs_to_free_queue_.pop_front();
|
||||
mutex_.Unlock();
|
||||
delete log_writer;
|
||||
mutex_.Lock();
|
||||
}
|
||||
}
|
||||
bg_purge_scheduled_--;
|
||||
|
||||
@ -3101,6 +3113,8 @@ void DBImpl::BackgroundCallFlush() {
|
||||
JobContext job_context(next_job_id_.fetch_add(1), true);
|
||||
assert(bg_flush_scheduled_);
|
||||
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
|
||||
|
||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
@ -3343,7 +3357,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
|
||||
// compaction is not necessary. Need to make sure mutex is held
|
||||
// until we make a copy in the following code
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
|
||||
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
|
||||
if (c != nullptr) {
|
||||
// update statistics
|
||||
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
|
||||
@ -3670,6 +3686,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
|
||||
state->mu->Lock();
|
||||
state->super_version->Cleanup();
|
||||
state->db->FindObsoleteFiles(&job_context, false, true);
|
||||
if (state->background_purge) {
|
||||
state->db->ScheduleBgLogWriterClose(&job_context);
|
||||
}
|
||||
state->mu->Unlock();
|
||||
|
||||
delete state->super_version;
|
||||
|
15
db/db_impl.h
15
db/db_impl.h
@ -365,6 +365,10 @@ class DBImpl : public DB {
|
||||
// compaction status.
|
||||
int BGCompactionsAllowed() const;
|
||||
|
||||
// move logs pending closing from job_context to the DB queue and
|
||||
// schedule a purge
|
||||
void ScheduleBgLogWriterClose(JobContext* job_context);
|
||||
|
||||
// Returns the list of live files in 'live' and the list
|
||||
// of all files in the filesystem in 'candidate_files'.
|
||||
// If force == false and the last call was less than
|
||||
@ -493,6 +497,9 @@ class DBImpl : public DB {
|
||||
|
||||
void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
|
||||
void MarkLogAsContainingPrepSection(uint64_t log);
|
||||
void AddToLogsToFreeQueue(log::Writer* log_writer) {
|
||||
logs_to_free_queue_.push_back(log_writer);
|
||||
}
|
||||
|
||||
Status NewDB();
|
||||
|
||||
@ -672,11 +679,6 @@ class DBImpl : public DB {
|
||||
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
|
||||
LogBuffer* log_buffer);
|
||||
|
||||
// Compare options before and after to see whether flush or compaction is
|
||||
// needed immediately after dynamic option change.
|
||||
bool NeedFlushOrCompaction(const MutableCFOptions& base_options,
|
||||
const MutableCFOptions& new_options);
|
||||
|
||||
void PrintStatistics();
|
||||
|
||||
// dump rocksdb.stats to LOG
|
||||
@ -884,6 +886,9 @@ class DBImpl : public DB {
|
||||
|
||||
// A queue to store filenames of the files to be purged
|
||||
std::deque<PurgeFileInfo> purge_queue_;
|
||||
|
||||
// A queue to store log writers to close
|
||||
std::deque<log::Writer*> logs_to_free_queue_;
|
||||
int unscheduled_flushes_;
|
||||
int unscheduled_compactions_;
|
||||
|
||||
|
@ -6,6 +6,9 @@
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#include <limits>
|
||||
#include <string>
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "util/sync_point.h"
|
||||
@ -20,80 +23,101 @@ class DBOptionsTest : public DBTestBase {
|
||||
// RocksDB lite don't support dynamic options.
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
// When write stalls, user can enable auto compaction to unblock writes.
|
||||
// However, we had an issue where the stalled write thread blocks the attempt
|
||||
// to persist auto compaction option, thus creating a deadlock. The test
|
||||
// verifies the issue is fixed.
|
||||
TEST_F(DBOptionsTest, EnableAutoCompactionToUnblockWrites) {
|
||||
Options options;
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 1000 * 1000; // 1M
|
||||
options.level0_file_num_compaction_trigger = 1;
|
||||
options.level0_slowdown_writes_trigger = 1;
|
||||
options.level0_stop_writes_trigger = 1;
|
||||
options.compression = kNoCompression;
|
||||
TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) {
|
||||
const std::string kValue(1024, 'v');
|
||||
for (int method_type = 0; method_type < 2; method_type++) {
|
||||
for (int option_type = 0; option_type < 4; option_type++) {
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 1024 * 1024;
|
||||
options.compression = CompressionType::kNoCompression;
|
||||
options.level0_file_num_compaction_trigger = 1;
|
||||
options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
|
||||
options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
|
||||
options.hard_pending_compaction_bytes_limit =
|
||||
std::numeric_limits<uint64_t>::max();
|
||||
options.soft_pending_compaction_bytes_limit =
|
||||
std::numeric_limits<uint64_t>::max();
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::DelayWrite:Wait",
|
||||
"DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"},
|
||||
{"DBImpl::BackgroundCompaction:Finish",
|
||||
"DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
DestroyAndReopen(options);
|
||||
for (int i = 0; i < 1024 * 2; i++) {
|
||||
Put(Key(i), kValue);
|
||||
}
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
ASSERT_EQ(2, NumTableFilesAtLevel(0));
|
||||
uint64_t l0_size = SizeAtLevel(0);
|
||||
|
||||
// Stall writes.
|
||||
Reopen(options);
|
||||
env_->StartThread(
|
||||
[](void* arg) {
|
||||
std::string value(1000, 'v');
|
||||
auto* t = static_cast<DBOptionsTest*>(arg);
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
ASSERT_OK(t->Put(t->Key(i), value));
|
||||
}
|
||||
},
|
||||
this);
|
||||
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionToUnblockWrites:1");
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ColumnFamilyHandle* handle = dbfull()->DefaultColumnFamily();
|
||||
// We will get a deadlock here if we hit the issue.
|
||||
ASSERT_OK(dbfull()->EnableAutoCompaction({handle}));
|
||||
env_->WaitForJoin();
|
||||
}
|
||||
switch (option_type) {
|
||||
case 0:
|
||||
// test with level0_stop_writes_trigger
|
||||
options.level0_stop_writes_trigger = 2;
|
||||
options.level0_slowdown_writes_trigger = 2;
|
||||
break;
|
||||
case 1:
|
||||
options.level0_slowdown_writes_trigger = 2;
|
||||
break;
|
||||
case 2:
|
||||
options.hard_pending_compaction_bytes_limit = l0_size;
|
||||
options.soft_pending_compaction_bytes_limit = l0_size;
|
||||
break;
|
||||
case 3:
|
||||
options.soft_pending_compaction_bytes_limit = l0_size;
|
||||
break;
|
||||
}
|
||||
Reopen(options);
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
|
||||
// Similar to EnableAutoCompactionAfterStallDeadlock. See comments there.
|
||||
TEST_F(DBOptionsTest, ToggleStopTriggerToUnblockWrites) {
|
||||
Options options;
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 1000 * 1000; // 1M
|
||||
options.level0_file_num_compaction_trigger = 1;
|
||||
options.level0_slowdown_writes_trigger = 1;
|
||||
options.level0_stop_writes_trigger = 1;
|
||||
options.compression = kNoCompression;
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBOptionsTest::EnableAutoCompactionAndTriggerStall:1",
|
||||
"BackgroundCallCompaction:0"},
|
||||
{"DBImpl::BackgroundCompaction():BeforePickCompaction",
|
||||
"DBOptionsTest::EnableAutoCompactionAndTriggerStall:2"},
|
||||
{"DBOptionsTest::EnableAutoCompactionAndTriggerStall:3",
|
||||
"DBImpl::BackgroundCompaction():AfterPickCompaction"}});
|
||||
// Block background compaction.
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::DelayWrite:Wait",
|
||||
"DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"},
|
||||
{"DBImpl::BackgroundCompaction:Finish",
|
||||
"DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
switch (method_type) {
|
||||
case 0:
|
||||
ASSERT_OK(
|
||||
dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
|
||||
break;
|
||||
case 1:
|
||||
ASSERT_OK(dbfull()->EnableAutoCompaction(
|
||||
{dbfull()->DefaultColumnFamily()}));
|
||||
break;
|
||||
}
|
||||
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:1");
|
||||
// Wait for stall condition recalculate.
|
||||
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:2");
|
||||
|
||||
// Stall writes.
|
||||
Reopen(options);
|
||||
env_->StartThread(
|
||||
[](void* arg) {
|
||||
std::string value(1000, 'v');
|
||||
auto* t = static_cast<DBOptionsTest*>(arg);
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
ASSERT_OK(t->Put(t->Key(i), value));
|
||||
}
|
||||
},
|
||||
this);
|
||||
TEST_SYNC_POINT("DBOptionsTest::ToggleStopTriggerToUnblockWrites:1");
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
|
||||
// We will get a deadlock here if we hit the issue.
|
||||
ASSERT_OK(
|
||||
dbfull()->SetOptions({{"level0_stop_writes_trigger", "1000000"},
|
||||
{"level0_slowdown_writes_trigger", "1000000"}}));
|
||||
env_->WaitForJoin();
|
||||
switch (option_type) {
|
||||
case 0:
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
|
||||
break;
|
||||
case 1:
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
break;
|
||||
case 2:
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
|
||||
break;
|
||||
case 3:
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
break;
|
||||
}
|
||||
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:3");
|
||||
|
||||
// Background compaction executed.
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
|
||||
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -1816,6 +1816,22 @@ TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) {
|
||||
Options options;
|
||||
options = CurrentOptions(options);
|
||||
options.merge_operator = MergeOperators::CreatePutOperator();
|
||||
DestroyAndReopen(options);
|
||||
|
||||
db_->Put(WriteOptions(), "foo", "bar");
|
||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
||||
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
|
||||
|
||||
options.max_successive_merges = 3;
|
||||
Reopen(options);
|
||||
}
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -30,6 +30,8 @@ SpecialEnv::SpecialEnv(Env* base)
|
||||
manifest_write_error_.store(false, std::memory_order_release);
|
||||
log_write_error_.store(false, std::memory_order_release);
|
||||
random_file_open_counter_.store(0, std::memory_order_relaxed);
|
||||
delete_count_.store(0, std::memory_order_relaxed);
|
||||
num_open_wal_file_.store(0);
|
||||
log_write_slowdown_ = 0;
|
||||
bytes_written_ = 0;
|
||||
sync_counter_ = 0;
|
||||
|
@ -285,7 +285,10 @@ class SpecialEnv : public EnvWrapper {
|
||||
class WalFile : public WritableFile {
|
||||
public:
|
||||
WalFile(SpecialEnv* env, unique_ptr<WritableFile>&& b)
|
||||
: env_(env), base_(std::move(b)) {}
|
||||
: env_(env), base_(std::move(b)) {
|
||||
env_->num_open_wal_file_.fetch_add(1);
|
||||
}
|
||||
virtual ~WalFile() { env_->num_open_wal_file_.fetch_add(-1); }
|
||||
Status Append(const Slice& data) override {
|
||||
#if !(defined NDEBUG) || !defined(OS_WIN)
|
||||
TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
|
||||
@ -443,6 +446,11 @@ class SpecialEnv : public EnvWrapper {
|
||||
addon_time_.load();
|
||||
}
|
||||
|
||||
virtual Status DeleteFile(const std::string& fname) override {
|
||||
delete_count_.fetch_add(1);
|
||||
return target()->DeleteFile(fname);
|
||||
}
|
||||
|
||||
Random rnd_;
|
||||
port::Mutex rnd_mutex_; // Lock to pretect rnd_
|
||||
|
||||
@ -470,6 +478,9 @@ class SpecialEnv : public EnvWrapper {
|
||||
// Slow down every log write, in micro-seconds.
|
||||
std::atomic<int> log_write_slowdown_;
|
||||
|
||||
// Number of WAL files that are still open for write.
|
||||
std::atomic<int> num_open_wal_file_;
|
||||
|
||||
bool count_random_reads_;
|
||||
anon::AtomicCounter random_read_counter_;
|
||||
std::atomic<size_t> random_read_bytes_counter_;
|
||||
@ -494,6 +505,8 @@ class SpecialEnv : public EnvWrapper {
|
||||
|
||||
std::atomic<int64_t> addon_time_;
|
||||
|
||||
std::atomic<int> delete_count_;
|
||||
|
||||
bool time_elapse_only_sleep_;
|
||||
|
||||
bool no_sleep_;
|
||||
|
@ -292,6 +292,44 @@ TEST_F(DBWALTest, RecoveryWithEmptyLog) {
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
TEST_F(DBWALTest, GetSortedWalFiles) {
|
||||
do {
|
||||
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
||||
VectorLogPtr log_files;
|
||||
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
|
||||
ASSERT_EQ(0, log_files.size());
|
||||
|
||||
ASSERT_OK(Put(1, "foo", "v1"));
|
||||
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
|
||||
ASSERT_EQ(1, log_files.size());
|
||||
} while (ChangeOptions());
|
||||
}
|
||||
|
||||
TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
|
||||
// Test for regression of WAL cleanup missing files that don't contain data
|
||||
// for every column family.
|
||||
do {
|
||||
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
||||
ASSERT_OK(Put(1, "foo", "v1"));
|
||||
ASSERT_OK(Put(1, "foo", "v2"));
|
||||
std::array<uint64_t, 2> earliest_log_nums;
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
if (i > 0) {
|
||||
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
||||
}
|
||||
VectorLogPtr log_files;
|
||||
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
|
||||
if (log_files.size() > 0) {
|
||||
earliest_log_nums[i] = log_files[0]->LogNumber();
|
||||
} else {
|
||||
earliest_log_nums[i] = port::kMaxUint64;
|
||||
}
|
||||
}
|
||||
// Check at least the first WAL was cleaned up during the recovery.
|
||||
ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
|
||||
} while (ChangeOptions());
|
||||
}
|
||||
|
||||
TEST_F(DBWALTest, RecoverWithLargeLog) {
|
||||
do {
|
||||
{
|
||||
|
@ -16,12 +16,17 @@
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
#include "db/dbformat.h"
|
||||
#include "db/column_family.h"
|
||||
#include "db/dbformat.h"
|
||||
#include "db/flush_scheduler.h"
|
||||
#include "db/internal_stats.h"
|
||||
#include "db/job_context.h"
|
||||
#include "db/log_writer.h"
|
||||
#include "db/memtable_list.h"
|
||||
#include "db/snapshot_impl.h"
|
||||
#include "db/version_edit.h"
|
||||
#include "db/write_controller.h"
|
||||
#include "db/write_thread.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/env.h"
|
||||
@ -33,11 +38,6 @@
|
||||
#include "util/instrumented_mutex.h"
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/thread_local.h"
|
||||
#include "db/internal_stats.h"
|
||||
#include "db/write_controller.h"
|
||||
#include "db/flush_scheduler.h"
|
||||
#include "db/write_thread.h"
|
||||
#include "db/job_context.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
|
@ -153,10 +153,14 @@ void ForwardIterator::SVCleanup() {
|
||||
db_->mutex_.Lock();
|
||||
sv_->Cleanup();
|
||||
db_->FindObsoleteFiles(&job_context, false, true);
|
||||
if (read_options_.background_purge_on_iterator_cleanup) {
|
||||
db_->ScheduleBgLogWriterClose(&job_context);
|
||||
}
|
||||
db_->mutex_.Unlock();
|
||||
delete sv_;
|
||||
if (job_context.HaveSomethingToDelete()) {
|
||||
db_->PurgeObsoleteFiles(job_context);
|
||||
db_->PurgeObsoleteFiles(
|
||||
job_context, read_options_.background_purge_on_iterator_cleanup);
|
||||
}
|
||||
job_context.Clean();
|
||||
}
|
||||
|
@ -615,15 +615,15 @@ void InternalStats::DumpDBStats(std::string* value) {
|
||||
// Data
|
||||
// writes: total number of write requests.
|
||||
// keys: total number of key updates issued by all the write requests
|
||||
// batches: number of group commits issued to the DB. Each group can contain
|
||||
// one or more writes.
|
||||
// commit groups: number of group commits issued to the DB. Each group can
|
||||
// contain one or more writes.
|
||||
// so writes/keys is the average number of put in multi-put or put
|
||||
// writes/batches is the average group commit size.
|
||||
// writes/groups is the average group commit size.
|
||||
//
|
||||
// The format is the same for interval stats.
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Cumulative writes: %s writes, %s keys, %s batches, "
|
||||
"%.1f writes per batch, ingest: %.2f GB, %.2f MB/s\n",
|
||||
"Cumulative writes: %s writes, %s keys, %s commit groups, "
|
||||
"%.1f writes per commit group, ingest: %.2f GB, %.2f MB/s\n",
|
||||
NumberToHumanString(write_other + write_self).c_str(),
|
||||
NumberToHumanString(num_keys_written).c_str(),
|
||||
NumberToHumanString(write_self).c_str(),
|
||||
@ -654,8 +654,8 @@ void InternalStats::DumpDBStats(std::string* value) {
|
||||
uint64_t interval_num_keys_written =
|
||||
num_keys_written - db_stats_snapshot_.num_keys_written;
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Interval writes: %s writes, %s keys, %s batches, "
|
||||
"%.1f writes per batch, ingest: %.2f MB, %.2f MB/s\n",
|
||||
"Interval writes: %s writes, %s keys, %s commit groups, "
|
||||
"%.1f writes per commit group, ingest: %.2f MB, %.2f MB/s\n",
|
||||
NumberToHumanString(
|
||||
interval_write_other + interval_write_self).c_str(),
|
||||
NumberToHumanString(interval_num_keys_written).c_str(),
|
||||
|
@ -12,7 +12,6 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "db/column_family.h"
|
||||
#include "db/log_writer.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
@ -95,11 +95,11 @@ Status TableCache::GetTableReader(
|
||||
unique_ptr<RandomAccessFile> file;
|
||||
Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
|
||||
|
||||
if (readahead > 0) {
|
||||
file = NewReadaheadRandomAccessFile(std::move(file), readahead);
|
||||
}
|
||||
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
|
||||
if (s.ok()) {
|
||||
if (readahead > 0) {
|
||||
file = NewReadaheadRandomAccessFile(std::move(file), readahead);
|
||||
}
|
||||
if (!sequential_mode && ioptions_.advise_random_on_open) {
|
||||
file->Hint(RandomAccessFile::RANDOM);
|
||||
}
|
||||
|
@ -383,7 +383,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
|
||||
Status s;
|
||||
if (type == kAliveLogFile) {
|
||||
std::string fname = LogFileName(db_options_.wal_dir, number);
|
||||
s = ReadFirstLine(fname, sequence);
|
||||
s = ReadFirstLine(fname, number, sequence);
|
||||
if (env_->FileExists(fname).ok() && !s.ok()) {
|
||||
// return any error that is not caused by non-existing file
|
||||
return s;
|
||||
@ -394,7 +394,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
|
||||
// check if the file got moved to archive.
|
||||
std::string archived_file =
|
||||
ArchivedLogFileName(db_options_.wal_dir, number);
|
||||
s = ReadFirstLine(archived_file, sequence);
|
||||
s = ReadFirstLine(archived_file, number, sequence);
|
||||
// maybe the file was deleted from archive dir. If that's the case, return
|
||||
// Status::OK(). The caller with identify this as empty file because
|
||||
// *sequence == 0
|
||||
@ -413,6 +413,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
|
||||
// the function returns status.ok() and sequence == 0 if the file exists, but is
|
||||
// empty
|
||||
Status WalManager::ReadFirstLine(const std::string& fname,
|
||||
const uint64_t number,
|
||||
SequenceNumber* sequence) {
|
||||
struct LogReporter : public log::Reader::Reporter {
|
||||
Env* env;
|
||||
@ -449,7 +450,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
|
||||
reporter.status = &status;
|
||||
reporter.ignore_error = !db_options_.paranoid_checks;
|
||||
log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
|
||||
true /*checksum*/, 0 /*initial_offset*/, *sequence);
|
||||
true /*checksum*/, 0 /*initial_offset*/, number);
|
||||
std::string scratch;
|
||||
Slice record;
|
||||
|
||||
|
@ -54,9 +54,9 @@ class WalManager {
|
||||
return ReadFirstRecord(type, number, sequence);
|
||||
}
|
||||
|
||||
Status TEST_ReadFirstLine(const std::string& fname,
|
||||
Status TEST_ReadFirstLine(const std::string& fname, const uint64_t number,
|
||||
SequenceNumber* sequence) {
|
||||
return ReadFirstLine(fname, sequence);
|
||||
return ReadFirstLine(fname, number, sequence);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -71,7 +71,8 @@ class WalManager {
|
||||
Status ReadFirstRecord(const WalFileType type, const uint64_t number,
|
||||
SequenceNumber* sequence);
|
||||
|
||||
Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
|
||||
Status ReadFirstLine(const std::string& fname, const uint64_t number,
|
||||
SequenceNumber* sequence);
|
||||
|
||||
// ------- state from DBImpl ------
|
||||
const DBOptions& db_options_;
|
||||
|
@ -119,10 +119,11 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
|
||||
ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));
|
||||
|
||||
SequenceNumber s;
|
||||
ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, &s));
|
||||
ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
|
||||
ASSERT_EQ(s, 0U);
|
||||
|
||||
ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
|
||||
ASSERT_OK(
|
||||
wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
|
||||
ASSERT_EQ(s, 0U);
|
||||
|
||||
unique_ptr<WritableFileWriter> file_writer(
|
||||
|
19
include/rocksdb/utilities/option_change_migration.h
Normal file
19
include/rocksdb/utilities/option_change_migration.h
Normal file
@ -0,0 +1,19 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/status.h"
|
||||
|
||||
namespace rocksdb {
|
||||
// Try to migrate DB created with old_opts to be use new_opts.
|
||||
// Multiple column families is not supported.
|
||||
// It is best-effort. No guarantee to succeed.
|
||||
// A full compaction may be executed.
|
||||
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
|
||||
const Options& new_opts);
|
||||
} // namespace rocksdb
|
@ -6,7 +6,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 4
|
||||
#define ROCKSDB_MINOR 11
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 2
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
2
src.mk
2
src.mk
@ -131,6 +131,7 @@ LIB_SOURCES = \
|
||||
utilities/merge_operators/string_append/stringappend2.cc \
|
||||
utilities/merge_operators/string_append/stringappend.cc \
|
||||
utilities/merge_operators/uint64add.cc \
|
||||
utilities/option_change_migration/option_change_migration.cc \
|
||||
utilities/options/options_util.cc \
|
||||
utilities/persistent_cache/persistent_cache_tier.cc \
|
||||
utilities/persistent_cache/volatile_tier_impl.cc \
|
||||
@ -286,6 +287,7 @@ MAIN_SOURCES = \
|
||||
utilities/geodb/geodb_test.cc \
|
||||
utilities/memory/memory_test.cc \
|
||||
utilities/merge_operators/string_append/stringappend_test.cc \
|
||||
utilities/option_change_migration/option_change_migration_test.cc \
|
||||
utilities/options/options_util_test.cc \
|
||||
utilities/redis/redis_lists_test.cc \
|
||||
utilities/simulator_cache/sim_cache_test.cc \
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include "util/perf_context_imp.h"
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -533,12 +534,19 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
|
||||
|
||||
// We've successfully read the footer and the index block: we're
|
||||
// ready to serve requests.
|
||||
// Better not mutate rep_ after the creation. eg. internal_prefix_transform
|
||||
// raw pointer will be used to create HashIndexReader, whose reset may
|
||||
// access a dangling pointer.
|
||||
Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options,
|
||||
internal_comparator, skip_filters);
|
||||
rep->file = std::move(file);
|
||||
rep->footer = footer;
|
||||
rep->index_type = table_options.index_type;
|
||||
rep->hash_index_allow_collision = table_options.hash_index_allow_collision;
|
||||
// We need to wrap data with internal_prefix_transform to make sure it can
|
||||
// handle prefix correctly.
|
||||
rep->internal_prefix_transform.reset(
|
||||
new InternalKeySliceTransform(rep->ioptions.prefix_extractor));
|
||||
SetupCacheKeyPrefix(rep, file_size);
|
||||
unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep));
|
||||
|
||||
@ -1053,7 +1061,11 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
|
||||
} else {
|
||||
// Create index reader and put it in the cache.
|
||||
Status s;
|
||||
TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:2");
|
||||
s = CreateIndexReader(&index_reader);
|
||||
TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:1");
|
||||
TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:3");
|
||||
TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:4");
|
||||
if (s.ok()) {
|
||||
assert(index_reader != nullptr);
|
||||
s = block_cache->Insert(key, index_reader, index_reader->usable_size(),
|
||||
@ -1609,10 +1621,6 @@ Status BlockBasedTable::CreateIndexReader(
|
||||
meta_index_iter = meta_iter_guard.get();
|
||||
}
|
||||
|
||||
// We need to wrap data with internal_prefix_transform to make sure it can
|
||||
// handle prefix correctly.
|
||||
rep_->internal_prefix_transform.reset(
|
||||
new InternalKeySliceTransform(rep_->ioptions.prefix_extractor));
|
||||
return HashIndexReader::Create(
|
||||
rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions,
|
||||
comparator, footer.index_handle(), meta_index_iter, index_reader,
|
||||
|
@ -45,6 +45,7 @@
|
||||
#include "util/random.h"
|
||||
#include "util/statistics.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/sync_point.h"
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
@ -2021,6 +2022,65 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
|
||||
c.ResetTableReader();
|
||||
}
|
||||
|
||||
TEST_F(BlockBasedTableTest, NewIndexIteratorLeak) {
|
||||
// A regression test to avoid data race described in
|
||||
// https://github.com/facebook/rocksdb/issues/1267
|
||||
TableConstructor c(BytewiseComparator(), true /* convert_to_internal_key_ */);
|
||||
std::vector<std::string> keys;
|
||||
stl_wrappers::KVMap kvmap;
|
||||
c.Add("a1", "val1");
|
||||
Options options;
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.index_type = BlockBasedTableOptions::kHashSearch;
|
||||
table_options.cache_index_and_filter_blocks = true;
|
||||
table_options.block_cache = NewLRUCache(0);
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
const ImmutableCFOptions ioptions(options);
|
||||
c.Finish(options, ioptions, table_options,
|
||||
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
|
||||
{
|
||||
{"BlockBasedTable::NewIndexIterator::thread1:1",
|
||||
"BlockBasedTable::NewIndexIterator::thread2:2"},
|
||||
{"BlockBasedTable::NewIndexIterator::thread2:3",
|
||||
"BlockBasedTable::NewIndexIterator::thread1:4"},
|
||||
},
|
||||
{
|
||||
{"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
|
||||
"BlockBasedTable::NewIndexIterator::thread1:1"},
|
||||
{"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
|
||||
"BlockBasedTable::NewIndexIterator::thread1:4"},
|
||||
{"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
|
||||
"BlockBasedTable::NewIndexIterator::thread2:2"},
|
||||
{"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
|
||||
"BlockBasedTable::NewIndexIterator::thread2:3"},
|
||||
});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
ReadOptions ro;
|
||||
auto* reader = c.GetTableReader();
|
||||
|
||||
std::function<void()> func1 = [&]() {
|
||||
TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker");
|
||||
std::unique_ptr<InternalIterator> iter(reader->NewIterator(ro));
|
||||
iter->Seek(InternalKey("a1", 0, kTypeValue).Encode());
|
||||
};
|
||||
|
||||
std::function<void()> func2 = [&]() {
|
||||
TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker");
|
||||
std::unique_ptr<InternalIterator> iter(reader->NewIterator(ro));
|
||||
};
|
||||
|
||||
auto thread1 = std::thread(func1);
|
||||
auto thread2 = std::thread(func2);
|
||||
thread1.join();
|
||||
thread2.join();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
c.ResetTableReader();
|
||||
}
|
||||
|
||||
// Plain table is not supported in ROCKSDB_LITE
|
||||
#ifndef ROCKSDB_LITE
|
||||
TEST_F(PlainTableTest, BasicPlainTableProperties) {
|
||||
|
@ -129,9 +129,13 @@ class PosixEnv : public Env {
|
||||
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
|
||||
thread_pools_[pool_id].JoinAllThreads();
|
||||
}
|
||||
// All threads must be joined before the deletion of
|
||||
// thread_status_updater_.
|
||||
delete thread_status_updater_;
|
||||
// Delete the thread_status_updater_ only when the current Env is not
|
||||
// Env::Default(). This is to avoid the free-after-use error when
|
||||
// Env::Default() is destructed while some other child threads are
|
||||
// still trying to update thread status.
|
||||
if (this != Env::Default()) {
|
||||
delete thread_status_updater_;
|
||||
}
|
||||
}
|
||||
|
||||
void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#include <assert.h>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
@ -10,6 +10,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
153
utilities/option_change_migration/option_change_migration.cc
Normal file
153
utilities/option_change_migration/option_change_migration.cc
Normal file
@ -0,0 +1,153 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include "rocksdb/utilities/option_change_migration.h"
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
#include "rocksdb/db.h"
|
||||
|
||||
namespace rocksdb {
|
||||
namespace {
|
||||
// Return a version of Options `opts` that allow us to open/write into a DB
|
||||
// without triggering an automatic compaction or stalling. This is guaranteed
|
||||
// by disabling automatic compactions and using huge values for stalling
|
||||
// triggers.
|
||||
// Build a copy of `opts` that lets us open and write into a DB without
// triggering any automatic compaction or write stalling. This is achieved
// by disabling auto compactions and pushing every stall trigger to a value
// that will never be reached.
Options GetNoCompactionOptions(const Options& opts) {
  Options result = opts;
  result.disable_auto_compactions = true;
  result.level0_slowdown_writes_trigger = 999999;
  result.level0_stop_writes_trigger = 999999;
  result.soft_pending_compaction_bytes_limit = 0;
  result.hard_pending_compaction_bytes_limit = 0;
  return result;
}
|
||||
|
||||
// Open the DB at `dbname` with `options` and hand ownership of the handle
// to `*db`. On failure, `*db` is left empty and the error status returned.
Status OpenDb(const Options& options, const std::string& dbname,
              std::unique_ptr<DB>* db) {
  db->reset();
  DB* raw_db = nullptr;
  Status s = DB::Open(options, dbname, &raw_db);
  if (s.ok()) {
    db->reset(raw_db);
  }
  return s;
}
|
||||
|
||||
// Compact all data in the DB at `dbname` into level `dest_level`.
// If `need_reopen` is true, the DB is closed and reopened afterwards so
// the manifest is rewritten (see the comment below).
//
// Fix: the original discarded the Status returned by CompactRange(), so a
// failed compaction was silently treated as success and the (now
// pointless) reopen still ran. The status is now checked and propagated.
Status CompactToLevel(const Options& options, const std::string& dbname,
                      int dest_level, bool need_reopen) {
  std::unique_ptr<DB> db;
  Options no_compact_opts = GetNoCompactionOptions(options);
  if (dest_level == 0) {
    // L0 has strict sequence-ID requirements for the files placed in it.
    // It's safer to put only one compacted file there. This path is only
    // used when converting to universal compaction with a single level,
    // and in that case one output file is also optimal.
    no_compact_opts.target_file_size_base = 999999999999999;
  }
  Status s = OpenDb(no_compact_opts, dbname, &db);
  if (!s.ok()) {
    return s;
  }
  CompactRangeOptions cro;
  cro.change_level = true;
  cro.target_level = dest_level;
  s = db->CompactRange(cro, nullptr, nullptr);

  if (s.ok() && need_reopen) {
    // Need to restart the DB to rewrite the manifest file. In order to
    // open a DB with a specific num_levels, the manifest must contain no
    // record that mentions any level beyond num_levels. Issuing a full
    // compaction moves all the data to a level not exceeding num_levels,
    // but the manifest may still carry older records mentioning a higher
    // level. Reopening the DB forces the manifest to be rewritten so that
    // those records are cleared.
    db.reset();
    s = OpenDb(no_compact_opts, dbname, &db);
  }
  return s;
}
|
||||
|
||||
// Prepare the DB at `dbname` so it can be opened with universal compaction
// under `new_opts`. Only needed when the new num_levels is smaller than the
// old one AND some file currently sits on a level that would no longer
// exist; in that case everything is compacted down into the new bottom
// level.
Status MigrateToUniversal(std::string dbname, const Options& old_opts,
                          const Options& new_opts) {
  // If the level count is not shrinking, the existing files already fit.
  if (old_opts.num_levels <= new_opts.num_levels) {
    return Status::OK();
  }
  bool need_compact = false;
  {
    // Inspect the current LSM shape: does any file live on a level that
    // would not exist under new_opts.num_levels?
    std::unique_ptr<DB> db;
    Options opts = GetNoCompactionOptions(old_opts);
    Status s = OpenDb(opts, dbname, &db);
    if (!s.ok()) {
      return s;
    }
    ColumnFamilyMetaData metadata;
    db->GetColumnFamilyMetaData(&metadata);
    need_compact = !metadata.levels.empty() &&
                   metadata.levels.back().level >= new_opts.num_levels;
  }
  if (need_compact) {
    return CompactToLevel(old_opts, dbname, new_opts.num_levels - 1, true);
  }
  return Status::OK();
}
|
||||
|
||||
// Prepare the DB at `dbname` so it can be opened with level compaction
// under `new_opts`, handling both the classic and the dynamic-level-bytes
// variants.
Status MigrateToLevelBase(std::string dbname, const Options& old_opts,
                          const Options& new_opts) {
  if (!new_opts.level_compaction_dynamic_level_bytes) {
    if (old_opts.num_levels == 1) {
      return Status::OK();
    }
    // Compact everything to level 1 to guarantee the DB can be safely
    // opened. Even when opening would succeed without this, we still want
    // the compaction so the LSM tree does not get stuck in a bad shape:
    // e.g. if the level size multiplier changed from 4 to 8, the same
    // data needs fewer levels, and without a full compaction the tree may
    // keep more levels than necessary and never recover automatically.
    Options opts = old_opts;
    opts.target_file_size_base = new_opts.target_file_size_base;
    return CompactToLevel(opts, dbname, 1, true);
  }

  // Dynamic-level mode: compact everything to the last level to guarantee
  // the DB can be safely opened.
  if (old_opts.num_levels == 1) {
    return Status::OK();
  }
  if (new_opts.num_levels > old_opts.num_levels) {
    // Dynamic level mode requires data to be placed in the last level
    // first, so compact with the new options and skip the reopen.
    return CompactToLevel(new_opts, dbname, new_opts.num_levels - 1, false);
  }
  Options opts = old_opts;
  opts.target_file_size_base = new_opts.target_file_size_base;
  return CompactToLevel(opts, dbname, new_opts.num_levels - 1, true);
}
|
||||
} // namespace
|
||||
|
||||
// Migrate the DB at `dbname`, last written under `old_opts`, so that it can
// be safely opened with `new_opts`. Only universal and level compaction
// styles are supported as migration targets; anything else returns
// NotSupported.
//
// Fix: the error message was ungrammatical ("Do not how to migrate ...");
// it now reads "Do not know how to migrate ...".
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
                             const Options& new_opts) {
  if (new_opts.compaction_style == CompactionStyle::kCompactionStyleUniversal) {
    return MigrateToUniversal(dbname, old_opts, new_opts);
  } else if (new_opts.compaction_style ==
             CompactionStyle::kCompactionStyleLevel) {
    return MigrateToLevelBase(dbname, old_opts, new_opts);
  } else {
    return Status::NotSupported(
        "Do not know how to migrate to this compaction style");
  }
}
|
||||
} // namespace rocksdb
|
||||
#else
|
||||
namespace rocksdb {
|
||||
// Option migration is not compiled into ROCKSDB_LITE builds; every call
// reports NotSupported without touching the DB.
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
                             const Options& new_opts) {
  return Status::NotSupported();
}
|
||||
} // namespace rocksdb
|
||||
#endif // ROCKSDB_LITE
|
@ -0,0 +1,207 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "rocksdb/utilities/option_change_migration.h"
|
||||
#include <set>
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
namespace rocksdb {
|
||||
|
||||
class DBOptionChangeMigrationTest
|
||||
: public DBTestBase,
|
||||
public testing::WithParamInterface<
|
||||
std::tuple<int, bool, bool, int, bool, bool>> {
|
||||
public:
|
||||
DBOptionChangeMigrationTest()
|
||||
: DBTestBase("/db_option_change_migration_test") {
|
||||
level1_ = std::get<0>(GetParam());
|
||||
is_universal1_ = std::get<1>(GetParam());
|
||||
is_dynamic1_ = std::get<2>(GetParam());
|
||||
|
||||
level2_ = std::get<3>(GetParam());
|
||||
is_universal2_ = std::get<4>(GetParam());
|
||||
is_dynamic2_ = std::get<5>(GetParam());
|
||||
}
|
||||
|
||||
// Required if inheriting from testing::WithParamInterface<>
|
||||
static void SetUpTestCase() {}
|
||||
static void TearDownTestCase() {}
|
||||
|
||||
int level1_;
|
||||
bool is_universal1_;
|
||||
bool is_dynamic1_;
|
||||
|
||||
int level2_;
|
||||
bool is_universal2_;
|
||||
bool is_dynamic2_;
|
||||
};
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
// Fill a DB under the first option configuration, migrate it to the second
// configuration, and verify the exact key set survives the migration.
TEST_P(DBOptionChangeMigrationTest, Migrate1) {
  Options old_options = CurrentOptions();
  if (is_universal1_) {
    old_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  } else {
    old_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
    old_options.level_compaction_dynamic_level_bytes = is_dynamic1_;
  }
  old_options.level0_file_num_compaction_trigger = 3;
  old_options.write_buffer_size = 64 * 1024;
  old_options.target_file_size_base = 128 * 1024;
  // Make level target of L1, L2 to be 200KB and 600KB
  old_options.num_levels = level1_;
  old_options.max_bytes_for_level_multiplier = 3;
  old_options.max_bytes_for_level_base = 200 * 1024;

  Reopen(old_options);

  // Generate at least 2MB of data.
  Random rnd(301);
  int key_idx = 0;
  for (int file_num = 0; file_num < 20; file_num++) {
    GenerateNewFile(&rnd, &key_idx);
  }
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();

  // Record exactly which keys are in the DB so we can check that the
  // migration neither loses nor invents any.
  std::set<std::string> expected_keys;
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      expected_keys.insert(it->key().ToString());
    }
  }
  Close();

  Options new_options = old_options;
  if (is_universal2_) {
    new_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  } else {
    new_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
    new_options.level_compaction_dynamic_level_bytes = is_dynamic2_;
  }
  new_options.target_file_size_base = 256 * 1024;
  new_options.num_levels = level2_;
  new_options.max_bytes_for_level_base = 150 * 1024;
  new_options.max_bytes_for_level_multiplier = 4;
  ASSERT_OK(OptionChangeMigration(dbname_, old_options, new_options));
  Reopen(new_options);

  // Wait for compaction to finish and make sure the DB can reopen.
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  Reopen(new_options);

  // The migrated DB must contain exactly the recorded keys, in order.
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->SeekToFirst();
    for (const std::string& key : expected_keys) {
      ASSERT_TRUE(it->Valid());
      ASSERT_EQ(key, it->key().ToString());
      it->Next();
    }
    ASSERT_TRUE(!it->Valid());
  }
}
|
||||
|
||||
// Same as Migrate1, but with the two option configurations swapped: fill a
// DB under the second configuration and migrate it to the first.
TEST_P(DBOptionChangeMigrationTest, Migrate2) {
  Options old_options = CurrentOptions();
  if (is_universal2_) {
    old_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  } else {
    old_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
    old_options.level_compaction_dynamic_level_bytes = is_dynamic2_;
  }
  old_options.level0_file_num_compaction_trigger = 3;
  old_options.write_buffer_size = 64 * 1024;
  old_options.target_file_size_base = 128 * 1024;
  // Make level target of L1, L2 to be 200KB and 600KB
  old_options.num_levels = level2_;
  old_options.max_bytes_for_level_multiplier = 3;
  old_options.max_bytes_for_level_base = 200 * 1024;

  Reopen(old_options);

  // Generate at least 2MB of data.
  Random rnd(301);
  int key_idx = 0;
  for (int file_num = 0; file_num < 20; file_num++) {
    GenerateNewFile(&rnd, &key_idx);
  }
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();

  // Record exactly which keys are in the DB so we can check that the
  // migration neither loses nor invents any.
  std::set<std::string> expected_keys;
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      expected_keys.insert(it->key().ToString());
    }
  }

  Close();

  Options new_options = old_options;
  if (is_universal1_) {
    new_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  } else {
    new_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
    new_options.level_compaction_dynamic_level_bytes = is_dynamic1_;
  }
  new_options.target_file_size_base = 256 * 1024;
  new_options.num_levels = level1_;
  new_options.max_bytes_for_level_base = 150 * 1024;
  new_options.max_bytes_for_level_multiplier = 4;
  ASSERT_OK(OptionChangeMigration(dbname_, old_options, new_options));
  Reopen(new_options);
  // Wait for compaction to finish and make sure the DB can reopen.
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  Reopen(new_options);

  // The migrated DB must contain exactly the recorded keys, in order.
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->SeekToFirst();
    for (const std::string& key : expected_keys) {
      ASSERT_TRUE(it->Valid());
      ASSERT_EQ(key, it->key().ToString());
      it->Next();
    }
    ASSERT_TRUE(!it->Valid());
  }
}
|
||||
|
||||
// Each tuple is (level1, is_universal1, is_dynamic1,
//                level2, is_universal2, is_dynamic2).
INSTANTIATE_TEST_CASE_P(
    DBOptionChangeMigrationTest, DBOptionChangeMigrationTest,
    ::testing::Values(std::make_tuple(3, false, false, 4, false, false),
                      std::make_tuple(3, false, true, 4, false, true),
                      std::make_tuple(3, false, true, 4, false, false),
                      std::make_tuple(3, false, false, 4, false, true),
                      std::make_tuple(3, true, false, 4, true, false),
                      std::make_tuple(1, true, false, 4, true, false),
                      std::make_tuple(3, false, false, 4, true, false),
                      std::make_tuple(3, false, false, 1, true, false),
                      std::make_tuple(3, false, true, 4, true, false),
                      std::make_tuple(3, false, true, 1, true, false),
                      std::make_tuple(1, true, false, 4, false, false)));
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
rocksdb::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -7,6 +7,8 @@
|
||||
#include "utilities/persistent_cache/block_cache_tier_file.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include <functional>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include <functional>
|
||||
#include "util/random.h"
|
||||
#include "utilities/persistent_cache/hash_table.h"
|
||||
#include "utilities/persistent_cache/lrulist.h"
|
||||
|
Loading…
Reference in New Issue
Block a user