Compare commits

...

30 Commits
main ... 5.1.fb

Author SHA1 Message Date
sdong
3500b728d1 Disable warning as error 2019-11-05 11:08:10 -08:00
sdong
275488d483 [FB Internal] Point to the latest tool chain. 2019-10-31 14:12:34 -07:00
sdong
b8674494bc Revert "[FB Internal] Remove code for FB-only toolchain (since GCC 4.8 is everywhere already)"
This reverts commit 61720850f6.
2019-10-31 13:57:46 -07:00
sdong
f659afc64a [FB Internal] Remove code for FB-only toolchain (since GCC 4.8 is everywhere already) 2019-10-31 13:53:03 -07:00
sdong
68c5ae432f [FB Only] use gcc-5 2017-07-17 22:07:08 -07:00
Islam AbdelRahman
8277cd14aa Bump version to 5.1.5 2017-03-08 17:45:02 -08:00
Aaron Gao
acb2e98a90 Reset DBIter::saved_key_ with proper user key anywhere before pass to DBIter::FindNextUserEntry
Summary:
fix db_iter bug introduced by [facebook#1413](https://github.com/facebook/rocksdb/pull/1413)
Closes https://github.com/facebook/rocksdb/pull/1962

Differential Revision: D4672369

Pulled By: lightmark

fbshipit-source-id: 6a22953
2017-03-08 17:41:09 -08:00
Maysam Yabandeh
241c45d1df Print the missed last layer in cfstats
Summary:
Printing compaction stats used to operate on two variable:
number_levels_: for printing the layer
num_levels_to_check: for updating the compaction score

After this commit: 361010d447 these two are mixed up and as a result the last layer might not be printed out: https://fb.facebook.com/groups/rocksdb.internal/permalink/1315716625143616/

number_levels_ was used to decide which layers to print: 672300f47f/db/internal_stats.cc (L753) but after the patch it is based on the return value of DumpCFMapStats 361010d447/db/internal_stats.cc (L929) which returns num_levels_to_check: 361010d447/db/internal_stats.cc (L917)
Closes https://github.com/facebook/rocksdb/pull/1853

Differential Revision: D4529280

Pulled By: maysamyabandeh

fbshipit-source-id: 3fd9448
2017-03-06 09:41:04 -08:00
Siying Dong
d19646e1c1 Make compaction_pri settable through option string
Summary: Closes https://github.com/facebook/rocksdb/pull/1941

Differential Revision: D4637253

Pulled By: siying

fbshipit-source-id: a59dcdb
2017-03-02 14:58:29 -08:00
Aaron Gao
462c21dda6 release v5.1.4 2017-02-24 16:23:38 -08:00
Siying Dong
d6cfa64721 Fix OSX build break after the fallocate change
Summary:
The recent update about fallocate failed OSX build. Fix it.
Closes https://github.com/facebook/rocksdb/pull/1830

Differential Revision: D4500235

Pulled By: siying

fbshipit-source-id: a5f2b40
2017-02-24 16:05:12 -08:00
Yulia Kartseva
1f055723b7 alignment is on in ReadaheadRandomAccessFile::Read()
Summary: Closes https://github.com/facebook/rocksdb/pull/1857

Differential Revision: D4534518

Pulled By: wat-ze-hex

fbshipit-source-id: b456946
2017-02-23 17:10:51 -08:00
Aaron Gao
dcafd40aed bump patch version number 2017-02-23 16:44:30 -08:00
Aaron Gao
a3576c7c4b add use_direct_io() to ReadaheadRandomAccessFile
Summary:
Missing this function will cause RandomAccessFileReader not doing alignment in Direct IO mode, which introduce an IOError: invalid argument.
Closes https://github.com/facebook/rocksdb/pull/1900

Differential Revision: D4601261

Pulled By: lightmark

fbshipit-source-id: c3eadf1
2017-02-23 16:44:30 -08:00
Aaron Gao
6fa04041de truncate patch
Summary:
omit the override for the previous commit
Closes https://github.com/facebook/rocksdb/pull/1898

Differential Revision: D4598743

Pulled By: lightmark

fbshipit-source-id: f98a378
2017-02-23 16:44:29 -08:00
Aaron Gao
4abd57290c posix writablefile truncate
Summary:
we occasionally missing this call so the file size will be wrong
Closes https://github.com/facebook/rocksdb/pull/1894

Differential Revision: D4598446

Pulled By: lightmark

fbshipit-source-id: 42b6ef5
2017-02-23 16:44:29 -08:00
Siying Dong
f35574a16d RangeSync() should work with ROCKSDB_FALLOCATE_PRESENT not set
Summary: Closes https://github.com/facebook/rocksdb/pull/1824

Differential Revision: D4493862

Pulled By: siying

fbshipit-source-id: c168446
2017-02-23 11:34:45 -08:00
Siying Dong
00c6c53fc8 Update HISTORY.md for wrong release date 2017-02-08 11:28:31 -08:00
Alex Yang
c2ca7a9f99 bump patch version number 2017-02-02 16:40:34 -08:00
Islam AbdelRahman
d4b0ac2760 Fix CompactFiles() bug when used with CompactionFilter using SuperVersion
Summary:
GetAndRefSuperVersion() should not be called again in the same thread before ReturnAndCleanupSuperVersion() is called.

If we have a compaction filter that is using DB::Get, This will happen
```
CompactFiles() {
  GetAndRefSuperVersion() // -- first call
    ..
    CompactionFilter() {
      GetAndRefSuperVersion() // -- second call
      ReturnAndCleanupSuperVersion()
    }
    ..
  ReturnAndCleanupSuperVersion()
}
```

We solve this issue in the same way Iterator is solving it, but using GetReferencedSuperVersion()

This was discovered in https://github.com/facebook/mysql-5.6/issues/427 by alxyang
Closes https://github.com/facebook/rocksdb/pull/1803

Differential Revision: D4460155

Pulled By: IslamAbdelRahman

fbshipit-source-id: 5e54322
2017-02-02 16:16:47 -08:00
Andrew Kryczka
a7a2005873 bump patch version number 2017-01-31 16:52:01 -08:00
Andrew Kryczka
acf6d6bcab Fix DeleteRange including sentinels in output files
Summary:
when writing RangeDelAggregator::AddToBuilder, I forgot that there are sentinel tombstones in the middle of the interval map since gaps between real tombstones are represented with sentinels.

blame: #1614
Closes https://github.com/facebook/rocksdb/pull/1804

Differential Revision: D4460426

Pulled By: ajkr

fbshipit-source-id: 69444b5
2017-01-27 10:50:01 -08:00
Andrew Kryczka
241267966c Test merge op covered by range deletion in memtable 2017-01-23 12:28:46 -08:00
Siying Dong
b642e94c88 Fix 2PC with concurrent memtable insert
Summary:
If concurrent memtable insert is enabled, and one prepare command and a normal command are grouped into a commit group, the sequence ID will be calculated incorrectly.
Closes https://github.com/facebook/rocksdb/pull/1730

Differential Revision: D4371081

Pulled By: siying

fbshipit-source-id: cd40c6d
2017-01-20 11:52:36 -08:00
Siying Dong
53571a7a70 Fix OptimizeForPointLookup()
Summary:
If users directly call OptimizeForPointLookup(), it is broken as the option isn't compatible with parallel memtable insert. Fix it by using memtable bloomo filter instead.
Closes https://github.com/facebook/rocksdb/pull/1791

Differential Revision: D4442836

Pulled By: siying

fbshipit-source-id: bf6c9cd
2017-01-20 11:15:38 -08:00
Yi Wu
7079b78827 Flush job should release reference current version if sync log failed
Summary:
Fix the bug when sync log fail, FlushJob::Run() will not be execute and
reference to cfd->current() will not be release.
Closes https://github.com/facebook/rocksdb/pull/1792

Differential Revision: D4441316

Pulled By: yiwu-arbug

fbshipit-source-id: 5523e28
2017-01-20 10:47:01 -08:00
Reid Horuff
512e441819 Fix for 2PC causing WAL to grow too large
Summary:
Consider the following single column family scenario:
prepare in log A
commit in log B
*WAL is too large, flush all CFs to releast log A*
*CFA is on log B so we do not see CFA is depending on log A so no flush is requested*

To fix this we must also consider the log containing the prepare section when determining what log a CF is dependent on.
Closes https://github.com/facebook/rocksdb/pull/1768

Differential Revision: D4403265

Pulled By: reidHoruff

fbshipit-source-id: ce800ff
2017-01-20 10:46:47 -08:00
Yi Wu
62a1c55418 Remove fadvise with direct IO read
Summary:
Remove the logic since we don't use buffer cache with direct IO. Resolve
read regression we currently have.
Closes https://github.com/facebook/rocksdb/pull/1782

Differential Revision: D4430408

Pulled By: yiwu-arbug

fbshipit-source-id: 5557bba
2017-01-18 15:51:54 -08:00
Andrew Kryczka
adce9c0a87 Fix DeleteRange file boundary correctness issue with max_compaction_bytes
Summary:
Cockroachdb exposed this bug in #1778. The bug happens when a compaction's output files are ended due to exceeding max_compaction_bytes. In that case we weren't taking into account the next file's start key when deciding how far to extend the current file's max_key. This caused the non-overlapping key-range invariant to be violated.

Note this was correctly handled for the usual case of cutting compaction output, which is file size exceeding max_output_file_size. I am not sure why these are two separate code paths, but we can consider refactoring it to prevent such errors in the future.
Closes https://github.com/facebook/rocksdb/pull/1784

Differential Revision: D4430235

Pulled By: ajkr

fbshipit-source-id: 80af748
2017-01-18 13:39:44 -08:00
Maysam Yabandeh
dabcce9dfb Update release version to 5.1.0 2017-01-13 16:44:14 -08:00
40 changed files with 808 additions and 121 deletions

View File

@ -1,5 +1,5 @@
# Rocksdb Change Log
## Unreleased
## 5.1.0 (01/13/2017)
### Public API Change
* Support dynamically change `delete_obsolete_files_period_micros` option via SetDBOptions().
* Added EventListener::OnExternalFileIngested which will be called when IngestExternalFile() add a file successfully.

View File

@ -223,11 +223,6 @@ default: all
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
-Wno-unused-parameter
ifndef DISABLE_WARNING_AS_ERROR
WARNING_FLAGS += -Werror
endif
ifdef LUA_PATH
ifndef LUA_INCLUDE

View File

@ -51,12 +51,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
FBCODE_BUILD="true"
# If we're compiling with TSAN we need pic build
PIC_BUILD=$COMPILE_WITH_TSAN
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
# we need this to build with MySQL. Don't use for other purposes.
source "$PWD/build_tools/fbcode_config4.8.1.sh"
fi
fi
# Delete existing output, if it exists

View File

@ -1,18 +1,19 @@
GCC_BASE=/mnt/gvfs/third-party2/gcc/cf7d14c625ce30bae1a4661c2319c5a283e4dd22/4.9.x/centos6-native/108cf83
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/8598c375b0e94e1448182eb3df034704144a838d/stable/centos6-native/3f16ddd
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/d6e0a7da6faba45f5e5b1638f9edd7afc2f34e7d/4.9.x/gcc-4.9-glibc-2.20/024dbc3
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/d282e6e8f3d20f4e40a516834847bdc038e07973/2.20/gcc-4.9-glibc-2.20/500e281
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/0882df3713c7a84f15abe368dc004581f20b39d7/1.2.8/gcc-5-glibc-2.23/9bc6787
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/740325875f6729f42d28deaa2147b0854f3a347e/1.0.6/gcc-5-glibc-2.23/9bc6787
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/9455f75ff7f4831dc9fda02a6a0f8c68922fad8f/1.0.0/gcc-5-glibc-2.23/9bc6787
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/f001a51b2854957676d07306ef3abf67186b5c8b/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/fc8a13ca1fffa4d0765c716c5a0b49f0c107518f/master/gcc-5-glibc-2.23/1c32b4b
NUMA_BASE=/mnt/gvfs/third-party2/numa/17c514c4d102a25ca15f4558be564eeed76f4b6a/2.0.8/gcc-5-glibc-2.23/9bc6787
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
TBB_BASE=/mnt/gvfs/third-party2/tbb/9d9a554877d0c5bef330fe818ab7178806dd316a/4.0_update2/gcc-4.9-glibc-2.20/e9936bf
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/7c111ff27e0c466235163f00f280a9d617c3d2ec/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/b7fd454c4b10c6a81015d4524ed06cdeab558490/2.26/centos6-native/da39a3e
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d7f4d4d86674a57668e3a96f76f0e17dd0eb8765/3.10.0/gcc-4.9-glibc-2.20/e9936bf
LUA_BASE=/mnt/gvfs/third-party2/lua/61e4abf5813bbc39bc4f548757ccfcadde175a48/5.2.3/gcc-4.9-glibc-2.20/690f0d7
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

View File

@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
CFLAGS=""
# libgcc
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
# glibc
@ -43,11 +43,15 @@ if test -z $PIC_BUILD; then
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
CFLAGS+=" -DLZ4"
fi
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
if test -z $PIC_BUILD; then
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
CFLAGS+=" -DZSTD"
else
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
fi
CFLAGS+=" -DZSTD"
# location of gflags headers and libraries
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
else
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
fi
CFLAGS+=" -DGFLAGS=google"
CFLAGS+=" -DGFLAGS=gflags"
# location of jemalloc
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@ -82,7 +86,7 @@ fi
CFLAGS+=" -DTBB"
# use Intel SSE support for checksum calculations
export USE_SSE=1
export USE_SSE=" -msse -msse4.2 "
BINUTILS="$BINUTILS_BASE/bin"
AR="$BINUTILS/ar"
@ -104,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
CXX="$GCC_BASE/bin/g++"
CFLAGS+=" -B$BINUTILS/gold"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
JEMALLOC=1
else
# clang
@ -116,8 +120,8 @@ else
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $CLANG_INCLUDE"
@ -128,13 +132,14 @@ else
fi
CFLAGS+=" $DEPS_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
CXXFLAGS+=" $CFLAGS"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
EXEC_LDFLAGS+=" $LIBUNWIND"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
# required by libtbb
EXEC_LDFLAGS+=" -ldl"
@ -144,12 +149,4 @@ EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GF
VALGRIND_VER="$VALGRIND_BASE/bin/"
LUA_PATH="$LUA_BASE"
if test -z $PIC_BUILD; then
LUA_LIB=" $LUA_PATH/lib/liblua.a"
else
LUA_LIB=" $LUA_PATH/lib/liblua_pic.a"
fi
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD LUA_PATH LUA_LIB
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD

View File

@ -370,7 +370,8 @@ ColumnFamilyData::ColumnFamilyData(
column_family_set_(column_family_set),
pending_flush_(false),
pending_compaction_(false),
prev_compaction_needed_bytes_(0) {
prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc) {
Ref();
// Convert user defined table properties collector factories to internal ones.
@ -492,6 +493,25 @@ ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}
uint64_t ColumnFamilyData::OldestLogToKeep() {
auto current_log = GetLogNumber();
if (allow_2pc_) {
auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
if (imm_prep_log > 0 && imm_prep_log < current_log) {
current_log = imm_prep_log;
}
if (mem_prep_log > 0 && mem_prep_log < current_log) {
current_log = mem_prep_log;
}
}
return current_log;
}
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;

View File

@ -239,6 +239,9 @@ class ColumnFamilyData {
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();
// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
@ -404,6 +407,9 @@ class ColumnFamilyData {
bool pending_compaction_;
uint64_t prev_compaction_needed_bytes_;
// if the database was opened with 2pc enabled
bool allow_2pc_;
};
// ColumnFamilySet has interesting thread-safety requirements

View File

@ -253,6 +253,61 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) {
delete db;
}
TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
class FilterWithGet : public CompactionFilter {
public:
virtual bool Filter(int level, const Slice& key, const Slice& value,
std::string* new_value,
bool* value_changed) const override {
if (db_ == nullptr) {
return true;
}
std::string res;
db_->Get(ReadOptions(), "", &res);
return true;
}
void SetDB(DB* db) {
db_ = db;
}
virtual const char* Name() const override { return "FilterWithGet"; }
private:
DB* db_;
};
std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
Options options;
options.create_if_missing = true;
options.compaction_filter = cf.get();
DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
ASSERT_OK(s);
cf->SetDB(db);
// Write one L0 file
db->Put(WriteOptions(), "K1", "V1");
db->Flush(FlushOptions());
// Compact all L0 files using CompactFiles
rocksdb::ColumnFamilyMetaData meta;
db->GetColumnFamilyMetaData(&meta);
for (auto& file : meta.levels[0].files) {
std::string fname = file.db_path + "/" + file.name;
ASSERT_OK(
db->CompactFiles(rocksdb::CompactionOptions(), {fname}, 0));
}
delete db;
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -771,9 +771,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
key, sub_compact->current_output_file_size) &&
sub_compact->builder != nullptr) {
CompactionIterationStats range_del_out_stats;
status =
FinishCompactionOutputFile(input->status(), sub_compact,
range_del_agg.get(), &range_del_out_stats);
status = FinishCompactionOutputFile(input->status(), sub_compact,
range_del_agg.get(),
&range_del_out_stats, &key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
if (!status.ok()) {

View File

@ -9,6 +9,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/sync_point.h"
namespace rocksdb {
@ -47,6 +48,38 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
#endif // ROCKSDB_LITE
}
TEST_F(DBFlushTest, SyncFail) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(Env::Default()));
Options options;
options.disable_auto_compactions = true;
options.env = fault_injection_env.get();
SyncPoint::GetInstance()->LoadDependency(
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
{"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
Put("key", "value");
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd();
int refs_before = cfd->current()->TEST_refs();
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(dbfull()->Flush(flush_options));
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
fault_injection_env->SetFilesystemActive(true);
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ("", FilesPerLevel()); // flush failed.
// Flush job should release ref count to current version.
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
Destroy(options);
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -342,6 +342,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
last_stats_dump_time_microsec_(0),
next_job_id_(1),
has_unpersisted_data_(false),
unable_to_flush_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
@ -663,6 +664,10 @@ void DBImpl::MaybeDumpStats() {
}
uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
if (!allow_2pc()) {
return 0;
}
uint64_t min_log = 0;
// we must look through the memtables for two phase transactions
@ -707,6 +712,11 @@ void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
}
uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
if (!allow_2pc()) {
return 0;
}
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
uint64_t min_log = 0;
@ -1832,6 +1842,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
}
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
mutex_.AssertHeld();
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
@ -1868,6 +1879,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
MarkLogsSynced(current_log_number - 1, true, s);
if (!s.ok()) {
bg_error_ = s;
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return s;
}
}
@ -1918,6 +1930,8 @@ Status DBImpl::FlushMemTableToOutputFile(
// is unlocked by the current thread.
if (s.ok()) {
s = flush_job.Run(&file_meta);
} else {
flush_job.Cancel();
}
if (s.ok()) {
@ -2135,7 +2149,7 @@ Status DBImpl::CompactFiles(
immutable_db_options_.info_log.get());
// Perform CompactFiles
SuperVersion* sv = GetAndRefSuperVersion(cfd);
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
{
InstrumentedMutexLock l(&mutex_);
@ -2147,7 +2161,12 @@ Status DBImpl::CompactFiles(
input_file_names, output_level,
output_path_id, &job_context, &log_buffer);
}
ReturnAndCleanupSuperVersion(cfd, sv);
if (sv->Unref()) {
mutex_.Lock();
sv->Cleanup();
mutex_.Unlock();
delete sv;
}
// Find and delete obsolete files
{
@ -2505,7 +2524,7 @@ Status DBImpl::SetDBOptions(
mutable_db_options_ = new_options;
if (total_log_size_ > GetMaxTotalWalSize()) {
FlushColumnFamilies();
MaybeFlushColumnFamilies();
}
persist_options_status = PersistOptions();
@ -2752,7 +2771,7 @@ void DBImpl::MarkLogsSynced(
++it;
}
}
assert(logs_.empty() || logs_[0].number > up_to ||
assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].getting_synced));
log_sync_cv_.SignalAll();
}
@ -4698,9 +4717,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
if (UNLIKELY(!single_column_family_mode_ &&
!alive_log_files_.begin()->getting_flushed &&
total_log_size_ > GetMaxTotalWalSize())) {
FlushColumnFamilies();
MaybeFlushColumnFamilies();
} else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
// Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
@ -5018,28 +5036,40 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status;
}
void DBImpl::FlushColumnFamilies() {
void DBImpl::MaybeFlushColumnFamilies() {
mutex_.AssertHeld();
WriteContext context;
if (alive_log_files_.begin()->getting_flushed) {
return;
}
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
alive_log_files_.begin()->getting_flushed = true;
auto oldest_alive_log = alive_log_files_.begin()->number;
auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
if (allow_2pc() &&
unable_to_flush_oldest_log_ &&
oldest_log_with_uncommited_prep > 0 &&
oldest_log_with_uncommited_prep <= oldest_alive_log) {
// we already attempted to flush all column families dependent on
// the oldest alive log but the log still contained uncommited transactions.
// the oldest alive log STILL contains uncommited transaction so there
// is still nothing that we can do.
return;
}
WriteContext context;
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize());
oldest_alive_log, total_log_size_, GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
if (cfd->OldestLogToKeep() <= oldest_alive_log) {
auto status = SwitchMemtable(cfd, &context);
if (!status.ok()) {
break;
@ -5049,6 +5079,26 @@ void DBImpl::FlushColumnFamilies() {
}
}
MaybeScheduleFlushOrCompaction();
// we only mark this log as getting flushed if we have successfully
// flushed all data in this log. If this log contains outstanding prepred
// transactions then we cannot flush this log until those transactions are commited.
unable_to_flush_oldest_log_ = false;
if (allow_2pc()) {
if (oldest_log_with_uncommited_prep == 0 ||
oldest_log_with_uncommited_prep > oldest_alive_log) {
// this log contains no outstanding prepared transactions
alive_log_files_.begin()->getting_flushed = true;
} else {
Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log,
"Unable to release oldest log due to uncommited transaction");
unable_to_flush_oldest_log_ = true;
}
} else {
alive_log_files_.begin()->getting_flushed = true;
}
}
uint64_t DBImpl::GetMaxTotalWalSize() const {

View File

@ -308,6 +308,16 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false);
void TEST_MaybeFlushColumnFamilies();
bool TEST_UnableToFlushOldestLog() {
return unable_to_flush_oldest_log_;
}
bool TEST_IsLogGettingFlushed() {
return alive_log_files_.begin()->getting_flushed;
}
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true,
ColumnFamilyHandle* cfh = nullptr);
@ -734,7 +744,7 @@ class DBImpl : public DB {
// REQUIRES: mutex locked
Status PersistOptions();
void FlushColumnFamilies();
void MaybeFlushColumnFamilies();
uint64_t GetMaxTotalWalSize() const;
@ -994,6 +1004,15 @@ class DBImpl : public DB {
// Used when disableWAL is true.
bool has_unpersisted_data_;
// if an attempt was made to flush all column families that
// the oldest log depends on but uncommited data in the oldest
// log prevents the log from being released.
// We must attempt to free the dependent memtables again
// at a later time after the transaction in the oldest
// log is fully commited.
bool unable_to_flush_oldest_log_;
static const int KEEP_LOG_FILE_NUM = 1000;
// MSVC version 1800 still does not have constexpr for ::max()
static const uint64_t kNoTimeOut = port::kMaxUint64;

View File

@ -19,6 +19,11 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
}
void DBImpl::TEST_MaybeFlushColumnFamilies() {
InstrumentedMutexLock l(&mutex_);
MaybeFlushColumnFamilies();
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd;

View File

@ -341,6 +341,7 @@ void DBIter::Next() {
//
// NOTE: In between, saved_key_ can point to a user key that has
// a delete marker or a sequence number higher than sequence_
// saved_key_ MUST have a proper user_key before calling this function
//
// The prefix_check parameter controls whether we check the iterated
// keys against the prefix of the seeked key. Set to false when
@ -946,7 +947,6 @@ void DBIter::Seek(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK);
ReleaseTempPinnedData();
saved_key_.Clear();
// now saved_key is used to store internal key.
saved_key_.SetInternalKey(target, sequence_);
{
@ -961,6 +961,9 @@ void DBIter::Seek(const Slice& target) {
}
direction_ = kForward;
ClearSavedValue();
// convert the InternalKey to UserKey in saved_key_ before
// passed to FindNextUserEntry
saved_key_.Reserve(saved_key_.Size() - 8);
FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
if (!valid_) {
prefix_start_key_.clear();
@ -1039,6 +1042,8 @@ void DBIter::SeekToFirst() {
RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) {
saved_key_.SetKey(ExtractUserKey(iter_->key()),
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
if (statistics_ != nullptr) {
if (valid_) {

View File

@ -1897,6 +1897,66 @@ TEST_F(DBIteratorTest, DBIterator12) {
ASSERT_FALSE(db_iter->Valid());
}
TEST_F(DBIteratorTest, DBIterator13) {
Options options;
options.merge_operator = nullptr;
std::string key;
key.resize(9);
key.assign(9, static_cast<char>(0));
key[0] = 'b';
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut(key, "0");
internal_iter->AddPut(key, "1");
internal_iter->AddPut(key, "2");
internal_iter->AddPut(key, "3");
internal_iter->AddPut(key, "4");
internal_iter->AddPut(key, "5");
internal_iter->AddPut(key, "6");
internal_iter->AddPut(key, "7");
internal_iter->AddPut(key, "8");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(
NewDBIterator(env_, ImmutableCFOptions(options), BytewiseComparator(),
internal_iter, 2, 3, 0));
db_iter->Seek("b");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), key);
ASSERT_EQ(db_iter->value().ToString(), "2");
}
TEST_F(DBIteratorTest, DBIterator14) {
Options options;
options.merge_operator = nullptr;
std::string key("b");
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("b", "0");
internal_iter->AddPut("b", "1");
internal_iter->AddPut("b", "2");
internal_iter->AddPut("b", "3");
internal_iter->AddPut("a", "4");
internal_iter->AddPut("a", "5");
internal_iter->AddPut("a", "6");
internal_iter->AddPut("c", "7");
internal_iter->AddPut("c", "8");
internal_iter->AddPut("c", "9");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(
NewDBIterator(env_, ImmutableCFOptions(options), BytewiseComparator(),
internal_iter, 4, 1, 0));
db_iter->Seek("b");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "3");
db_iter->SeekToFirst();
ASSERT_EQ(db_iter->key().ToString(), "a");
ASSERT_EQ(db_iter->value().ToString(), "4");
}
class DBIterWithMergeIterTest : public testing::Test {
public:
DBIterWithMergeIterTest()

View File

@ -112,6 +112,81 @@ TEST_F(DBRangeDelTest, CompactionOutputFilesExactlyFilled) {
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
// Ensures range deletion spanning multiple compaction output files that are
// cut by max_compaction_bytes will have non-overlapping key-ranges.
// https://github.com/facebook/rocksdb/issues/1778
const int kNumFiles = 2, kNumPerFile = 1 << 8, kBytesPerVal = 1 << 12;
Options opts = CurrentOptions();
opts.comparator = test::Uint64Comparator();
opts.disable_auto_compactions = true;
opts.level0_file_num_compaction_trigger = kNumFiles;
opts.max_compaction_bytes = kNumPerFile * kBytesPerVal;
opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
// Want max_compaction_bytes to trigger the end of compaction output file, not
// target_file_size_base, so make the latter much bigger
opts.target_file_size_base = 100 * opts.max_compaction_bytes;
Reopen(opts);
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
// It spans the whole key-range, thus will be included in all output files
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(0),
GetNumericStr(kNumFiles * kNumPerFile - 1)));
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
std::vector<std::string> values;
// Write 1MB (256 values, each 4K)
for (int j = 0; j < kNumPerFile; j++) {
values.push_back(RandomString(&rnd, kBytesPerVal));
ASSERT_OK(Put(GetNumericStr(kNumPerFile * i + j), values[j]));
}
// extra entry to trigger SpecialSkipListFactory's flush
ASSERT_OK(Put(GetNumericStr(kNumPerFile), ""));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GE(NumTableFilesAtLevel(1), 2);
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
for (size_t i = 0; i < files[1].size() - 1; ++i) {
ASSERT_TRUE(InternalKeyComparator(opts.comparator)
.Compare(files[1][i].largest, files[1][i + 1].smallest) <
0);
}
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, SentinelsOmittedFromOutputFile) {
// Regression test for bug where sentinel range deletions (i.e., ones with
// sequence number of zero) were included in output files.
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
// gaps between ranges creates sentinels in our internal representation
std::vector<std::pair<std::string, std::string>> range_dels = {{"a", "b"}, {"c", "d"}, {"e", "f"}};
for (const auto& range_del : range_dels) {
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
range_del.first, range_del.second));
}
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
ASSERT_GT(files[0][0].smallest_seqno, 0);
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, FlushRangeDelsSameStartKey) {
db_->Put(WriteOptions(), "b1", "val");
ASSERT_OK(
@ -475,6 +550,36 @@ TEST_F(DBRangeDelTest, GetCoveredKeyFromSst) {
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, GetCoveredMergeOperandFromMemtable) {
const int kNumMergeOps = 10;
Options opts = CurrentOptions();
opts.merge_operator = MergeOperators::CreateUInt64AddOperator();
Reopen(opts);
for (int i = 0; i < kNumMergeOps; ++i) {
std::string val;
PutFixed64(&val, i);
db_->Merge(WriteOptions(), "key", val);
if (i == kNumMergeOps / 2) {
// deletes [0, 5]
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "key",
"key_");
}
}
ReadOptions read_opts;
std::string expected, actual;
ASSERT_OK(db_->Get(read_opts, "key", &actual));
PutFixed64(&expected, 30); // 6+7+8+9
ASSERT_EQ(expected, actual);
expected.clear();
read_opts.ignore_range_deletions = true;
ASSERT_OK(db_->Get(read_opts, "key", &actual));
PutFixed64(&expected, 45); // 0+1+2+...+9
ASSERT_EQ(expected, actual);
}
TEST_F(DBRangeDelTest, GetIgnoresRangeDeletions) {
Options opts = CurrentOptions();
opts.max_write_buffer_number = 4;

View File

@ -2216,8 +2216,40 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, OptimizeForPointLookup) {
Options options = CurrentOptions();
Close();
options.OptimizeForPointLookup(2);
ASSERT_OK(DB::Open(options, dbname_, &db_));
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
Flush();
ASSERT_EQ("v1", Get("foo"));
}
#endif // ROCKSDB_LITE
TEST_F(DBTest2, DirectIO) {
if (!IsDirectIOSupported()) {
return;
}
Options options = CurrentOptions();
options.use_direct_reads = options.use_direct_writes = true;
options.allow_mmap_reads = options.allow_mmap_writes = false;
DestroyAndReopen(options);
ASSERT_OK(Put(Key(0), "a"));
ASSERT_OK(Put(Key(5), "a"));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(10), "a"));
ASSERT_OK(Put(Key(15), "a"));
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
Reopen(options);
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -231,6 +231,20 @@ class SpecialEnv : public EnvWrapper {
return base_->Append(data);
}
}
Status PositionedAppend(const Slice& data, uint64_t offset) override {
if (env_->table_write_callback_) {
(*env_->table_write_callback_)();
}
if (env_->drop_writes_.load(std::memory_order_acquire)) {
// Drop writes on the floor
return Status::OK();
} else if (env_->no_space_.load(std::memory_order_acquire)) {
return Status::NoSpace("No space left on device");
} else {
env_->bytes_written_ += data.size();
return base_->PositionedAppend(data, offset);
}
}
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
@ -257,6 +271,9 @@ class SpecialEnv : public EnvWrapper {
Env::IOPriority GetIOPriority() override {
return base_->GetIOPriority();
}
bool use_direct_io() const override {
return base_->use_direct_io();
}
};
class ManifestFile : public WritableFile {
public:
@ -358,7 +375,14 @@ class SpecialEnv : public EnvWrapper {
return Status::IOError("simulated write error");
}
Status s = target()->NewWritableFile(f, r, soptions);
EnvOptions optimized = soptions;
if (strstr(f.c_str(), "MANIFEST") != nullptr ||
strstr(f.c_str(), "log") != nullptr) {
optimized.use_mmap_writes = false;
optimized.use_direct_writes = false;
}
Status s = target()->NewWritableFile(f, r, optimized);
if (s.ok()) {
if (strstr(f.c_str(), ".sst") != nullptr) {
r->reset(new SSTableFile(this, std::move(*r)));

View File

@ -230,6 +230,12 @@ Status FlushJob::Run(FileMetaData* file_meta) {
return s;
}
void FlushJob::Cancel() {
db_mutex_->AssertHeld();
assert(base_ != nullptr);
base_->Unref();
}
Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);

View File

@ -67,9 +67,11 @@ class FlushJob {
~FlushJob();
// Require db_mutex held
// Require db_mutex held.
// Once PickMemTable() is called, either Run() or Cancel() has to be call.
void PickMemTable();
Status Run(FileMetaData* file_meta = nullptr);
void Cancel();
TableProperties GetTableProperties() const { return table_properties_; }
private:

View File

@ -865,7 +865,7 @@ void InternalStats::DumpCFMapStats(std::map<std::string, double>* cf_stats) {
}
}
int InternalStats::DumpCFMapStats(
void InternalStats::DumpCFMapStats(
std::map<int, std::map<LevelStatType, double>>* levels_stats,
CompactionStats* compaction_stats_sum) {
const VersionStorageInfo* vstorage = cfd_->current()->storage_info();
@ -925,7 +925,6 @@ int InternalStats::DumpCFMapStats(
PrepareLevelStats(&sum_stats, total_files, total_files_being_compacted,
total_file_size, 0, w_amp, *compaction_stats_sum);
(*levels_stats)[-1] = sum_stats; // -1 is for the Sum level
return num_levels_to_check;
}
void InternalStats::DumpCFStats(std::string* value) {
@ -937,8 +936,8 @@ void InternalStats::DumpCFStats(std::string* value) {
// Print stats for each level
std::map<int, std::map<LevelStatType, double>> levels_stats;
CompactionStats compaction_stats_sum(0);
int levels = DumpCFMapStats(&levels_stats, &compaction_stats_sum);
for (int l = 0; l < levels; ++l) {
DumpCFMapStats(&levels_stats, &compaction_stats_sum);
for (int l = 0; l < number_levels_; ++l) {
if (levels_stats.find(l) != levels_stats.end()) {
PrintLevelStats(buf, sizeof(buf), "L" + ToString(l), levels_stats[l]);
value->append(buf);

View File

@ -273,7 +273,7 @@ class InternalStats {
private:
void DumpDBStats(std::string* value);
void DumpCFMapStats(std::map<std::string, double>* cf_stats);
int DumpCFMapStats(
void DumpCFMapStats(
std::map<int, std::map<LevelStatType, double>>* level_stats,
CompactionStats* compaction_stats_sum);
void DumpCFStats(std::string* value);

View File

@ -560,7 +560,7 @@ static bool SaveValue(void* arg, const char* entry) {
ValueType type;
UnPackSequenceAndType(tag, &s->seq, &type);
if ((type == kTypeValue || type == kTypeDeletion) &&
if ((type == kTypeValue || type == kTypeMerge) &&
range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) {
type = kTypeRangeDeletion;
}

View File

@ -221,6 +221,8 @@ class MemTableList {
// PickMemtablesToFlush() is called.
void FlushRequested() { flush_requested_ = true; }
bool HasFlushRequested() { return flush_requested_; }
// Copying allowed
// MemTableList(const MemTableList&);
// void operator=(const MemTableList&);

View File

@ -420,9 +420,10 @@ void RangeDelAggregator::AddToBuilder(
RangeTombstone tombstone;
if (collapse_deletions_) {
auto next_tombstone_map_iter = std::next(tombstone_map_iter);
if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end()) {
// it's the sentinel tombstone
break;
if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end() ||
tombstone_map_iter->second.seq_ == 0) {
// it's a sentinel tombstone
continue;
}
tombstone.start_key_ = tombstone_map_iter->first;
tombstone.end_key_ = next_tombstone_map_iter->first;

View File

@ -520,6 +520,8 @@ class Version {
return next_;
}
int TEST_refs() const { return refs_; }
VersionStorageInfo* storage_info() { return &storage_info_; }
VersionSet* version_set() { return vset_; }

View File

@ -317,7 +317,7 @@ void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
while (w != pg->last_writer) {
// Writers that won't write don't get sequence allotment
if (!w->CallbackFailed()) {
if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) {
sequence += WriteBatchInternal::Count(w->batch);
}
w = w->link_newer;

View File

@ -5,8 +5,8 @@
#pragma once
#define ROCKSDB_MAJOR 5
#define ROCKSDB_MINOR 0
#define ROCKSDB_PATCH 0
#define ROCKSDB_MINOR 1
#define ROCKSDB_PATCH 5
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -149,7 +149,7 @@ Status TestWritableFile::Flush() {
Status TestWritableFile::Sync() {
if (!env_->IsFilesystemActive()) {
return Status::OK();
return Status::IOError("FaultInjectionTestEnv: not active");
}
// No need to actual sync.
state_.pos_at_last_sync_ = state_.pos_;

View File

@ -468,13 +468,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
readahead_size_(readahead_size),
alignment_(file_->GetRequiredBufferAlignment()),
readahead_size_(Roundup(readahead_size, alignment_)),
forward_calls_(file_->ShouldForwardRawRequest()),
buffer_(),
buffer_offset_(0),
buffer_len_(0) {
if (!forward_calls_) {
buffer_.reset(new char[readahead_size_]);
buffer_.Alignment(alignment_);
buffer_.AllocateNewBuffer(readahead_size_ + alignment_);
} else if (readahead_size_ > 0) {
file_->EnableReadAhead();
}
@ -500,31 +502,45 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
std::unique_lock<std::mutex> lk(lock_);
size_t copied = 0;
// if offset between [buffer_offset_, buffer_offset_ + buffer_len>
if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
uint64_t offset_in_buffer = offset - buffer_offset_;
copied = std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
if (copied == n) {
// fully cached
*result = Slice(scratch, n);
size_t cached_len = 0;
// Check if there is a cache hit, means that [offset, offset + n) is either
// complitely or partially in the buffer
// If it's completely cached, including end of file case when offset + n is
// greater than EOF, return
if (TryReadFromCache_(offset, n, &cached_len, scratch) &&
(cached_len == n ||
// End of file
buffer_len_ < readahead_size_ + alignment_)) {
*result = Slice(scratch, cached_len);
return Status::OK();
}
}
size_t advanced_offset = offset + cached_len;
// In the case of cache hit advanced_offset is already aligned, means that
// chunk_offset equals to advanced_offset
size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
Slice readahead_result;
Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
buffer_.get());
Status s = file_->Read(chunk_offset, readahead_size_ + alignment_,
&readahead_result, buffer_.BufferStart());
if (!s.ok()) {
return s;
}
// In the case of cache miss, i.e. when cached_len equals 0, an offset can
// exceed the file end position, so the following check is required
if (advanced_offset < chunk_offset + readahead_result.size()) {
// In the case of cache miss, the first chunk_padding bytes in buffer_ are
// stored for alignment only and must be skipped
size_t chunk_padding = advanced_offset - chunk_offset;
auto remaining_len =
std::min(readahead_result.size() - chunk_padding, n - cached_len);
memcpy(scratch + cached_len, readahead_result.data() + chunk_padding,
remaining_len);
*result = Slice(scratch, cached_len + remaining_len);
} else {
*result = Slice(scratch, cached_len);
}
auto left_to_copy = std::min(readahead_result.size(), n - copied);
memcpy(scratch + copied, readahead_result.data(), left_to_copy);
*result = Slice(scratch, copied + left_to_copy);
if (readahead_result.data() == buffer_.get()) {
buffer_offset_ = offset + copied;
if (readahead_result.data() == buffer_.BufferStart()) {
buffer_offset_ = chunk_offset;
buffer_len_ = readahead_result.size();
} else {
buffer_len_ = 0;
@ -543,13 +559,31 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
return file_->InvalidateCache(offset, length);
}
virtual bool use_direct_io() const override {
return file_->use_direct_io();
}
private:
bool TryReadFromCache_(uint64_t offset, size_t n, size_t* cached_len,
char* scratch) const {
if (offset < buffer_offset_ || offset >= buffer_offset_ + buffer_len_) {
*cached_len = 0;
return false;
}
uint64_t offset_in_buffer = offset - buffer_offset_;
*cached_len =
std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
return true;
}
std::unique_ptr<RandomAccessFile> file_;
const size_t alignment_;
size_t readahead_size_;
const bool forward_calls_;
mutable std::mutex lock_;
mutable std::unique_ptr<char[]> buffer_;
mutable AlignedBuffer buffer_;
mutable uint64_t buffer_offset_;
mutable size_t buffer_len_;
};

View File

@ -3,10 +3,12 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include <vector>
#include "util/file_reader_writer.h"
#include <algorithm>
#include <vector>
#include "util/random.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
@ -125,6 +127,108 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) {
ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
}
class ReadaheadRandomAccessFileTest
: public testing::Test,
public testing::WithParamInterface<size_t> {
public:
static std::vector<size_t> GetReadaheadSizeList() {
return {1lu << 12, 1lu << 16};
}
virtual void SetUp() override {
readahead_size_ = GetParam();
scratch_.reset(new char[2 * readahead_size_]);
ResetSourceStr();
}
ReadaheadRandomAccessFileTest() : control_contents_() {}
std::string Read(uint64_t offset, size_t n) {
Slice result;
test_read_holder_->Read(offset, n, &result, scratch_.get());
return std::string(result.data(), result.size());
}
void ResetSourceStr(const std::string& str = "") {
auto write_holder = std::unique_ptr<WritableFileWriter>(
test::GetWritableFileWriter(new test::StringSink(&control_contents_)));
write_holder->Append(Slice(str));
write_holder->Flush();
auto read_holder = std::unique_ptr<RandomAccessFile>(
new test::StringSource(control_contents_));
test_read_holder_ =
NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
}
size_t GetReadaheadSize() const { return readahead_size_; }
private:
size_t readahead_size_;
Slice control_contents_;
std::unique_ptr<RandomAccessFile> test_read_holder_;
std::unique_ptr<char[]> scratch_;
};
TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStrTest) {
ASSERT_EQ("", Read(0, 1));
ASSERT_EQ("", Read(0, 0));
ASSERT_EQ("", Read(13, 13));
}
TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSizeTest) {
std::string str = "abcdefghijklmnopqrs";
ResetSourceStr(str);
ASSERT_EQ(str.substr(3, 4), Read(3, 4));
ASSERT_EQ(str.substr(0, 3), Read(0, 3));
ASSERT_EQ(str, Read(0, str.size()));
ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
Read(7, 30));
ASSERT_EQ("", Read(100, 100));
}
TEST_P(ReadaheadRandomAccessFileTest,
SourceStrLenCanBeGreaterThanReadaheadSizeTest) {
Random rng(42);
for (int k = 0; k < 100; ++k) {
size_t strLen = k * GetReadaheadSize() +
rng.Uniform(static_cast<int>(GetReadaheadSize()));
std::string str =
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
ResetSourceStr(str);
for (int test = 1; test <= 100; ++test) {
size_t offset = rng.Uniform(static_cast<int>(strLen));
size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
Read(offset, n));
}
}
}
TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) {
Random rng(7);
size_t strLen = 4 * GetReadaheadSize() +
rng.Uniform(static_cast<int>(GetReadaheadSize()));
std::string str =
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
ResetSourceStr(str);
for (int test = 1; test <= 100; ++test) {
size_t offset = rng.Uniform(static_cast<int>(strLen));
size_t n =
GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
Read(offset, n));
}
}
INSTANTIATE_TEST_CASE_P(
EmptySourceStrTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
SourceStrLenLessThanReadaheadSizeTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
SourceStrLenCanBeGreaterThanReadaheadSizeTest,
ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
NExceedReadaheadTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -265,11 +265,6 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
// An error: return a non-ok status
s = IOError(filename_, errno);
}
if (!use_direct_io()) {
// we need to fadvise away the entire range of pages because
// we do not want readahead pages to be cached.
Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
}
*result = Slice(scratch, (r < 0) ? 0 : n - left);
return s;
}
@ -661,6 +656,17 @@ Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
return Status::OK();
}
Status PosixWritableFile::Truncate(uint64_t size) {
Status s;
int r = ftruncate(fd_, size);
if (r < 0) {
s = IOError(filename_, errno);
} else {
filesize_ = size;
}
return s;
}
Status PosixWritableFile::Close() {
Status s;
@ -760,7 +766,9 @@ Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) {
return IOError(filename_, errno);
}
}
#endif
#ifdef OS_LINUX
Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) {
assert(offset <= std::numeric_limits<off_t>::max());
assert(nbytes <= std::numeric_limits<off_t>::max());

View File

@ -93,9 +93,9 @@ class PosixWritableFile : public WritableFile {
const EnvOptions& options);
virtual ~PosixWritableFile();
// Means Close() will properly take care of truncate
// and it does not need any additional information
virtual Status Truncate(uint64_t size) override { return Status::OK(); }
// Need to implement this so the file is truncated correctly
// with direct I/O
virtual Status Truncate(uint64_t size) override;
virtual Status Close() override;
virtual Status Append(const Slice& data) override;
virtual Status PositionedAppend(const Slice& data, uint64_t offset) override;
@ -113,6 +113,8 @@ class PosixWritableFile : public WritableFile {
virtual Status InvalidateCache(size_t offset, size_t length) override;
#ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(uint64_t offset, uint64_t len) override;
#endif
#ifdef OS_LINUX
virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
#endif

View File

@ -700,7 +700,7 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForPointLookup(
block_based_options.block_cache =
NewLRUCache(static_cast<size_t>(block_cache_size_mb * 1024 * 1024));
table_factory.reset(new BlockBasedTableFactory(block_based_options));
memtable_factory.reset(NewHashLinkListRepFactory());
memtable_prefix_bloom_size_ratio = 0.02;
return this;
}

View File

@ -536,6 +536,10 @@ bool ParseOptionHelper(char* opt_address, const OptionType& opt_type,
return ParseEnum<CompactionStyle>(
compaction_style_string_map, value,
reinterpret_cast<CompactionStyle*>(opt_address));
case OptionType::kCompactionPri:
return ParseEnum<CompactionPri>(
compaction_pri_string_map, value,
reinterpret_cast<CompactionPri*>(opt_address));
case OptionType::kCompressionType:
return ParseEnum<CompressionType>(
compression_type_string_map, value,
@ -617,6 +621,10 @@ bool SerializeSingleOptionHelper(const char* opt_address,
return SerializeEnum<CompactionStyle>(
compaction_style_string_map,
*(reinterpret_cast<const CompactionStyle*>(opt_address)), value);
case OptionType::kCompactionPri:
return SerializeEnum<CompactionPri>(
compaction_pri_string_map,
*(reinterpret_cast<const CompactionPri*>(opt_address)), value);
case OptionType::kCompressionType:
return SerializeEnum<CompressionType>(
compression_type_string_map,

View File

@ -93,6 +93,7 @@ enum class OptionType {
kString,
kDouble,
kCompactionStyle,
kCompactionPri,
kSliceTransform,
kCompressionType,
kVectorCompressionType,
@ -560,8 +561,10 @@ static std::unordered_map<std::string, OptionTypeInfo> cf_options_type_info = {
OptionType::kMergeOperator, OptionVerificationType::kByName, false, 0}},
{"compaction_style",
{offsetof(struct ColumnFamilyOptions, compaction_style),
OptionType::kCompactionStyle, OptionVerificationType::kNormal, false,
0}}};
OptionType::kCompactionStyle, OptionVerificationType::kNormal, false, 0}},
{"compaction_pri",
{offsetof(struct ColumnFamilyOptions, compaction_pri),
OptionType::kCompactionPri, OptionVerificationType::kNormal, false, 0}}};
static std::unordered_map<std::string, OptionTypeInfo>
block_based_table_type_info = {
@ -688,6 +691,13 @@ static std::unordered_map<std::string, CompactionStyle>
{"kCompactionStyleFIFO", kCompactionStyleFIFO},
{"kCompactionStyleNone", kCompactionStyleNone}};
static std::unordered_map<std::string, CompactionPri>
compaction_pri_string_map = {
{"kByCompensatedSize", kByCompensatedSize},
{"kOldestLargestSeqFirst", kOldestLargestSeqFirst},
{"kOldestSmallestSeqFirst", kOldestSmallestSeqFirst},
{"kMinOverlappingRatio", kMinOverlappingRatio}};
static std::unordered_map<std::string,
WALRecoveryMode> wal_recovery_mode_string_map = {
{"kTolerateCorruptedTailRecords",

View File

@ -542,6 +542,9 @@ bool AreEqualOptions(
case OptionType::kCompactionStyle:
return (*reinterpret_cast<const CompactionStyle*>(offset1) ==
*reinterpret_cast<const CompactionStyle*>(offset2));
case OptionType::kCompactionPri:
return (*reinterpret_cast<const CompactionPri*>(offset1) ==
*reinterpret_cast<const CompactionPri*>(offset2));
case OptionType::kCompressionType:
return (*reinterpret_cast<const CompressionType*>(offset1) ==
*reinterpret_cast<const CompressionType*>(offset2));

View File

@ -372,7 +372,6 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
// Following options are not settable through
// GetColumnFamilyOptionsFromString():
options->rate_limit_delay_max_milliseconds = 33;
options->compaction_pri = CompactionPri::kOldestSmallestSeqFirst;
options->compaction_options_universal = CompactionOptionsUniversal();
options->compression_opts = CompressionOptions();
options->hard_rate_limit = 0;
@ -432,6 +431,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"level_compaction_dynamic_level_bytes=false;"
"inplace_update_support=false;"
"compaction_style=kCompactionStyleFIFO;"
"compaction_pri=kMinOverlappingRatio;"
"purge_redundant_kvs_while_flush=true;"
"hard_pending_compaction_bytes_limit=0;"
"disable_auto_compactions=false;"

View File

@ -77,6 +77,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"arena_block_size", "22"},
{"disable_auto_compactions", "true"},
{"compaction_style", "kCompactionStyleLevel"},
{"compaction_pri", "kOldestSmallestSeqFirst"},
{"verify_checksums_in_compaction", "false"},
{"compaction_options_fifo", "23"},
{"max_sequential_skip_in_iterations", "24"},
@ -176,6 +177,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.disable_auto_compactions, true);
ASSERT_EQ(new_cf_opt.compaction_style, kCompactionStyleLevel);
ASSERT_EQ(new_cf_opt.verify_checksums_in_compaction, false);
ASSERT_EQ(new_cf_opt.compaction_pri, kOldestSmallestSeqFirst);
ASSERT_EQ(new_cf_opt.compaction_options_fifo.max_table_files_size,
static_cast<uint64_t>(23));
ASSERT_EQ(new_cf_opt.max_sequential_skip_in_iterations,

View File

@ -1355,6 +1355,108 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
delete cfb;
}
TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Status s;
ColumnFamilyHandle *cfa, *cfb;
ColumnFamilyOptions cf_options;
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
ASSERT_OK(s);
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
ASSERT_OK(s);
WriteOptions wopts;
wopts.disableWAL = false;
wopts.sync = true;
auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);
TransactionOptions topts1;
Transaction* txn1 = db->BeginTransaction(wopts, topts1);
s = txn1->SetName("xid1");
ASSERT_OK(s);
s = txn1->Put(cfa, "boys", "girls1");
ASSERT_OK(s);
Transaction* txn2 = db->BeginTransaction(wopts, topts1);
s = txn2->SetName("xid2");
ASSERT_OK(s);
s = txn2->Put(cfb, "up", "down1");
ASSERT_OK(s);
// prepre transaction in LOG A
s = txn1->Prepare();
ASSERT_OK(s);
// prepre transaction in LOG A
s = txn2->Prepare();
ASSERT_OK(s);
// regular put so that mem table can actually be flushed for log rolling
s = db->Put(wopts, "cats", "dogs1");
ASSERT_OK(s);
auto prepare_log_no = txn1->GetLogNumber();
// roll to LOG B
s = db_impl->TEST_FlushMemTable(true);
ASSERT_OK(s);
// now we pause background work so that
// imm()s are not flushed before we can check their status
s = db_impl->PauseBackgroundWork();
ASSERT_OK(s);
ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
prepare_log_no);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
// commit in LOG B
s = txn1->Commit();
ASSERT_OK(s);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no);
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
// request a flush for all column families such that the earliest
// alive log file can be killed
db_impl->TEST_MaybeFlushColumnFamilies();
// log cannot be flushed because txn2 has not been commited
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
// assert that cfa has a flush requested
ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
// cfb should not be flushed becuse it has no data from LOG A
ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
// cfb now has data from LOG A
s = txn2->Commit();
ASSERT_OK(s);
db_impl->TEST_MaybeFlushColumnFamilies();
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
// we should see that cfb now has a flush requested
ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
// all data in LOG A resides in a memtable that has been
// requested for a flush
ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
delete txn1;
delete txn2;
delete cfa;
delete cfb;
}
/*
* 1) use prepare to keep first log around to determine starting sequence
* during recovery.