Compare commits

30 commits:

  fe55e3e18f
  3d267c9941
  315d3afcde
  dae099db27
  eb22eb7dfe
  b26658ca6b
  57f8dd3dc1
  61c915ac50
  63cbcd25d1
  b2892047fd
  beb5daeeac
  a00f9bc498
  7d8218912d
  a5163cfa60
  6bca22522b
  2c4981cada
  9891486ddd
  849efe3863
  e14eaa31fd
  8e5a257bfe
  a0cdf54bd3
  935ce6d386
  c93f7848c6
  882e706400
  f04765f7cf
  7768975517
  314828c973
  8a6d7a349a
  1991ad0409
  2be7301d42
@@ -502,6 +502,7 @@ set(TESTS
        db/db_iter_test.cc
        db/db_log_iter_test.cc
        db/db_memtable_test.cc
+       db/db_merge_operator_test.cc
        db/db_options_test.cc
        db/db_properties_test.cc
        db/db_table_properties_test.cc
@@ -1,4 +1,8 @@
 # RocksDB default options change log
+## 5.0 (11/17/2016)
+* Options::allow_concurrent_memtable_write and Options::enable_write_thread_adaptive_yield are now true by default
+* Options.level0_stop_writes_trigger default value changes from 24 to 32.
+
 ## 4.8.0 (5/2/2016)
 * options.max_open_files changes from 5000 to -1. It improves performance, but users need to set the file descriptor limit to be large enough and watch memory usage for index and bloom filters.
 * options.base_background_compactions changes from max_background_compactions to 1. When users set a higher max_background_compactions but the write throughput is not high, writes are less spiky to disks.
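The 4.8.0 entries above change behavior that some deployments may want to pin explicitly rather than inherit. A hedged sketch (the field names are the public Options members; the values are arbitrary examples, not recommendations):

    #include "rocksdb/options.h"

    rocksdb::Options options;
    // As of 4.8.0 the default is -1 (keep all table files open); revert to a
    // bounded table cache if fd limits or index/filter memory are a concern.
    options.max_open_files = 5000;
    // As of 4.8.0 compactions start from one background compaction and scale
    // up toward max_background_compactions only under load.
    options.base_background_compactions = 1;
    options.max_background_compactions = 4;  // example value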
HISTORY.md (14 changes)
@@ -1,9 +1,11 @@
 # Rocksdb Change Log
 ## Unreleased
 ### Public API Change
 * Options.level0_stop_writes_trigger default value changes from 24 to 32.
 * New compaction filter API: CompactionFilter::FilterV2(). Allows dropping ranges of keys.
 * Removed flashcache support.
 ### Bug fixes
 * Fix the data corruption bug in the case that concurrent memtable write is enabled and 2PC is used.
 * OptimizeForPointLookup() doesn't work with the default DB setting of allow_concurrent_memtable_write=true. Fix it.
 * Fix a 2PC related bug where WAL file sizes grow too large.
 * Fix the bug that, if 2PC is enabled, checkpoints may lose some recent transactions.
 * When file copying is needed when creating checkpoints or bulk loading files, fsync the file after the file copying.

 ## 5.0.0 (11/17/2016)
 ### Public API Change
@@ -13,6 +15,10 @@
 * Support dynamically changing the `delayed_write_rate` option via SetDBOptions().
 * Options::allow_concurrent_memtable_write and Options::enable_write_thread_adaptive_yield are now true by default.
 * Remove Tickers::SEQUENCE_NUMBER to avoid confusion if the statistics object is shared among RocksDB instances. Alternatively, DB::GetLatestSequenceNumber() can be used to get the same value.
 * Options.level0_stop_writes_trigger default value changes from 24 to 32.
 * New compaction filter API: CompactionFilter::FilterV2(). Allows dropping ranges of keys.
 * Removed flashcache support.
 * DB::AddFile() is deprecated and is replaced with DB::IngestExternalFile(). DB::IngestExternalFile() removes all the restrictions that existed for DB::AddFile.

 ### New Features
 * Add the avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The option is dynamically changeable via SetDBOptions().
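Since the FilterV2() bullet is terse, here is a hedged sketch of the new API: a compaction filter that drops an entire key range in one decision by skipping ahead with kRemoveAndSkipUntil. Only the CompactionFilter interface itself is RocksDB's; the class name and range bounds are hypothetical.

    #include <string>
    #include "rocksdb/compaction_filter.h"

    // Drops every key in ["drop_begin", "drop_end") during compaction,
    // instead of filtering key by key.
    class RangeDroppingFilter : public rocksdb::CompactionFilter {
     public:
      Decision FilterV2(int /*level*/, const rocksdb::Slice& key,
                        ValueType /*value_type*/,
                        const rocksdb::Slice& /*existing_value*/,
                        std::string* /*new_value*/,
                        std::string* skip_until) const override {
        if (key.compare("drop_begin") >= 0 && key.compare("drop_end") < 0) {
          *skip_until = "drop_end";  // compaction seeks here; the range is dropped
          return Decision::kRemoveAndSkipUntil;
        }
        return Decision::kKeep;
      }

      const char* Name() const override { return "RangeDroppingFilter"; }
    };

As the compaction_iterator_test changes further down illustrate, skip_until must compare greater than the current key for the skip to take effect.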
Makefile (8 changes)
@@ -220,10 +220,6 @@ default: all
 WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
   -Wno-unused-parameter

 ifndef DISABLE_WARNING_AS_ERROR
   WARNING_FLAGS += -Werror
 endif

 ifdef LUA_PATH

@@ -309,6 +305,7 @@ TESTS = \
 	db_inplace_update_test \
 	db_iterator_test \
 	db_memtable_test \
+	db_merge_operator_test \
 	db_options_test \
 	db_range_del_test \
 	db_sst_test \

@@ -996,6 +993,9 @@ db_iterator_test: db/db_iterator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)

+db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
@@ -51,12 +51,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
   FBCODE_BUILD="true"
   # If we're compiling with TSAN we need pic build
   PIC_BUILD=$COMPILE_WITH_TSAN
-  if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
   source "$PWD/build_tools/fbcode_config.sh"
-  else
-    # we need this to build with MySQL. Don't use for other purposes.
-    source "$PWD/build_tools/fbcode_config4.8.1.sh"
-  fi
 fi

 # Delete existing output, if it exists
@@ -1,18 +1,19 @@
-GCC_BASE=/mnt/gvfs/third-party2/gcc/cf7d14c625ce30bae1a4661c2319c5a283e4dd22/4.9.x/centos6-native/108cf83
-CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/8598c375b0e94e1448182eb3df034704144a838d/stable/centos6-native/3f16ddd
-LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/d6e0a7da6faba45f5e5b1638f9edd7afc2f34e7d/4.9.x/gcc-4.9-glibc-2.20/024dbc3
-GLIBC_BASE=/mnt/gvfs/third-party2/glibc/d282e6e8f3d20f4e40a516834847bdc038e07973/2.20/gcc-4.9-glibc-2.20/500e281
-SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
-ZLIB_BASE=/mnt/gvfs/third-party2/zlib/0882df3713c7a84f15abe368dc004581f20b39d7/1.2.8/gcc-5-glibc-2.23/9bc6787
-BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/740325875f6729f42d28deaa2147b0854f3a347e/1.0.6/gcc-5-glibc-2.23/9bc6787
-LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
-ZSTD_BASE=/mnt/gvfs/third-party2/zstd/9455f75ff7f4831dc9fda02a6a0f8c68922fad8f/1.0.0/gcc-5-glibc-2.23/9bc6787
-GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/f001a51b2854957676d07306ef3abf67186b5c8b/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
-JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/fc8a13ca1fffa4d0765c716c5a0b49f0c107518f/master/gcc-5-glibc-2.23/1c32b4b
-NUMA_BASE=/mnt/gvfs/third-party2/numa/17c514c4d102a25ca15f4558be564eeed76f4b6a/2.0.8/gcc-5-glibc-2.23/9bc6787
-LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
-TBB_BASE=/mnt/gvfs/third-party2/tbb/9d9a554877d0c5bef330fe818ab7178806dd316a/4.0_update2/gcc-4.9-glibc-2.20/e9936bf
-KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/7c111ff27e0c466235163f00f280a9d617c3d2ec/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
-BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/b7fd454c4b10c6a81015d4524ed06cdeab558490/2.26/centos6-native/da39a3e
-VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d7f4d4d86674a57668e3a96f76f0e17dd0eb8765/3.10.0/gcc-4.9-glibc-2.20/e9936bf
-LUA_BASE=/mnt/gvfs/third-party2/lua/61e4abf5813bbc39bc4f548757ccfcadde175a48/5.2.3/gcc-4.9-glibc-2.20/690f0d7
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
+CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
+LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
+GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
+SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
+ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
+BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
+LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
+ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
+GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
+JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
+NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
+LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
+TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
+KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
+BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
+VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
+LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832
@@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
 CFLAGS=""

 # libgcc
-LIBGCC_INCLUDE="$LIBGCC_BASE/include"
+LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
 LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"

 # glibc
@@ -43,11 +43,15 @@ if test -z $PIC_BUILD; then
   LZ4_INCLUDE=" -I $LZ4_BASE/include/"
   LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
   CFLAGS+=" -DLZ4"
 fi

 ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
 if test -z $PIC_BUILD; then
   ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
-  CFLAGS+=" -DZSTD"
+else
+  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
 fi
+CFLAGS+=" -DZSTD"

 # location of gflags headers and libraries
 GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
@@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
 else
   GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
 fi
-CFLAGS+=" -DGFLAGS=google"
+CFLAGS+=" -DGFLAGS=gflags"

 # location of jemalloc
 JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@@ -104,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
   CXX="$GCC_BASE/bin/g++"

   CFLAGS+=" -B$BINUTILS/gold"
-  CFLAGS+=" -isystem $GLIBC_INCLUDE"
   CFLAGS+=" -isystem $LIBGCC_INCLUDE"
+  CFLAGS+=" -isystem $GLIBC_INCLUDE"
   JEMALLOC=1
 else
   # clang
@@ -116,8 +120,8 @@ else
   KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"

   CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
-  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
-  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
+  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
+  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
   CFLAGS+=" -isystem $GLIBC_INCLUDE"
   CFLAGS+=" -isystem $LIBGCC_INCLUDE"
   CFLAGS+=" -isystem $CLANG_INCLUDE"
@@ -128,13 +132,14 @@ else
 fi

 CFLAGS+=" $DEPS_INCLUDE"
-CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
+CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
 CXXFLAGS+=" $CFLAGS"

 EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
-EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
+EXEC_LDFLAGS+=" -B$BINUTILS/gold"
+EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
 EXEC_LDFLAGS+=" $LIBUNWIND"
-EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
+EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
 # required by libtbb
 EXEC_LDFLAGS+=" -ldl"
@@ -144,12 +149,4 @@ EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GF
 VALGRIND_VER="$VALGRIND_BASE/bin/"

-LUA_PATH="$LUA_BASE"
-
-if test -z $PIC_BUILD; then
-  LUA_LIB=" $LUA_PATH/lib/liblua.a"
-else
-  LUA_LIB=" $LUA_PATH/lib/liblua_pic.a"
-fi
-
-export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD LUA_PATH LUA_LIB
+export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD
@@ -370,7 +370,8 @@ ColumnFamilyData::ColumnFamilyData(
       column_family_set_(column_family_set),
       pending_flush_(false),
       pending_compaction_(false),
-      prev_compaction_needed_bytes_(0) {
+      prev_compaction_needed_bytes_(0),
+      allow_2pc_(db_options.allow_2pc) {
   Ref();

   // Convert user defined table properties collector factories to internal ones.

@@ -492,6 +493,25 @@ ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
   return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
 }

+uint64_t ColumnFamilyData::OldestLogToKeep() {
+  auto current_log = GetLogNumber();
+
+  if (allow_2pc_) {
+    auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
+    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
+
+    if (imm_prep_log > 0 && imm_prep_log < current_log) {
+      current_log = imm_prep_log;
+    }
+
+    if (mem_prep_log > 0 && mem_prep_log < current_log) {
+      current_log = mem_prep_log;
+    }
+  }
+
+  return current_log;
+}
+
 const double kIncSlowdownRatio = 0.8;
 const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
 const double kNearStopSlowdownRatio = 0.6;
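As a worked illustration of the rule OldestLogToKeep() encodes (a standalone sketch, not RocksDB code): the column family's current log number is lowered to any older WAL that still holds a prepared-but-uncommitted 2PC section, where 0 means "no such section".

    #include <algorithm>
    #include <cstdint>

    // Sketch of the min-log rule under 2PC; 0 means "no prep section".
    uint64_t OldestLogToKeepSketch(uint64_t current_log, uint64_t imm_prep_log,
                                   uint64_t mem_prep_log) {
      uint64_t log = current_log;
      if (imm_prep_log > 0) log = std::min(log, imm_prep_log);
      if (mem_prep_log > 0) log = std::min(log, mem_prep_log);
      return log;
    }
    // e.g. OldestLogToKeepSketch(12, 9, 0) == 9: WAL 9 must be kept until its
    // prepared transactions commit, even though the memtable moved on to WAL 12.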
@@ -239,6 +239,9 @@ class ColumnFamilyData {
   uint64_t GetTotalSstFilesSize() const;  // REQUIRE: DB mutex held
   void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }

+  // calculate the oldest log needed for the durability of this column family
+  uint64_t OldestLogToKeep();
+
   // See Memtable constructor for explanation of earliest_seq param.
   MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
                                  SequenceNumber earliest_seq);

@@ -404,6 +407,9 @@ class ColumnFamilyData {
   bool pending_compaction_;

   uint64_t prev_compaction_needed_bytes_;
+
+  // if the database was opened with 2pc enabled
+  bool allow_2pc_;
 };

 // ColumnFamilySet has interesting thread-safety requirements
@@ -437,9 +437,7 @@ void CompactionIterator::NextFromInput() {
                                bottommost_level_);
       merge_out_iter_.SeekToFirst();

-      if (merge_helper_->FilteredUntil(&skip_until)) {
-        need_skip = true;
-      } else if (merge_out_iter_.Valid()) {
+      if (merge_out_iter_.Valid()) {
         // NOTE: key, value, and ikey_ refer to old entries.
         //       These will be correctly set below.
         key_ = merge_out_iter_.key();

@@ -460,6 +458,10 @@ void CompactionIterator::NextFromInput() {
         // coming after the merges
         has_current_user_key_ = false;
         pinned_iters_mgr_.ReleasePinnedData();
+
+        if (merge_helper_->FilteredUntil(&skip_until)) {
+          need_skip = true;
+        }
       }
     } else {
       // 1. new user key -OR-
@@ -272,8 +272,8 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
         return Decision::kKeep;
       }
       if (k == "i") {
-        EXPECT_EQ(ValueType::kValue, t);
-        EXPECT_EQ("iv95", v);
+        EXPECT_EQ(ValueType::kMergeOperand, t);
+        EXPECT_EQ("im95", v);
         *skip_until = "z";
         return Decision::kRemoveAndSkipUntil;
       }

@@ -299,10 +299,10 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
        test::KeyStr("f", 30, kTypeMerge),  // skip to "g+"
        test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue),
        test::KeyStr("h", 91, kTypeValue),  // keep
-       test::KeyStr("i", 95, kTypeValue),  // skip to "z"
+       test::KeyStr("i", 95, kTypeMerge),  // skip to "z"
        test::KeyStr("j", 99, kTypeValue)},
       {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
-       "fv25", "gv90", "hv91", "iv95", "jv99"},
+       "fv25", "gv90", "hv91", "im95", "jv99"},
       {}, {}, kMaxSequenceNumber, &merge_op, &filter);

   // Compaction should output just "a", "e" and "h" keys.
@@ -772,9 +772,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
             key, sub_compact->current_output_file_size) &&
         sub_compact->builder != nullptr) {
       CompactionIterationStats range_del_out_stats;
-      status =
-          FinishCompactionOutputFile(input->status(), sub_compact,
-                                     range_del_agg.get(), &range_del_out_stats);
+      status = FinishCompactionOutputFile(input->status(), sub_compact,
+                                          range_del_agg.get(),
+                                          &range_del_out_stats, &key);
       RecordDroppedKeys(range_del_out_stats,
                         &sub_compact->compaction_job_stats);
       if (!status.ok()) {

@@ -853,9 +853,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       }
     }

-    Status input_status = input->status();
-    c_iter->Next();
-
     // Close output file if it is big enough
     // TODO(aekmekji): determine if file should be closed earlier than this
     // during subcompactions (i.e. if output size, estimated by input size, is

@@ -864,6 +861,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     if (sub_compact->compaction->output_level() != 0 &&
        sub_compact->current_output_file_size >=
            sub_compact->compaction->max_output_file_size()) {
+      Status input_status = input->status();
+      c_iter->Next();
+
      const Slice* next_key = nullptr;
      if (c_iter->Valid()) {
        next_key = &c_iter->key();

@@ -879,6 +879,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       //  files.
       sub_compact->compression_dict = std::move(compression_dict);
       }
+    } else {
+      c_iter->Next();
     }
   }
db/db_impl.cc (157 changes)
@@ -338,6 +338,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       last_stats_dump_time_microsec_(0),
       next_job_id_(1),
       has_unpersisted_data_(false),
+      unable_to_flush_oldest_log_(false),
       env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
       num_running_ingest_file_(0),
 #ifndef ROCKSDB_LITE
@@ -654,6 +655,10 @@ void DBImpl::MaybeDumpStats() {
 }

 uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
+  if (!allow_2pc()) {
+    return 0;
+  }
+
   uint64_t min_log = 0;

   // we must look through the memtables for two phase transactions
@@ -698,6 +703,11 @@ void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
 }

 uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
+
+  if (!allow_2pc()) {
+    return 0;
+  }
+
   std::lock_guard<std::mutex> lock(prep_heap_mutex_);
   uint64_t min_log = 0;
@@ -736,6 +746,34 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
   }
 }

+uint64_t DBImpl::MinLogNumberToKeep() {
+  uint64_t log_number = versions_->MinLogNumber();
+
+  if (allow_2pc()) {
+    // with 2pc we must consider logs containing prepared
+    // sections of outstanding transactions.
+    //
+    // We must check min logs with outstanding prep before we check
+    // logs referenced by memtables because a log referenced by the
+    // first data structure could transition to the second under us.
+    //
+    // TODO(horuff): iterating over all column families under db mutex.
+    // should find more optimal solution
+    auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
+
+    if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) {
+      log_number = min_log_in_prep_heap;
+    }
+
+    auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
+
+    if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) {
+      log_number = min_log_refed_by_mem;
+    }
+  }
+  return log_number;
+}
+
 // * Returns the list of live files in 'sst_live'
 // If it's doing full scan:
 // * Returns the list of all files in the filesystem in
@@ -794,32 +832,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   job_context->manifest_file_number = versions_->manifest_file_number();
   job_context->pending_manifest_file_number =
       versions_->pending_manifest_file_number();
-  job_context->log_number = versions_->MinLogNumber();
-
-  if (allow_2pc()) {
-    // if are 2pc we must consider logs containing prepared
-    // sections of outstanding transactions.
-    //
-    // We must check min logs with outstanding prep before we check
-    // logs referneces by memtables because a log referenced by the
-    // first data structure could transition to the second under us.
-    //
-    // TODO(horuff): iterating over all column families under db mutex.
-    // should find more optimial solution
-    auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
-
-    if (min_log_in_prep_heap != 0 &&
-        min_log_in_prep_heap < job_context->log_number) {
-      job_context->log_number = min_log_in_prep_heap;
-    }
-
-    auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
-
-    if (min_log_refed_by_mem != 0 &&
-        min_log_refed_by_mem < job_context->log_number) {
-      job_context->log_number = min_log_refed_by_mem;
-    }
-  }
+  job_context->log_number = MinLogNumberToKeep();

   job_context->prev_log_number = versions_->prev_log_number();
@@ -2493,7 +2506,7 @@ Status DBImpl::SetDBOptions(
     mutable_db_options_ = new_options;

     if (total_log_size_ > GetMaxTotalWalSize()) {
-      FlushColumnFamilies();
+      MaybeFlushColumnFamilies();
     }

     persist_options_status = PersistOptions();
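For context on the SetDBOptions() path touched above, a short usage sketch against the public API (the numeric value is an arbitrary example):

    #include "rocksdb/db.h"

    // delayed_write_rate is one of the options marked dynamically changeable,
    // so it can be tuned on a live DB without reopening it.
    rocksdb::Status Tune(rocksdb::DB* db) {
      return db->SetDBOptions({{"delayed_write_rate", "2097152"}});
    }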
@@ -2939,6 +2952,12 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
     if (shutting_down_.load(std::memory_order_acquire)) {
       return Status::ShutdownInProgress();
     }
+    if (cfd->IsDropped()) {
+      // FlushJob cannot flush a dropped CF; if we did not break here
+      // we would loop forever since cfd->imm()->NumNotFlushed() will never
+      // drop to zero
+      return Status::InvalidArgument("Cannot flush a dropped CF");
+    }
     bg_cv_.Wait();
   }
   if (!bg_error_.ok()) {
@@ -4680,9 +4699,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);

   if (UNLIKELY(!single_column_family_mode_ &&
-               !alive_log_files_.begin()->getting_flushed &&
                total_log_size_ > GetMaxTotalWalSize())) {
-    FlushColumnFamilies();
+    MaybeFlushColumnFamilies();
   } else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
     // Before a new memtable is added in SwitchMemtable(),
     // write_buffer_manager_->ShouldFlush() will keep returning true. If another
@@ -5000,28 +5018,40 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   return status;
 }

-void DBImpl::FlushColumnFamilies() {
+void DBImpl::MaybeFlushColumnFamilies() {
   mutex_.AssertHeld();

-  WriteContext context;
-
   if (alive_log_files_.begin()->getting_flushed) {
     return;
   }

-  uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
-  alive_log_files_.begin()->getting_flushed = true;
+  auto oldest_alive_log = alive_log_files_.begin()->number;
+  auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
+
+  if (allow_2pc() &&
+      unable_to_flush_oldest_log_ &&
+      oldest_log_with_uncommited_prep > 0 &&
+      oldest_log_with_uncommited_prep <= oldest_alive_log) {
+    // we already attempted to flush all column families dependent on
+    // the oldest alive log, and that log STILL contains uncommitted
+    // transactions, so there is still nothing that we can do.
+    return;
+  }
+
+  WriteContext context;

   Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
       "Flushing all column families with data in WAL number %" PRIu64
       ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
-      flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize());
+      oldest_alive_log, total_log_size_, GetMaxTotalWalSize());
   // no need to refcount because drop is happening in write thread, so can't
   // happen while we're in the write thread
   for (auto cfd : *versions_->GetColumnFamilySet()) {
     if (cfd->IsDropped()) {
       continue;
     }
-    if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
+    if (cfd->OldestLogToKeep() <= oldest_alive_log) {
       auto status = SwitchMemtable(cfd, &context);
       if (!status.ok()) {
         break;

@@ -5031,6 +5061,26 @@ void DBImpl::MaybeFlushColumnFamilies() {
     }
   }
   MaybeScheduleFlushOrCompaction();
+
+  // we only mark this log as getting flushed if we have successfully
+  // flushed all data in this log. If this log contains outstanding prepared
+  // transactions then we cannot flush this log until those transactions are committed.
+  unable_to_flush_oldest_log_ = false;
+
+  if (allow_2pc()) {
+    if (oldest_log_with_uncommited_prep == 0 ||
+        oldest_log_with_uncommited_prep > oldest_alive_log) {
+      // this log contains no outstanding prepared transactions
+      alive_log_files_.begin()->getting_flushed = true;
+    } else {
+      Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log,
+          "Unable to release oldest log due to uncommitted transaction");
+      unable_to_flush_oldest_log_ = true;
+    }
+  } else {
+    alive_log_files_.begin()->getting_flushed = true;
+  }
 }

 uint64_t DBImpl::GetMaxTotalWalSize() const {
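The release rule that MaybeFlushColumnFamilies() applies at its tail can be stated in isolation; a standalone sketch (not the RocksDB implementation — just the predicate it checks, with 0 meaning "no log contains an outstanding prep section"):

    #include <cstdint>

    // The oldest alive WAL may only be marked getting_flushed once no
    // uncommitted prepared section still lives in it.
    bool CanReleaseOldestLog(bool allow_2pc, uint64_t oldest_alive_log,
                             uint64_t oldest_log_with_uncommitted_prep) {
      if (!allow_2pc) return true;
      return oldest_log_with_uncommitted_prep == 0 ||
             oldest_log_with_uncommitted_prep > oldest_alive_log;
    }

When this returns false, unable_to_flush_oldest_log_ is set so a later write retries the flush once the blocking transactions have committed.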
@@ -6493,14 +6543,23 @@ Status DBImpl::IngestExternalFile(
   num_running_ingest_file_++;

+  // We cannot ingest a file into a dropped CF
+  if (cfd->IsDropped()) {
+    status = Status::InvalidArgument(
+        "Cannot ingest an external file into a dropped CF");
+  }
+
   // Figure out if we need to flush the memtable first
   if (status.ok()) {
     bool need_flush = false;
     status = ingestion_job.NeedsFlush(&need_flush);

     if (status.ok() && need_flush) {
       mutex_.Unlock();
       status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */);
       mutex_.Lock();
     }
   }

   // Run the ingestion job
   if (status.ok()) {

@@ -6541,9 +6600,35 @@ Status DBImpl::IngestExternalFile(
   // Cleanup
   ingestion_job.Cleanup(status);

+  if (status.ok()) {
+    NotifyOnExternalFileIngested(cfd, ingestion_job);
+  }
+
   return status;
 }

+void DBImpl::NotifyOnExternalFileIngested(
+    ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.listeners.empty()) {
+    return;
+  }
+
+  for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
+    ExternalFileIngestionInfo info;
+    info.cf_name = cfd->GetName();
+    info.external_file_path = f.external_file_path;
+    info.internal_file_path = f.internal_file_path;
+    info.global_seqno = f.assigned_seqno;
+    info.table_properties = f.table_properties;
+    for (auto listener : immutable_db_options_.listeners) {
+      listener->OnExternalFileIngested(this, info);
+    }
+  }
+#endif
+}
+
 void DBImpl::WaitForIngestFile() {
   mutex_.AssertHeld();
   while (num_running_ingest_file_ > 0) {
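For callers, the public entry point wired up above is DB::IngestExternalFile(), the replacement for the deprecated DB::AddFile() noted in HISTORY.md. A minimal usage sketch (the path is a placeholder for an SST produced by SstFileWriter):

    #include <string>
    #include "rocksdb/db.h"

    rocksdb::Status IngestOne(rocksdb::DB* db, const std::string& sst_path) {
      rocksdb::IngestExternalFileOptions ifo;
      ifo.allow_global_seqno = true;  // permit assigning a global sequence number
      return db->IngestExternalFile({sst_path}, ifo);
    }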
db/db_impl.h (27 changes)
@@ -23,6 +23,7 @@
 #include "db/column_family.h"
 #include "db/compaction_job.h"
 #include "db/dbformat.h"
+#include "db/external_sst_file_ingestion_job.h"
 #include "db/flush_job.h"
 #include "db/flush_scheduler.h"
 #include "db/internal_stats.h"

@@ -307,6 +308,16 @@ class DBImpl : public DB {
       ColumnFamilyHandle* column_family = nullptr,
       bool disallow_trivial_move = false);

+  void TEST_MaybeFlushColumnFamilies();
+
+  bool TEST_UnableToFlushOldestLog() {
+    return unable_to_flush_oldest_log_;
+  }
+
+  bool TEST_IsLogGettingFlushed() {
+    return alive_log_files_.begin()->getting_flushed;
+  }
+
   // Force current memtable contents to be flushed.
   Status TEST_FlushMemTable(bool wait = true,
                             ColumnFamilyHandle* cfh = nullptr);

@@ -379,6 +390,8 @@ class DBImpl : public DB {
   // schedule a purge
   void ScheduleBgLogWriterClose(JobContext* job_context);

+  uint64_t MinLogNumberToKeep();
+
   // Returns the list of live files in 'live' and the list
   // of all files in the filesystem in 'candidate_files'.
   // If force == false and the last call was less than

@@ -556,6 +569,9 @@ class DBImpl : public DB {
   void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
                               const MemTableInfo& mem_table_info);

+  void NotifyOnExternalFileIngested(
+      ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
+
   void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;

   void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;

@@ -728,7 +744,7 @@ class DBImpl : public DB {
   // REQUIRES: mutex locked
   Status PersistOptions();

-  void FlushColumnFamilies();
+  void MaybeFlushColumnFamilies();

   uint64_t GetMaxTotalWalSize() const;

@@ -987,6 +1003,15 @@ class DBImpl : public DB {
   // Used when disableWAL is true.
   bool has_unpersisted_data_;

+  // set if an attempt was made to flush all column families that
+  // the oldest log depends on but uncommitted data in the oldest
+  // log prevents the log from being released.
+  // We must attempt to free the dependent memtables again
+  // at a later time after the transaction in the oldest
+  // log is fully committed.
+  bool unable_to_flush_oldest_log_;
+
   static const int KEEP_LOG_FILE_NUM = 1000;
   // MSVC version 1800 still does not have constexpr for ::max()
   static const uint64_t kNoTimeOut = port::kMaxUint64;
@@ -19,6 +19,11 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
   return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
 }

+void DBImpl::TEST_MaybeFlushColumnFamilies() {
+  InstrumentedMutexLock l(&mutex_);
+  MaybeFlushColumnFamilies();
+}
+
 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
     ColumnFamilyHandle* column_family) {
   ColumnFamilyData* cfd;
@@ -518,6 +518,7 @@ void DBIter::MergeValuesNewToOld() {
                                 iter_->IsValuePinned() /* operand_pinned */);

   ParsedInternalKey ikey;
+  Status s;
   for (iter_->Next(); iter_->Valid(); iter_->Next()) {
     if (!ParseKey(&ikey)) {
       // skip corrupted key
@@ -538,9 +539,12 @@ void DBIter::MergeValuesNewToOld() {
       // final result in saved_value_. We are done!
       // ignore corruption if there is any.
       const Slice val = iter_->value();
-      MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
-                                  merge_context_.GetOperands(), &saved_value_,
-                                  logger_, statistics_, env_, &pinned_value_);
+      s = MergeHelper::TimedFullMerge(
+          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
+          &saved_value_, logger_, statistics_, env_, &pinned_value_);
+      if (!s.ok()) {
+        status_ = s;
+      }
       // iter_ is positioned after put
       iter_->Next();
       return;

@@ -559,9 +563,12 @@ void DBIter::MergeValuesNewToOld() {
     // a deletion marker.
     // feed null as the existing value to the merge operator, such that
     // client can differentiate this scenario and do things accordingly.
-    MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
+    s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                                 merge_context_.GetOperands(), &saved_value_,
                                 logger_, statistics_, env_, &pinned_value_);
+    if (!s.ok()) {
+      status_ = s;
+    }
   }
 }

 void DBIter::Prev() {
@@ -742,6 +749,7 @@ bool DBIter::FindValueForCurrentKey() {
     FindParseableKey(&ikey, kReverse);
   }

+  Status s;
   switch (last_key_entry_type) {
     case kTypeDeletion:
     case kTypeSingleDeletion:

@@ -753,16 +761,16 @@ bool DBIter::FindValueForCurrentKey() {
       if (last_not_merge_type == kTypeDeletion ||
           last_not_merge_type == kTypeSingleDeletion ||
           last_not_merge_type == kTypeRangeDeletion) {
-        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
+        s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                     nullptr, merge_context_.GetOperands(),
-                                    &saved_value_, logger_, statistics_, env_,
-                                    &pinned_value_);
+                                        &saved_value_, logger_, statistics_,
+                                        env_, &pinned_value_);
       } else {
         assert(last_not_merge_type == kTypeValue);
-        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
-                                    &pinned_value_,
-                                    merge_context_.GetOperands(), &saved_value_,
-                                    logger_, statistics_, env_, &pinned_value_);
+        s = MergeHelper::TimedFullMerge(
+            merge_operator_, saved_key_.GetKey(), &pinned_value_,
+            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
+            env_, &pinned_value_);
       }
       break;
     case kTypeValue:

@@ -773,6 +781,9 @@ bool DBIter::FindValueForCurrentKey() {
       break;
   }
   valid_ = true;
+  if (!s.ok()) {
+    status_ = s;
+  }
   return true;
 }
@@ -818,13 +829,15 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     FindParseableKey(&ikey, kForward);
   }

+  Status s;
   if (!iter_->Valid() ||
       !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
       ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
       range_del_agg_.ShouldDelete(ikey)) {
-    MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
-                                merge_context_.GetOperands(), &saved_value_,
-                                logger_, statistics_, env_, &pinned_value_);
+    s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
+                                    nullptr, merge_context_.GetOperands(),
+                                    &saved_value_, logger_, statistics_, env_,
+                                    &pinned_value_);
     // Make iter_ valid and point to saved_key_
     if (!iter_->Valid() ||
         !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {

@@ -832,14 +845,20 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
       RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
     }
     valid_ = true;
+    if (!s.ok()) {
+      status_ = s;
+    }
     return true;
   }

   const Slice& val = iter_->value();
-  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
+  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
                               merge_context_.GetOperands(), &saved_value_,
                               logger_, statistics_, env_, &pinned_value_);
   valid_ = true;
+  if (!s.ok()) {
+    status_ = s;
+  }
   return true;
 }
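The net effect of the TimedFullMerge changes above is that a failing merge operator now surfaces through the iterator status rather than being silently dropped. A short usage sketch against the public API (db is an open rocksdb::DB*; the function name is illustrative):

    #include <memory>
    #include "rocksdb/db.h"

    void ScanChecked(rocksdb::DB* db) {
      std::unique_ptr<rocksdb::Iterator> it(
          db->NewIterator(rocksdb::ReadOptions()));
      for (it->SeekToFirst(); it->Valid(); it->Next()) {
        // consume it->key() / it->value()
      }
      if (!it->status().ok()) {
        // e.g. status().IsCorruption() when a merge operand could not be merged
      }
    }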
db/db_merge_operator_test.cc (new file, 98 lines)
@@ -0,0 +1,98 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <string>
#include <vector>

#include "db/db_test_util.h"
#include "db/forward_iterator.h"
#include "port/stack_trace.h"
#include "utilities/merge_operators.h"

namespace rocksdb {

// Test merge operator functionality.
class DBMergeOperatorTest : public DBTestBase {
 public:
  DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
};

// A test merge operator that mimics put but also fails if one of the merge
// operands is "corrupted".
class TestPutOperator : public MergeOperator {
 public:
  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                           MergeOperationOutput* merge_out) const override {
    if (merge_in.existing_value != nullptr &&
        *(merge_in.existing_value) == "corrupted") {
      return false;
    }
    for (auto value : merge_in.operand_list) {
      if (value == "corrupted") {
        return false;
      }
    }
    merge_out->existing_operand = merge_in.operand_list.back();
    return true;
  }

  virtual const char* Name() const override { return "TestPutOperator"; }
};

TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  Reopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "corrupted"));
  std::string value;
  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
}

TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  options.max_successive_merges = 3;
  Reopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "v2"));
  // Will trigger a merge when hitting max_successive_merges and the merge
  // will fail. The delta will be inserted nevertheless.
  ASSERT_OK(Merge("k1", "corrupted"));
  // Data should stay unmerged after the error.
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
}

TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());

  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "corrupted"));
  ASSERT_OK(Put("k2", "v2"));
  VerifyDBFromMap({{"k1", ""}, {"k2", "v2"}}, nullptr, false,
                  {{"k1", Status::Corruption()}});
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});

  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Put("k2", "v2"));
  ASSERT_OK(Merge("k2", "corrupted"));
  VerifyDBFromMap({{"k1", "v1"}, {"k2", ""}}, nullptr, false,
                  {{"k2", Status::Corruption()}});
  VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}

}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
@@ -112,6 +112,59 @@ TEST_F(DBRangeDelTest, CompactionOutputFilesExactlyFilled) {
   db_->ReleaseSnapshot(snapshot);
 }

+TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
+  // Ensures range deletions spanning multiple compaction output files that are
+  // cut by max_compaction_bytes will have non-overlapping key-ranges.
+  // https://github.com/facebook/rocksdb/issues/1778
+  const int kNumFiles = 2, kNumPerFile = 1 << 8, kBytesPerVal = 1 << 12;
+  Options opts = CurrentOptions();
+  opts.comparator = test::Uint64Comparator();
+  opts.disable_auto_compactions = true;
+  opts.level0_file_num_compaction_trigger = kNumFiles;
+  opts.max_compaction_bytes = kNumPerFile * kBytesPerVal;
+  opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
+  // Want max_compaction_bytes to trigger the end of the compaction output
+  // file, not target_file_size_base, so make the latter much bigger
+  opts.target_file_size_base = 100 * opts.max_compaction_bytes;
+  Reopen(opts);
+
+  // snapshot protects range tombstone from dropping due to becoming obsolete.
+  const Snapshot* snapshot = db_->GetSnapshot();
+
+  // It spans the whole key-range, thus will be included in all output files
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                             GetNumericStr(0),
+                             GetNumericStr(kNumFiles * kNumPerFile - 1)));
+  Random rnd(301);
+  for (int i = 0; i < kNumFiles; ++i) {
+    std::vector<std::string> values;
+    // Write 1MB (256 values, each 4K)
+    for (int j = 0; j < kNumPerFile; j++) {
+      values.push_back(RandomString(&rnd, kBytesPerVal));
+      ASSERT_OK(Put(GetNumericStr(kNumPerFile * i + j), values[j]));
+    }
+    // extra entry to trigger SpecialSkipListFactory's flush
+    ASSERT_OK(Put(GetNumericStr(kNumPerFile), ""));
+    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
+  }
+
+  dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
+                              true /* disallow_trivial_move */);
+  ASSERT_EQ(0, NumTableFilesAtLevel(0));
+  ASSERT_GE(NumTableFilesAtLevel(1), 2);
+
+  std::vector<std::vector<FileMetaData>> files;
+  dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
+
+  for (size_t i = 0; i < files[1].size() - 1; ++i) {
+    ASSERT_TRUE(InternalKeyComparator(opts.comparator)
+                    .Compare(files[1][i].largest, files[1][i + 1].smallest) <
+                0);
+  }
+  db_->ReleaseSnapshot(snapshot);
+}
+
 TEST_F(DBRangeDelTest, FlushRangeDelsSameStartKey) {
   db_->Put(WriteOptions(), "b1", "val");
   ASSERT_OK(
@@ -2212,6 +2212,18 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }

+TEST_F(DBTest2, OptimizeForPointLookup) {
+  Options options = CurrentOptions();
+  Close();
+  options.OptimizeForPointLookup(2);
+  ASSERT_OK(DB::Open(options, dbname_, &db_));
+
+  ASSERT_OK(Put("foo", "v1"));
+  ASSERT_EQ("v1", Get("foo"));
+  Flush();
+  ASSERT_EQ("v1", Get("foo"));
+}
 #endif  // ROCKSDB_LITE

 }  // namespace rocksdb
@@ -8,6 +8,7 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.

 #include "db/db_test_util.h"
+#include "db/forward_iterator.h"

 namespace rocksdb {

@@ -501,6 +502,15 @@ Status DBTestBase::Put(int cf, const Slice& k, const Slice& v,
   }
 }

+Status DBTestBase::Merge(const Slice& k, const Slice& v, WriteOptions wo) {
+  return db_->Merge(wo, k, v);
+}
+
+Status DBTestBase::Merge(int cf, const Slice& k, const Slice& v,
+                         WriteOptions wo) {
+  return db_->Merge(wo, handles_[cf], k, v);
+}
+
 Status DBTestBase::Delete(const std::string& k) {
   return db_->Delete(WriteOptions(), k);
 }
@@ -1089,11 +1099,18 @@ std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
 }

 void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
-                                 size_t* total_reads_res, bool tailing_iter) {
+                                 size_t* total_reads_res, bool tailing_iter,
+                                 std::map<std::string, Status> status) {
   size_t total_reads = 0;

   for (auto& kv : true_data) {
-    ASSERT_EQ(Get(kv.first), kv.second);
+    Status s = status[kv.first];
+    if (s.ok()) {
+      ASSERT_EQ(Get(kv.first), kv.second);
+    } else {
+      std::string value;
+      ASSERT_EQ(s, db_->Get(ReadOptions(), kv.first, &value));
+    }
     total_reads++;
   }

@@ -1106,21 +1123,40 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
     // Verify Iterator::Next()
     iter_cnt = 0;
     auto data_iter = true_data.begin();
+    Status s;
     for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
       ASSERT_EQ(iter->key().ToString(), data_iter->first);
+      Status current_status = status[data_iter->first];
+      if (!current_status.ok()) {
+        s = current_status;
+      }
+      ASSERT_EQ(iter->status(), s);
+      if (current_status.ok()) {
        ASSERT_EQ(iter->value().ToString(), data_iter->second);
+      }
       iter_cnt++;
       total_reads++;
     }
     ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
                                           << true_data.size();
     delete iter;

     // Verify Iterator::Prev()
+    // Use a new iterator to make sure its status is clean.
+    iter = db_->NewIterator(ro);
     iter_cnt = 0;
+    s = Status::OK();
     auto data_rev = true_data.rbegin();
     for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
       ASSERT_EQ(iter->key().ToString(), data_rev->first);
+      Status current_status = status[data_rev->first];
+      if (!current_status.ok()) {
+        s = current_status;
+      }
+      ASSERT_EQ(iter->status(), s);
+      if (current_status.ok()) {
        ASSERT_EQ(iter->value().ToString(), data_rev->second);
+      }
       iter_cnt++;
       total_reads++;
     }

@@ -1134,7 +1170,6 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
       ASSERT_EQ(kv.second, iter->value().ToString());
       total_reads++;
     }
-
     delete iter;
   }

@@ -1176,6 +1211,25 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
   }
 }

+void DBTestBase::VerifyDBInternal(
+    std::vector<std::pair<std::string, std::string>> true_data) {
+  Arena arena;
+  InternalKeyComparator icmp(last_options_.comparator);
+  RangeDelAggregator range_del_agg(icmp, {});
+  auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg);
+  iter->SeekToFirst();
+  for (auto p : true_data) {
+    ASSERT_TRUE(iter->Valid());
+    ParsedInternalKey ikey;
+    ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
+    ASSERT_EQ(p.first, ikey.user_key);
+    ASSERT_EQ(p.second, iter->value());
+    iter->Next();
+  }
+  ASSERT_FALSE(iter->Valid());
+  iter->~InternalIterator();
+}
+
 #ifndef ROCKSDB_LITE

 uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily(
@@ -699,6 +699,12 @@ class DBTestBase : public testing::Test {
   Status Put(int cf, const Slice& k, const Slice& v,
              WriteOptions wo = WriteOptions());

+  Status Merge(const Slice& k, const Slice& v,
+               WriteOptions wo = WriteOptions());
+
+  Status Merge(int cf, const Slice& k, const Slice& v,
+               WriteOptions wo = WriteOptions());
+
   Status Delete(const std::string& k);

   Status Delete(int cf, const std::string& k);

@@ -827,7 +833,11 @@ class DBTestBase : public testing::Test {
   void VerifyDBFromMap(std::map<std::string, std::string> true_data,
                        size_t* total_reads_res = nullptr,
-                       bool tailing_iter = false);
+                       bool tailing_iter = false,
+                       std::map<std::string, Status> status = {});
+
+  void VerifyDBInternal(
+      std::vector<std::pair<std::string, std::string>> true_data);

 #ifndef ROCKSDB_LITE
   uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
@@ -38,6 +38,15 @@ Status ExternalSstFileIngestionJob::Prepare(
     files_to_ingest_.push_back(file_to_ingest);
   }

+  for (const IngestedFileInfo& f : files_to_ingest_) {
+    if (f.cf_id !=
+            TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
+        f.cf_id != cfd_->GetID()) {
+      return Status::InvalidArgument(
+          "External file column family id dont match");
+    }
+  }
+
   const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
   auto num_files = files_to_ingest_.size();
   if (num_files == 0) {

@@ -87,10 +96,12 @@ Status ExternalSstFileIngestionJob::Prepare(
       status = env_->LinkFile(path_outside_db, path_inside_db);
       if (status.IsNotSupported()) {
         // Original file is on a different FS, use copy instead of hard linking
-        status = CopyFile(env_, path_outside_db, path_inside_db, 0);
+        status = CopyFile(env_, path_outside_db, path_inside_db, 0,
+                          db_options_.use_fsync);
       }
     } else {
-      status = CopyFile(env_, path_outside_db, path_inside_db, 0);
+      status = CopyFile(env_, path_outside_db, path_inside_db, 0,
+                        db_options_.use_fsync);
     }
     TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
     if (!status.ok()) {

@@ -302,8 +313,14 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   file_to_ingest->num_entries = props->num_entries;

   ParsedInternalKey key;
-  std::unique_ptr<InternalIterator> iter(
-      table_reader->NewIterator(ReadOptions()));
+  ReadOptions ro;
+  // While reading the external file we may cache the blocks we read in the
+  // block cache; if we later change the global seqno of this file, the cache
+  // would hold blocks whose keys carry the wrong seqno.
+  // Disable fill_cache so we read from the file without updating the block
+  // cache.
+  ro.fill_cache = false;
+  std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(ro));

   // Get first (smallest) key from file
   iter->SeekToFirst();

@@ -325,6 +342,10 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   }
   file_to_ingest->largest_user_key = key.user_key.ToString();

+  file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
+
+  file_to_ingest->table_properties = *props;
+
   return status;
 }
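The fill_cache pattern used above applies to any scan whose blocks should not displace hot data; a minimal usage sketch against the public ReadOptions API (db is an open rocksdb::DB*):

    #include <memory>
    #include "rocksdb/db.h"

    rocksdb::ReadOptions ro;
    ro.fill_cache = false;  // read without populating the block cache
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      // one-off scan; the cached working set is left untouched
    }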
@@ -36,6 +36,10 @@ struct IngestedFileInfo {
   uint64_t file_size;
   // total number of keys in external file
   uint64_t num_entries;
+  // Id of column family this file should be ingested into
+  uint32_t cf_id;
+  // TableProperties read from external file
+  TableProperties table_properties;
   // Version of external file
   int version;

@@ -96,6 +100,10 @@ class ExternalSstFileIngestionJob {

   VersionEdit* edit() { return &edit_; }

+  const autovector<IngestedFileInfo>& files_to_ingest() const {
+    return files_to_ingest_;
+  }
+
  private:
   // Open the external file and populate `file_to_ingest` with all the
   // external information we need to ingest this file.
@@ -15,7 +15,7 @@ namespace rocksdb {
 class ExternalSSTFileTest : public DBTestBase {
  public:
   ExternalSSTFileTest() : DBTestBase("/external_sst_file_test") {
-    sst_files_dir_ = test::TmpDir(env_) + "/sst_files/";
+    sst_files_dir_ = dbname_ + "/sst_files/";
     DestroyAndRecreateExternalSSTFilesDir();
   }
@@ -28,7 +28,8 @@ class ExternalSSTFileTest : public DBTestBase {
       const Options options,
       std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
       bool allow_global_seqno = false, bool sort_data = false,
-      std::map<std::string, std::string>* true_data = nullptr) {
+      std::map<std::string, std::string>* true_data = nullptr,
+      ColumnFamilyHandle* cfh = nullptr) {
     // Generate a file id if not provided
     if (file_id == -1) {
       file_id = last_file_id_ + 1;

@@ -51,7 +52,8 @@ class ExternalSSTFileTest : public DBTestBase {
       data.resize(uniq_iter - data.begin());
     }
     std::string file_path = sst_files_dir_ + ToString(file_id);
-    SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
+    SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator,
+                                  cfh);

     Status s = sst_file_writer.Open(file_path);
     if (!s.ok()) {

@@ -69,8 +71,12 @@ class ExternalSSTFileTest : public DBTestBase {
     if (s.ok()) {
       IngestExternalFileOptions ifo;
       ifo.allow_global_seqno = allow_global_seqno;
-      s = db_->IngestExternalFile({file_path}, ifo);
+      if (cfh) {
+        s = db_->IngestExternalFile(cfh, {file_path}, ifo);
+      } else {
+        s = db_->IngestExternalFile({file_path}, ifo);
+      }
     }

     if (s.ok() && true_data) {
       for (auto& entry : data) {

@@ -84,25 +90,29 @@ class ExternalSSTFileTest : public DBTestBase {
   Status GenerateAndAddExternalFile(
       const Options options, std::vector<std::pair<int, std::string>> data,
       int file_id = -1, bool allow_global_seqno = false, bool sort_data = false,
-      std::map<std::string, std::string>* true_data = nullptr) {
+      std::map<std::string, std::string>* true_data = nullptr,
+      ColumnFamilyHandle* cfh = nullptr) {
     std::vector<std::pair<std::string, std::string>> file_data;
     for (auto& entry : data) {
       file_data.emplace_back(Key(entry.first), entry.second);
     }
     return GenerateAndAddExternalFile(options, file_data, file_id,
-                                      allow_global_seqno, sort_data, true_data);
+                                      allow_global_seqno, sort_data, true_data,
+                                      cfh);
   }

   Status GenerateAndAddExternalFile(
       const Options options, std::vector<int> keys, int file_id = -1,
       bool allow_global_seqno = false, bool sort_data = false,
-      std::map<std::string, std::string>* true_data = nullptr) {
+      std::map<std::string, std::string>* true_data = nullptr,
+      ColumnFamilyHandle* cfh = nullptr) {
     std::vector<std::pair<std::string, std::string>> file_data;
     for (auto& k : keys) {
       file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
     }
     return GenerateAndAddExternalFile(options, file_data, file_id,
-                                      allow_global_seqno, sort_data, true_data);
+                                      allow_global_seqno, sort_data, true_data,
+                                      cfh);
   }

   Status DeprecatedAddFile(const std::vector<std::string>& files,
@ -1780,6 +1790,151 @@ TEST_F(ExternalSSTFileTest, DirtyExit) {
ASSERT_NOK(sst_file_writer->Finish());
}

TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"koko", "toto"}, options);

SstFileWriter sfw_default(EnvOptions(), options, options.comparator,
handles_[0]);
SstFileWriter sfw_cf1(EnvOptions(), options, options.comparator, handles_[1]);
SstFileWriter sfw_cf2(EnvOptions(), options, options.comparator, handles_[2]);
SstFileWriter sfw_unknown(EnvOptions(), options, options.comparator);

// default_cf.sst
const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst";
ASSERT_OK(sfw_default.Open(cf_default_sst));
ASSERT_OK(sfw_default.Add("K1", "V1"));
ASSERT_OK(sfw_default.Add("K2", "V2"));
ASSERT_OK(sfw_default.Finish());

// cf1.sst
const std::string cf1_sst = sst_files_dir_ + "/cf1.sst";
ASSERT_OK(sfw_cf1.Open(cf1_sst));
ASSERT_OK(sfw_cf1.Add("K3", "V1"));
ASSERT_OK(sfw_cf1.Add("K4", "V2"));
ASSERT_OK(sfw_cf1.Finish());

// cf_unknown.sst
const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst";
ASSERT_OK(sfw_unknown.Open(unknown_sst));
ASSERT_OK(sfw_unknown.Add("K5", "V1"));
ASSERT_OK(sfw_unknown.Add("K6", "V2"));
ASSERT_OK(sfw_unknown.Finish());

IngestExternalFileOptions ifo;

// SST and CF don't match
ASSERT_NOK(db_->IngestExternalFile(handles_[0], {cf1_sst}, ifo));
// SST and CF don't match
ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf1_sst}, ifo));
// SST and CF match
ASSERT_OK(db_->IngestExternalFile(handles_[1], {cf1_sst}, ifo));

// SST and CF don't match
ASSERT_NOK(db_->IngestExternalFile(handles_[1], {cf_default_sst}, ifo));
// SST and CF don't match
ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf_default_sst}, ifo));
// SST and CF match
ASSERT_OK(db_->IngestExternalFile(handles_[0], {cf_default_sst}, ifo));

// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));

// Cannot ingest a file into a dropped CF
ASSERT_OK(db_->DropColumnFamily(handles_[1]));
ASSERT_NOK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));

// CF was not dropped, ok to ingest
ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
}

class TestIngestExternalFileListener : public EventListener {
public:
void OnExternalFileIngested(DB* db,
const ExternalFileIngestionInfo& info) override {
ingested_files.push_back(info);
}

std::vector<ExternalFileIngestionInfo> ingested_files;
};

TEST_F(ExternalSSTFileTest, IngestionListener) {
Options options = CurrentOptions();
TestIngestExternalFileListener* listener =
new TestIngestExternalFileListener();
options.listeners.emplace_back(listener);
CreateAndReopenWithCF({"koko", "toto"}, options);

// Ingest into default cf
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[0]));
ASSERT_EQ(listener->ingested_files.size(), 1);
ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"default");

// Ingest into cf1
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[1]));
ASSERT_EQ(listener->ingested_files.size(), 2);
ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
1);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"koko");

// Ingest into cf2
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[2]));
ASSERT_EQ(listener->ingested_files.size(), 3);
ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
2);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"toto");
}

TEST_F(ExternalSSTFileTest, SnapshotInconsistencyBug) {
Options options = CurrentOptions();
DestroyAndReopen(options);
const int kNumKeys = 10000;

// Insert keys using normal path and take a snapshot
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(i), Key(i) + "_V1"));
}
const Snapshot* snap = db_->GetSnapshot();

// Overwrite all keys using IngestExternalFile
std::string sst_file_path = sst_files_dir_ + "file1.sst";
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
ASSERT_OK(sst_file_writer.Open(sst_file_path));
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(sst_file_writer.Add(Key(i), Key(i) + "_V2"));
}
ASSERT_OK(sst_file_writer.Finish());

IngestExternalFileOptions ifo;
ifo.move_files = true;
ASSERT_OK(db_->IngestExternalFile({sst_file_path}, ifo));

for (int i = 0; i < kNumKeys; i++) {
ASSERT_EQ(Get(Key(i), snap), Key(i) + "_V1");
ASSERT_EQ(Get(Key(i)), Key(i) + "_V2");
}

db_->ReleaseSnapshot(snap);
}

#endif  // ROCKSDB_LITE

}  // namespace rocksdb

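The assertions above go through DBTestBase's Get() helper; with the raw API the snapshot is pinned via ReadOptions. A sketch of the equivalent reads, assuming an open DB* db and keys written as in the test:

const rocksdb::Snapshot* snap = db->GetSnapshot();
// ... keys are overwritten here, e.g. by IngestExternalFile ...
rocksdb::ReadOptions snap_read;
snap_read.snapshot = snap;  // reads see the pre-ingestion "_V1" values
std::string value;
rocksdb::Status s = db->Get(snap_read, "key", &value);
s = db->Get(rocksdb::ReadOptions(), "key", &value);  // sees the "_V2" values
db->ReleaseSnapshot(snap);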
@ -277,7 +277,8 @@ Status FlushJob::WriteLevel0Table() {
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
static_cast<int>(memtables.size()), &arena));
std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
&cfd_->internal_comparator(), &range_del_iters[0],
&cfd_->internal_comparator(),
range_del_iters.empty() ? nullptr : &range_del_iters[0],
static_cast<int>(range_del_iters.size())));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",

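The empty() guard is the whole point of this hunk: taking &v[0] on an empty std::vector is undefined behavior even if the callee never touches the pointer when the count is zero. A self-contained illustration of the pattern:

#include <vector>

void Consume(int** children, int n);  // hypothetical callee; n may be 0

void Caller(std::vector<int*>& v) {
  // Wrong: &v[0] evaluates v[0], which is UB when v is empty.
  // Consume(&v[0], static_cast<int>(v.size()));

  // Right: pass nullptr when there is nothing to point at.
  Consume(v.empty() ? nullptr : &v[0], static_cast<int>(v.size()));
}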
@ -221,6 +221,7 @@ static const std::string num_live_versions = "num-live-versions";
static const std::string current_version_number =
"current-super-version-number";
static const std::string estimate_live_data_size = "estimate-live-data-size";
static const std::string min_log_number_to_keep = "min-log-number-to-keep";
static const std::string base_level = "base-level";
static const std::string total_sst_files_size = "total-sst-files-size";
static const std::string estimate_pending_comp_bytes =
@ -285,6 +286,8 @@ const std::string DB::Properties::kCurrentSuperVersionNumber =
rocksdb_prefix + current_version_number;
const std::string DB::Properties::kEstimateLiveDataSize =
rocksdb_prefix + estimate_live_data_size;
const std::string DB::Properties::kMinLogNumberToKeep =
rocksdb_prefix + min_log_number_to_keep;
const std::string DB::Properties::kTotalSstFilesSize =
rocksdb_prefix + total_sst_files_size;
const std::string DB::Properties::kBaseLevel = rocksdb_prefix + base_level;
@ -368,6 +371,8 @@ const std::unordered_map<std::string, DBPropertyInfo>
nullptr}},
{DB::Properties::kEstimateLiveDataSize,
{true, nullptr, &InternalStats::HandleEstimateLiveDataSize, nullptr}},
{DB::Properties::kMinLogNumberToKeep,
{false, nullptr, &InternalStats::HandleMinLogNumberToKeep, nullptr}},
{DB::Properties::kBaseLevel,
{false, nullptr, &InternalStats::HandleBaseLevel, nullptr}},
{DB::Properties::kTotalSstFilesSize,
@ -705,6 +710,12 @@ bool InternalStats::HandleEstimateLiveDataSize(uint64_t* value, DBImpl* db,
return true;
}

bool InternalStats::HandleMinLogNumberToKeep(uint64_t* value, DBImpl* db,
Version* version) {
*value = db->MinLogNumberToKeep();
return true;
}

void InternalStats::DumpDBStats(std::string* value) {
char buf[1000];
// DB-level stats, only available from default column family

@ -401,6 +401,7 @@ class InternalStats {
Version* version);
bool HandleEstimateLiveDataSize(uint64_t* value, DBImpl* db,
Version* version);
bool HandleMinLogNumberToKeep(uint64_t* value, DBImpl* db, Version* version);

// Total number of background errors encountered. Every time a flush task
// or compaction task fails, this counter is incremented. The failure can

@ -221,6 +221,8 @@ class MemTableList {
// PickMemtablesToFlush() is called.
void FlushRequested() { flush_requested_ = true; }

bool HasFlushRequested() { return flush_requested_; }

// No copying allowed
// MemTableList(const MemTableList&);
// void operator=(const MemTableList&);

@ -317,7 +317,7 @@ void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,

while (w != pg->last_writer) {
// Writers that won't write don't get sequence allotment
if (!w->CallbackFailed()) {
if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) {
sequence += WriteBatchInternal::Count(w->batch);
}
w = w->link_newer;

@ -500,6 +500,10 @@ class DB {
// live data in bytes.
static const std::string kEstimateLiveDataSize;

// "rocksdb.min-log-number-to-keep" - returns the minimum log number of the
// log files that should be kept.
static const std::string kMinLogNumberToKeep;

// "rocksdb.total-sst-files-size" - returns total size (bytes) of all SST
// files.
// WARNING: may slow down online queries if there are too many files.
@ -565,6 +569,7 @@ class DB {
// "rocksdb.num-live-versions"
// "rocksdb.current-super-version-number"
// "rocksdb.estimate-live-data-size"
// "rocksdb.min-log-number-to-keep"
// "rocksdb.total-sst-files-size"
// "rocksdb.base-level"
// "rocksdb.estimate-pending-compaction-bytes"

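Like the other integer-valued properties in this list, the new one is read through DB::GetIntProperty(); a minimal sketch, assuming an open DB* db:

uint64_t min_log_num = 0;
if (db->GetIntProperty(rocksdb::DB::Properties::kMinLogNumberToKeep,
                       &min_log_num)) {
  // WAL files whose log number is below min_log_num are no longer needed
  // for recovery; the checkpoint code below consults this when 2PC is on.
}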
@ -170,6 +170,20 @@ struct MemTableInfo {

};

struct ExternalFileIngestionInfo {
// The name of the column family
std::string cf_name;
// Path of the file outside the DB
std::string external_file_path;
// Path of the file inside the DB
std::string internal_file_path;
// The global sequence number assigned to keys in this file
SequenceNumber global_seqno;
// Table properties of the table being ingested
TableProperties table_properties;
};


// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
@ -291,6 +305,15 @@ class EventListener {
virtual void OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle* handle) {
}

// A call-back function for RocksDB which will be called after an external
// file is ingested using IngestExternalFile.
//
// Note that this function will run on the same thread as
// IngestExternalFile(); if this function blocks, IngestExternalFile()
// will be blocked from finishing.
virtual void OnExternalFileIngested(
DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}

virtual ~EventListener() {}
};

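Because the callback runs synchronously on the IngestExternalFile() thread, a listener that has slow work to do should capture the info and return, leaving processing to another thread. One possible shape, a sketch only:

#include <mutex>
#include <vector>
#include "rocksdb/listener.h"

class NonBlockingIngestListener : public rocksdb::EventListener {
 public:
  void OnExternalFileIngested(
      rocksdb::DB* /*db*/,
      const rocksdb::ExternalFileIngestionInfo& info) override {
    // Copy the info under a short lock; a background thread drains pending_.
    std::lock_guard<std::mutex> guard(mu_);
    pending_.push_back(info);
  }

 private:
  std::mutex mu_;
  std::vector<rocksdb::ExternalFileIngestionInfo> pending_;
};

As in the IngestionListener test above, the listener is registered by appending it to options.listeners before opening the DB.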
@ -7,6 +7,7 @@
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"

namespace rocksdb {
@ -43,8 +44,12 @@ struct ExternalSstFileInfo {
// All keys in files generated by SstFileWriter will have sequence number = 0
class SstFileWriter {
public:
// User can pass `column_family` to specify that the generated file will
// be ingested into this column_family; note that passing nullptr means that
// the column_family is unknown.
SstFileWriter(const EnvOptions& env_options, const Options& options,
const Comparator* user_comparator);
const Comparator* user_comparator,
ColumnFamilyHandle* column_family = nullptr);

~SstFileWriter();

@ -6,7 +6,7 @@

#define ROCKSDB_MAJOR 5
#define ROCKSDB_MINOR 0
#define ROCKSDB_PATCH 0
#define ROCKSDB_PATCH 2

// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

2
src.mk
2
src.mk
@ -233,6 +233,8 @@ MAIN_SOURCES = \
db/db_inplace_update_test.cc \
db/db_iterator_test.cc \
db/db_log_iter_test.cc \
db/db_memtable_test.cc \
db/db_merge_operator_test.cc \
db/db_options_test.cc \
db/db_range_del_test.cc \
db/db_sst_test.cc \

@ -21,11 +21,12 @@ const std::string ExternalSstFilePropertyNames::kGlobalSeqno =

struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const Options& options,
const Comparator* _user_comparator)
const Comparator* _user_comparator, ColumnFamilyHandle* _cfh)
: env_options(_env_options),
ioptions(options),
mutable_cf_options(options),
internal_comparator(_user_comparator) {}
internal_comparator(_user_comparator),
cfh(_cfh) {}

std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<TableBuilder> builder;
@ -34,14 +35,16 @@ struct SstFileWriter::Rep {
MutableCFOptions mutable_cf_options;
InternalKeyComparator internal_comparator;
ExternalSstFileInfo file_info;
std::string column_family_name;
InternalKey ikey;
std::string column_family_name;
ColumnFamilyHandle* cfh;
};

SstFileWriter::SstFileWriter(const EnvOptions& env_options,
const Options& options,
const Comparator* user_comparator)
: rep_(new Rep(env_options, options, user_comparator)) {}
const Comparator* user_comparator,
ColumnFamilyHandle* column_family)
: rep_(new Rep(env_options, options, user_comparator, column_family)) {}

SstFileWriter::~SstFileWriter() {
if (rep_->builder) {
@ -89,6 +92,18 @@ Status SstFileWriter::Open(const std::string& file_path) {
user_collector_factories[i]));
}
int unknown_level = -1;
uint32_t cf_id;

if (r->cfh != nullptr) {
// user explicitly specified that this file will be ingested into cfh,
// we can persist this information in the file.
cf_id = r->cfh->GetID();
r->column_family_name = r->cfh->GetName();
} else {
r->column_family_name = "";
cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
}

TableBuilderOptions table_builder_options(
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
compression_type, r->ioptions.compression_opts,
@ -100,9 +115,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
table_builder_options,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
r->file_writer.get()));
table_builder_options, cf_id, r->file_writer.get()));

r->file_info.file_path = file_path;
r->file_info.file_size = 0;

@ -16,7 +16,7 @@ namespace rocksdb {

// Utility function to copy a file up to a specified length
Status CopyFile(Env* env, const std::string& source,
const std::string& destination, uint64_t size) {
const std::string& destination, uint64_t size, bool use_fsync) {
const EnvOptions soptions;
Status s;
unique_ptr<SequentialFileReader> src_reader;
@ -62,6 +62,7 @@ Status CopyFile(Env* env, const std::string& source,
}
size -= slice.size();
}
dest_writer->Sync(use_fsync);
return Status::OK();
}

@ -12,9 +12,11 @@
#include "util/db_options.h"

namespace rocksdb {

// use_fsync maps to options.use_fsync, which determines the way that
// the file is synced after copying.
extern Status CopyFile(Env* env, const std::string& source,
const std::string& destination, uint64_t size = 0);
const std::string& destination, uint64_t size,
bool use_fsync);

extern Status CreateFile(Env* env, const std::string& destination,
const std::string& contents);

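With the explicit parameter every call site now decides both how much to copy and how the destination is synced. A sketch of a caller, where env and db_options stand in for whatever Env and DBOptions are at hand (size 0 means copy to the end of the file, as in the checkpoint code below):

rocksdb::Status s = rocksdb::CopyFile(env, "/path/src.sst", "/path/dst.sst",
                                      /*size=*/0, db_options.use_fsync);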
@ -706,7 +706,7 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForPointLookup(
block_based_options.block_cache =
NewLRUCache(static_cast<size_t>(block_cache_size_mb * 1024 * 1024));
table_factory.reset(new BlockBasedTableFactory(block_based_options));
memtable_factory.reset(NewHashLinkListRepFactory());
memtable_prefix_bloom_size_ratio = 0.02;
return this;
}

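This hunk removes the hash-linklist memtable from OptimizeForPointLookup(), leaving the block-based table tuning and the memtable prefix bloom; the call itself is unchanged. A sketch:

rocksdb::Options options;
// Tune for Get()-heavy workloads with a 64MB block cache.
options.OptimizeForPointLookup(/*block_cache_size_mb=*/64);
rocksdb::DB* db = nullptr;
rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/pointlookup_db", &db);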
@ -6,6 +6,7 @@

#include <assert.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

@ -10,6 +10,7 @@
#pragma once

#include <atomic>
#include <functional>
#include <memory>
#include <unordered_map>
#include <vector>

@ -62,6 +62,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
DBOptions db_options = db_->GetDBOptions();
uint64_t min_log_num = port::kMaxUint64;
uint64_t sequence_number = db_->GetLatestSequenceNumber();
bool same_fs = true;
VectorLogPtr live_wal_files;
@ -78,6 +80,35 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
if (s.ok()) {
// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size);

if (s.ok() && db_options.allow_2pc) {
// If 2PC is enabled, we need to get the minimum log number after the
// flush. Need to refetch the live files to recapture the snapshot.
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
&min_log_num)) {
db_->EnableFileDeletions(false);
return Status::InvalidArgument(
"2PC enabled but cannot find the min log number to keep.");
}
// We need to refetch live files with flush to handle this case:
// A previous 000001.log contains the prepare record of transaction tnx1.
// The current log file is 000002.log, and sequence_number points to this
// file.
// After calling GetLiveFiles(), 000003.log is created.
// Then tnx1 is committed. The commit record is written to 000003.log.
// Now we fetch min_log_num, which will be 3.
// Then only 000002.log and 000003.log will be copied, and 000001.log will
// be skipped. 000003.log contains the commit message of tnx1, but we
// don't have the respective prepare record for it.
// In order to avoid this situation, we need to force a flush to make sure
// all transactions committed before getting min_log_num will be flushed
// to SST files.
// We cannot get min_log_num before calling GetLiveFiles() for the
// first time, because if we do that, all the log files will be included,
// far more than needed.
s = db_->GetLiveFiles(live_files, &manifest_file_size, /* flush */ true);
}

TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
}
@ -91,7 +122,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}

size_t wal_size = live_wal_files.size();
Log(db_->GetOptions().info_log,
Log(db_options.info_log,
"Started the snapshot process -- creating snapshot in directory %s",
checkpoint_dir.c_str());

@ -130,7 +161,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
// * if it's kDescriptorFile, limit the size to manifest_file_size
// * always copy if cross-device link
if ((type == kTableFile) && same_fs) {
Log(db_->GetOptions().info_log, "Hard Linking %s", src_fname.c_str());
Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());
s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname,
full_private_path + src_fname);
if (s.IsNotSupported()) {
@ -139,39 +170,41 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}
}
if ((type != kTableFile) || (!same_fs)) {
Log(db_->GetOptions().info_log, "Copying %s", src_fname.c_str());
Log(db_options.info_log, "Copying %s", src_fname.c_str());
s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname,
full_private_path + src_fname,
(type == kDescriptorFile) ? manifest_file_size : 0);
(type == kDescriptorFile) ? manifest_file_size : 0,
db_options.use_fsync);
}
}
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
s = CreateFile(db_->GetEnv(), full_private_path + current_fname,
manifest_fname.substr(1) + "\n");
}
Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt,
Log(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
live_wal_files.size());

// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(live_wal_files[i]->StartSequence() >= sequence_number)) {
(live_wal_files[i]->StartSequence() >= sequence_number ||
live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) {
Log(db_->GetOptions().info_log, "Copying %s",
Log(db_options.info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
s = CopyFile(db_->GetEnv(),
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes());
live_wal_files[i]->SizeFileBytes(), db_options.use_fsync);
break;
}
if (same_fs) {
// we only care about live log files
Log(db_->GetOptions().info_log, "Hard Linking %s",
Log(db_options.info_log, "Hard Linking %s",
live_wal_files[i]->PathName().c_str());
s = db_->GetEnv()->LinkFile(
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName());
if (s.IsNotSupported()) {
same_fs = false;
@ -179,11 +212,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}
}
if (!same_fs) {
Log(db_->GetOptions().info_log, "Copying %s",
Log(db_options.info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
s = CopyFile(db_->GetEnv(),
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(), 0);
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(), 0,
db_options.use_fsync);
}
}
}
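The widened condition reads as a small retention predicate: a live WAL is kept either because it starts at or after the checkpoint sequence number, or because its log number is still needed for 2PC (min_log_num stays at kMaxUint64 when 2PC is off, so the new clause never fires then). A hedged restatement:

bool KeepWalInCheckpoint(const rocksdb::LogFile& wal,
                         rocksdb::SequenceNumber checkpoint_seq,
                         uint64_t min_log_num) {
  return wal.Type() == rocksdb::kAliveLogFile &&
         (wal.StartSequence() >= checkpoint_seq ||
          wal.LogNumber() >= min_log_num);
}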
@ -205,27 +239,26 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {

if (!s.ok()) {
// clean all the files we might have created
Log(db_->GetOptions().info_log, "Snapshot failed -- %s",
s.ToString().c_str());
Log(db_options.info_log, "Snapshot failed -- %s", s.ToString().c_str());
// we have to delete the dir and all its children
std::vector<std::string> subchildren;
db_->GetEnv()->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
std::string subchild_path = full_private_path + "/" + subchild;
Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
Log(db_->GetOptions().info_log, "Delete file %s -- %s",
subchild_path.c_str(), s1.ToString().c_str());
Log(db_options.info_log, "Delete file %s -- %s", subchild_path.c_str(),
s1.ToString().c_str());
}
// finally delete the private dir
Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
Log(db_->GetOptions().info_log, "Delete dir %s -- %s",
full_private_path.c_str(), s1.ToString().c_str());
Log(db_options.info_log, "Delete dir %s -- %s", full_private_path.c_str(),
s1.ToString().c_str());
return s;
}

// here we know that we succeeded and installed the new snapshot
Log(db_->GetOptions().info_log, "Snapshot DONE. All is good");
Log(db_->GetOptions().info_log, "Snapshot sequence number: %" PRIu64,
Log(db_options.info_log, "Snapshot DONE. All is good");
Log(db_options.info_log, "Snapshot sequence number: %" PRIu64,
sequence_number);

return s;

@ -21,6 +21,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/xfunc.h"

@ -390,6 +391,120 @@ TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing) {
snapshotDB = nullptr;
}

TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing2PC) {
const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot";
const std::string dbname = test::TmpDir() + "/transaction_testdb";
ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));
ASSERT_OK(DestroyDB(dbname, CurrentOptions()));
env_->DeleteDir(kSnapshotName);
env_->DeleteDir(dbname);
Close();

Options options = CurrentOptions();
// allow_2pc is implicitly set with tx prepare
// options.allow_2pc = true;
TransactionDBOptions txn_db_options;
TransactionDB* txdb;
Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb);
assert(s.ok());
ColumnFamilyHandle* cfa;
ColumnFamilyHandle* cfb;
ColumnFamilyOptions cf_options;
ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFA", &cfa));

WriteOptions write_options;
// Insert something into CFB so lots of log files will be kept
// before creating the checkpoint.
ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFB", &cfb));
ASSERT_OK(txdb->Put(write_options, cfb, "", ""));

ReadOptions read_options;
std::string value;
TransactionOptions txn_options;
Transaction* txn = txdb->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
ASSERT_EQ(txdb->GetTransactionByName("xid"), txn);

s = txn->Put(Slice("foo"), Slice("bar"));
s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa"));
ASSERT_OK(s);
// Writing prepare into middle of first WAL, then flush WALs many times
for (int i = 1; i <= 100000; i++) {
Transaction* tx = txdb->BeginTransaction(write_options, txn_options);
ASSERT_OK(tx->SetName("x"));
ASSERT_OK(tx->Put(Slice(std::to_string(i)), Slice("val")));
ASSERT_OK(tx->Put(cfa, Slice("aaa"), Slice("111")));
ASSERT_OK(tx->Prepare());
ASSERT_OK(tx->Commit());
if (i % 10000 == 0) {
txdb->Flush(FlushOptions());
}
if (i == 88888) {
ASSERT_OK(txn->Prepare());
}
}
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"},
{"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit",
"CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t([&]() {
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(txdb, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName));
delete checkpoint;
});
TEST_SYNC_POINT("DBTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit");
ASSERT_OK(txn->Commit());
TEST_SYNC_POINT(
"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit");
t.join();

rocksdb::SyncPoint::GetInstance()->DisableProcessing();

// No more than three log files should exist.
std::vector<std::string> files;
env_->GetChildren(kSnapshotName, &files);
int num_log_files = 0;
for (auto& file : files) {
uint64_t num;
FileType type;
WalFileType log_type;
if (ParseFileName(file, &num, &type, &log_type) && type == kLogFile) {
num_log_files++;
}
}
// One flush after prepare + one outstanding file before checkpoint + one log
// file generated after checkpoint.
ASSERT_LE(num_log_files, 3);

TransactionDB* snapshotDB;
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
std::vector<rocksdb::ColumnFamilyHandle*> cf_handles;
ASSERT_OK(TransactionDB::Open(options, txn_db_options, kSnapshotName,
column_families, &cf_handles, &snapshotDB));
ASSERT_OK(snapshotDB->Get(read_options, "foo", &value));
ASSERT_EQ(value, "bar");
ASSERT_OK(snapshotDB->Get(read_options, cf_handles[1], "foocfa", &value));
ASSERT_EQ(value, "barcfa");

delete cfa;
delete cfb;
delete cf_handles[0];
delete cf_handles[1];
delete cf_handles[2];
delete snapshotDB;
snapshotDB = nullptr;
}

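The LoadDependency pairs above pin a cross-thread interleaving: for each {predecessor, successor} entry, the thread that reaches the successor sync point blocks until some thread has passed the predecessor. A minimal sketch of the mechanism, with hypothetical point names:

rocksdb::SyncPoint::GetInstance()->LoadDependency(
    {{"Writer:AfterWrite", "Reader:BeforeRead"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Thread 1:  ...write...  TEST_SYNC_POINT("Writer:AfterWrite");
// Thread 2:  TEST_SYNC_POINT("Reader:BeforeRead");  // waits for AfterWrite
rocksdb::SyncPoint::GetInstance()->DisableProcessing();

Here the test uses the two pairs to force txn->Commit() to happen between the two SavedLiveFiles sync points inside CreateCheckpoint().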
} // namespace rocksdb

int main(int argc, char** argv) {
@ -9,6 +9,7 @@
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <functional>
#include <memory>
#include <vector>


@ -6,6 +6,7 @@

#ifndef ROCKSDB_LITE

#include <functional>
#include <list>
#include <memory>
#include <string>

@ -7,6 +7,7 @@

#ifndef ROCKSDB_LITE

#include <functional>
#include "util/random.h"
#include "utilities/persistent_cache/hash_table.h"
#include "utilities/persistent_cache/lrulist.h"

@ -1178,6 +1178,108 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
delete cfb;
}

TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());

Status s;
ColumnFamilyHandle *cfa, *cfb;

ColumnFamilyOptions cf_options;
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
ASSERT_OK(s);
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
ASSERT_OK(s);

WriteOptions wopts;
wopts.disableWAL = false;
wopts.sync = true;

auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);

TransactionOptions topts1;
Transaction* txn1 = db->BeginTransaction(wopts, topts1);
s = txn1->SetName("xid1");
ASSERT_OK(s);
s = txn1->Put(cfa, "boys", "girls1");
ASSERT_OK(s);

Transaction* txn2 = db->BeginTransaction(wopts, topts1);
s = txn2->SetName("xid2");
ASSERT_OK(s);
s = txn2->Put(cfb, "up", "down1");
ASSERT_OK(s);

// prepare transaction in LOG A
s = txn1->Prepare();
ASSERT_OK(s);

// prepare transaction in LOG A
s = txn2->Prepare();
ASSERT_OK(s);

// regular put so that the memtable can actually be flushed for log rolling
s = db->Put(wopts, "cats", "dogs1");
ASSERT_OK(s);

auto prepare_log_no = txn1->GetLogNumber();

// roll to LOG B
s = db_impl->TEST_FlushMemTable(true);
ASSERT_OK(s);

// now we pause background work so that
// imm()s are not flushed before we can check their status
s = db_impl->PauseBackgroundWork();
ASSERT_OK(s);

ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
prepare_log_no);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);

// commit in LOG B
s = txn1->Commit();
ASSERT_OK(s);

ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no);

ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());

// request a flush for all column families such that the earliest
// alive log file can be killed
db_impl->TEST_MaybeFlushColumnFamilies();
// log cannot be flushed because txn2 has not been committed
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());

// assert that cfa has a flush requested
ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());

// cfb should not be flushed because it has no data from LOG A
ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());

// cfb now has data from LOG A
s = txn2->Commit();
ASSERT_OK(s);

db_impl->TEST_MaybeFlushColumnFamilies();
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());

// we should see that cfb now has a flush requested
ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());

// all data in LOG A resides in a memtable that has been
// requested for a flush
ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());

delete txn1;
delete txn2;
delete cfa;
delete cfb;
}
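Stripped of the test scaffolding, the two-phase lifecycle exercised here is short; a sketch assuming an already-open TransactionDB* txn_db:

rocksdb::WriteOptions write_options;
rocksdb::TransactionOptions txn_options;
rocksdb::Transaction* txn =
    txn_db->BeginTransaction(write_options, txn_options);
rocksdb::Status s = txn->SetName("xid1");  // required before Prepare()
if (s.ok()) s = txn->Put("key", "value");
if (s.ok()) s = txn->Prepare();  // prepare record goes to the current WAL
if (s.ok()) s = txn->Commit();   // commit record may land in a later WAL
delete txn;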
/*
* 1) use prepare to keep first log around to determine starting sequence
* during recovery.
