Compare commits

...

30 Commits
main ... 5.0.fb

Author SHA1 Message Date
sdong
fe55e3e18f remove warning as error. 2019-11-05 11:06:51 -08:00
sdong
3d267c9941 Add one more #include<functional> 2019-11-05 11:06:00 -08:00
sdong
315d3afcde Add some include<functional> 2019-10-31 14:17:26 -07:00
sdong
dae099db27 [FB Internal] Point to the latest tool chain. 2019-10-31 14:12:48 -07:00
sdong
eb22eb7dfe Revert "[FB Internal] Remove code for FB-only toolchain (since GCC 4.8 is everywhere already)"
This reverts commit 61720850f6.
2019-10-31 13:58:06 -07:00
sdong
b26658ca6b [FB Internal] Remove code for FB-only toolchain (since GCC 4.8 is everywhere already) 2019-10-31 13:54:16 -07:00
sdong
57f8dd3dc1 [fb only] revert unintended change of USE_SSE
The previous change that uses gcc-5 set USE_SSE to the wrong flag by mistake. Fix it.
2017-07-17 22:24:13 -07:00
sdong
61c915ac50 [FB Only] use gcc-5 2017-07-17 22:03:22 -07:00
sdong
63cbcd25d1 Add two missing entries in the default option change log file. 2017-01-31 11:41:52 -08:00
Siying Dong
b2892047fd Always fsync the file after file copying
Summary:
File copying happens when creating checkpoints and when bulk loading files from a different FS partition. We should fsync the files when copying them to guarantee durability. A side effect is that the dirty pages in file system buffers won't grow too large.
Closes https://github.com/facebook/rocksdb/pull/1728

Differential Revision: D4371083

Pulled By: siying

fbshipit-source-id: 579e14c
2017-01-20 12:24:08 -08:00
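
A minimal sketch of the copy-then-fsync pattern this commit describes, using plain POSIX calls rather than RocksDB's internal CopyFile helper (the function name CopyAndSyncFile is hypothetical):

#include <fcntl.h>
#include <unistd.h>

// Copy `src` to `dst`, then fsync `dst` so the copied bytes are durable even
// if the machine crashes right after the copy returns.
bool CopyAndSyncFile(const char* src, const char* dst) {
  int in = open(src, O_RDONLY);
  if (in < 0) return false;
  int out = open(dst, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (out < 0) { close(in); return false; }
  char buf[1 << 16];
  ssize_t n;
  bool ok = true;
  while ((n = read(in, buf, sizeof(buf))) > 0) {
    // Treat a short write as failure; a production copy loop would retry.
    if (write(out, buf, static_cast<size_t>(n)) != n) { ok = false; break; }
  }
  if (n < 0) ok = false;
  // Without this fsync, the data may sit in dirty file system pages, so a
  // "successful" checkpoint could still lose the copied bytes on power loss.
  if (ok && fsync(out) != 0) ok = false;
  close(in);
  close(out);
  return ok;
}
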
Siying Dong
beb5daeeac Fix bug where Checkpoint loses recent transactions with 2PC
Summary:
If 2PC is enabled, checkpoint may not copy previous log files that contain uncommitted prepare records. In this diff we keep those files.
Closes https://github.com/facebook/rocksdb/pull/1724

Differential Revision: D4368319

Pulled By: siying

fbshipit-source-id: cc2c746
2017-01-20 12:20:26 -08:00
Islam AbdelRahman
a00f9bc498 Update HISTORY.md to mention 2PC WAL size fix 2017-01-20 12:18:59 -08:00
Reid Horuff
7d8218912d Fix for 2PC causing WAL to grow too large
Summary:
Consider the following single column family scenario:
prepare in log A
commit in log B
*WAL is too large, flush all CFs to release log A*
*CFA is on log B, so we do not see that CFA depends on log A, and no flush is requested*

To fix this we must also consider the log containing the prepare section when determining what log a CF is dependent on.
Closes https://github.com/facebook/rocksdb/pull/1768

Differential Revision: D4403265

Pulled By: reidHoruff

fbshipit-source-id: ce800ff
2017-01-20 12:16:32 -08:00
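
The core of the fix (see the ColumnFamilyData::OldestLogToKeep() hunk further down) is to take the minimum of the log a CF's memtables live in and the oldest log still holding one of its prepare sections. A simplified standalone sketch, with hypothetical parameter names:

#include <algorithm>
#include <cstdint>

// Oldest WAL this column family still depends on. `cf_log` is the log the
// CF's current memtable writes to; `prep_log` is the oldest log containing a
// prepared-but-uncommitted section for this CF (0 if none).
uint64_t OldestLogToKeep(uint64_t cf_log, uint64_t prep_log, bool two_pc) {
  uint64_t oldest = cf_log;
  if (two_pc && prep_log > 0) {
    // "prepare in log A, commit in log B": the CF sits on log B, but log A
    // must also be flushed out before log A can be released.
    oldest = std::min(oldest, prep_log);
  }
  return oldest;
}
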
sdong
a5163cfa60 Update HISTORY.md for recent two hot fixes. 2017-01-20 12:11:16 -08:00
Siying Dong
6bca22522b Fix OptimizeForPointLookup()
Summary:
If users directly call OptimizeForPointLookup(), it is broken, as the option isn't compatible with parallel memtable insert. Fix it by using the memtable bloom filter instead.
Closes https://github.com/facebook/rocksdb/pull/1791

Differential Revision: D4442836

Pulled By: siying

fbshipit-source-id: bf6c9cd
2017-01-20 12:09:56 -08:00
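
For reference, usage is a single call on Options before opening the DB (the regression test added further down does exactly this); the path below is a placeholder:

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Tune for Get()-heavy workloads; the argument is a block cache size in MB.
  // After this fix the tuning relies on the memtable bloom filter, so it is
  // compatible with the default allow_concurrent_memtable_write = true.
  options.OptimizeForPointLookup(2);
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/point_lookup_demo", &db);
  delete db;
  return s.ok() ? 0 : 1;
}
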
Siying Dong
2c4981cada Fix 2PC with concurrent memtable insert
Summary:
If concurrent memtable insert is enabled, and one prepare command and a normal command are grouped into a commit group, the sequence ID will be calculated incorrectly.
Closes https://github.com/facebook/rocksdb/pull/1730

Differential Revision: D4371081

Pulled By: siying

fbshipit-source-id: cd40c6d
2017-01-20 12:09:56 -08:00
Andrew Kryczka
9891486ddd Bump version to 5.0.2 2017-01-18 13:16:33 -08:00
Andrew Kryczka
849efe3863 Fix DeleteRange file boundary correctness issue with max_compaction_bytes
Summary:
CockroachDB exposed this bug in #1778. The bug happens when a compaction's output file is ended due to exceeding max_compaction_bytes. In that case we weren't taking into account the next file's start key when deciding how far to extend the current file's max_key. This caused the non-overlapping key-range invariant to be violated.

Note this was correctly handled for the usual case of cutting compaction output, namely file size exceeding max_output_file_size. I am not sure why these are two separate code paths, but we can consider refactoring them to prevent such errors in the future.
Closes https://github.com/facebook/rocksdb/pull/1784

Differential Revision: D4430235

Pulled By: ajkr

fbshipit-source-id: 80af748
2017-01-18 13:14:19 -08:00
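
The invariant the fix restores, and which the new test below asserts, is that adjacent output files in a sorted level must not overlap: each file's largest key sorts strictly before the next file's smallest key. A standalone sketch of that check (plain strings stand in for internal keys):

#include <string>
#include <vector>

struct FileRange {
  std::string smallest;
  std::string largest;
};

// True if no two adjacent files in a sorted run overlap, i.e. the
// non-overlapping key-range invariant holds.
bool LevelIsNonOverlapping(const std::vector<FileRange>& files) {
  for (size_t i = 0; i + 1 < files.size(); ++i) {
    if (!(files[i].largest < files[i + 1].smallest)) {
      return false;
    }
  }
  return true;
}
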
Islam AbdelRahman
e14eaa31fd Bump version to 5.0.1 2016-12-16 17:49:01 -08:00
Andrew Kryczka
8e5a257bfe Reduce compaction iterator status checks
Summary:
It seems expensive to check status, since the underlying merge iterator checks the status of all its children. So only do it when it's really necessary to get the status before invoking Next(), i.e., when we're advancing to get the first key in the next file.
Closes https://github.com/facebook/rocksdb/pull/1691

Differential Revision: D4343446

Pulled By: siying

fbshipit-source-id: 70ab315
2016-12-16 17:48:27 -08:00
Yi Wu
a0cdf54bd3 Iterator should be in corrupted status if merge operator returns false
Summary:
Iterator should be in corrupted status if the merge operator returns false.
Also add a test to make sure that if max_successive_merges is hit during a write,
data will not be lost.
Closes https://github.com/facebook/rocksdb/pull/1665

Differential Revision: D4322695

Pulled By: yiwu-arbug

fbshipit-source-id: b327b05
2016-12-16 11:20:08 -08:00
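
On the read path the failure now surfaces through the iterator's status, so callers should check it after a scan; a minimal sketch:

#include <memory>
#include "rocksdb/db.h"

// Scan the whole DB and propagate merge-operator failures to the caller.
rocksdb::Status ScanAll(rocksdb::DB* db) {
  std::unique_ptr<rocksdb::Iterator> it(
      db->NewIterator(rocksdb::ReadOptions()));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // consume it->key() / it->value()
  }
  // After this fix, a merge operator returning false leaves the iterator in
  // Status::Corruption() here instead of the scan silently looking complete.
  return it->status();
}
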
Islam AbdelRahman
935ce6d386 break Flush wait for dropped CF
Summary:
In FlushJob we don't do the flush if the CF is dropped
https://github.com/facebook/rocksdb/blob/master/db/flush_job.cc#L184-L188

but inside WaitForFlushMemTable we keep waiting forever even if the CF is dropped.
Closes https://github.com/facebook/rocksdb/pull/1664

Differential Revision: D4321032

Pulled By: IslamAbdelRahman

fbshipit-source-id: 6e2b25d
2016-12-14 13:27:12 -08:00
Islam AbdelRahman
c93f7848c6 Disallow ingesting files into dropped CFs
Summary:
This PR updates IngestExternalFile to return an error if we try to ingest a file into a dropped CF.

Right now, if IngestExternalFile wants to flush a memtable while ingesting a file into a dropped CF, it will wait forever, since flushing is not possible for a dropped CF.
Closes https://github.com/facebook/rocksdb/pull/1657

Differential Revision: D4318657

Pulled By: IslamAbdelRahman

fbshipit-source-id: ed6ea2b
2016-12-14 13:26:40 -08:00
Islam AbdelRahman
882e706400 Fix issue where IngestExternalFile inserts blocks in block cache with g_seqno=0
Summary:
When we ingest an external file, we open it to read some metadata and the first/last key.
While doing that, we insert blocks into the block cache with global_seqno = 0.

If we move the file (rather than copying it) into the DB, we will use these blocks with the wrong seqno in the read path.
Closes https://github.com/facebook/rocksdb/pull/1627

Differential Revision: D4293332

Pulled By: yiwu-arbug

fbshipit-source-id: 3ce5523
2016-12-14 13:26:27 -08:00
Islam AbdelRahman
f04765f7cf Add EventListener::OnExternalFileIngested() event
Summary:
Add EventListener::OnExternalFileIngested() to allow users to subscribe to external file ingestion events.
Closes https://github.com/facebook/rocksdb/pull/1623

Differential Revision: D4285844

Pulled By: IslamAbdelRahman

fbshipit-source-id: 0b95a88
2016-12-14 13:25:48 -08:00
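
A minimal subscriber sketch (mirroring the test listener added further down); the class name IngestLogger is hypothetical:

#include <iostream>
#include <memory>
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

class IngestLogger : public rocksdb::EventListener {
 public:
  void OnExternalFileIngested(
      rocksdb::DB* /*db*/,
      const rocksdb::ExternalFileIngestionInfo& info) override {
    std::cout << "ingested " << info.external_file_path << " into CF "
              << info.cf_name << " with global seqno " << info.global_seqno
              << "\n";
  }
};

// Registered like any other listener, before opening the DB:
//   rocksdb::Options options;
//   options.listeners.emplace_back(std::make_shared<IngestLogger>());
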
Islam AbdelRahman
7768975517 Allow user to specify a CF for SST files generated by SstFileWriter
Summary:
Allow the user to explicitly specify that the file generated by SstFileWriter will be ingested into a specific CF.
This allows us to persist the CF id in the generated file.
Closes https://github.com/facebook/rocksdb/pull/1615

Differential Revision: D4270422

Pulled By: IslamAbdelRahman

fbshipit-source-id: 7fb954e
2016-12-14 13:24:14 -08:00
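
A usage sketch mirroring the FileWithCFInfo test below: passing a CF handle to SstFileWriter persists the CF id, and ingestion into any other CF then fails with InvalidArgument (the helper name WriteFileForCF is hypothetical):

#include <string>
#include "rocksdb/sst_file_writer.h"

rocksdb::Status WriteFileForCF(const rocksdb::Options& options,
                               rocksdb::ColumnFamilyHandle* cfh,
                               const std::string& path) {
  // The trailing CF handle is the new optional constructor argument; the CF
  // id it carries is stored in the generated file's properties.
  rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options,
                                options.comparator, cfh);
  rocksdb::Status s = writer.Open(path);
  if (s.ok()) s = writer.Add("k1", "v1");
  if (s.ok()) s = writer.Finish();
  return s;
}
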
Mike Kolupaev
314828c973 Fixed a crash in debug build in flush_job.cc
Summary:
It was doing `&range_del_iters[0]` on an empty vector. Even though the resulting pointer is never dereferenced, it's still bad for two reasons:
* the practical reason: it crashes with a `std::out_of_range` exception in our debug build,
* the "C++ standard lawyer" reason: it's undefined behavior because, in the `std::vector` implementation, it probably "dereferences" a null pointer, which is invalid even though it doesn't actually read the pointed-to memory, just converts a pointer into a reference (and then flush_job.cc converts it back to a pointer); null references are undefined behavior.
Closes https://github.com/facebook/rocksdb/pull/1612

Differential Revision: D4265625

Pulled By: al13n321

fbshipit-source-id: db26fb9
2016-12-14 13:08:37 -08:00
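
The general-purpose safe pattern when an API wants a possibly-null T* for an empty range is data() rather than &v[0]; a small sketch:

#include <vector>

template <typename T>
T* FirstOrNull(std::vector<T>& v) {
  // &v[0] on an empty vector is undefined behavior (and aborts or throws in
  // checked debug STL builds). data() is specified to be valid even when the
  // vector is empty; the returned pointer just must not be dereferenced.
  return v.data();
}
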
Yi Wu
8a6d7a349a Mention IngestExternalFile changes in HISTORY.md
Summary:
I hit the land button too fast and didn't include the line.
Closes https://github.com/facebook/rocksdb/pull/1622

Differential Revision: D4281316

Pulled By: yiwu-arbug

fbshipit-source-id: c7b38e0
2016-12-05 16:15:03 -08:00
Yi Wu
1991ad0409 Update HISTORY.md for 5.0 branch
Summary:
These changes are included in the new branch-cut.
Closes https://github.com/facebook/rocksdb/pull/1621

Differential Revision: D4281015

Pulled By: yiwu-arbug

fbshipit-source-id: d88858b
2016-12-05 16:14:44 -08:00
Mike Kolupaev
2be7301d42 Fixed CompactionFilter::Decision::kRemoveAndSkipUntil
Summary:
Embarrassingly enough, the first time I tried to use my new feature in logdevice it crashed with this assertion failure:

  db/pinned_iterators_manager.h:30: void rocksdb::PinnedIteratorsManager::StartPinning(): Assertion `pinning_enabled == false' failed

The issue was that `pinned_iters_mgr_.StartPinning()` was called but `pinned_iters_mgr_.ReleasePinnedData()` wasn't.
Closes https://github.com/facebook/rocksdb/pull/1611

Differential Revision: D4265622

Pulled By: al13n321

fbshipit-source-id: 747b10f
2016-12-05 15:35:20 -08:00
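
For reference, a minimal FilterV2 using kRemoveAndSkipUntil, shaped like the filter in the compaction_iterator test below (the class name and key range are illustrative):

#include <string>
#include "rocksdb/compaction_filter.h"

class DropRangeFilter : public rocksdb::CompactionFilter {
 public:
  Decision FilterV2(int /*level*/, const rocksdb::Slice& key,
                    ValueType /*value_type*/,
                    const rocksdb::Slice& /*existing_value*/,
                    std::string* /*new_value*/,
                    std::string* skip_until) const override {
    // Drop every key in ["b", "m"): remove this key and fast-forward the
    // compaction iterator to "m" without calling the filter in between.
    if (key.compare(rocksdb::Slice("b")) >= 0 &&
        key.compare(rocksdb::Slice("m")) < 0) {
      *skip_until = "m";
      return Decision::kRemoveAndSkipUntil;
    }
    return Decision::kKeep;
  }
  const char* Name() const override { return "DropRangeFilter"; }
};
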
46 changed files with 1093 additions and 193 deletions

View File

@@ -502,6 +502,7 @@ set(TESTS
db/db_iter_test.cc
db/db_log_iter_test.cc
db/db_memtable_test.cc
db/db_merge_operator_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_table_properties_test.cc

View File

@@ -1,4 +1,8 @@
# RocksDB default options change log
## 5.0 (11/17/2016)
* Options::allow_concurrent_memtable_write and Options::enable_write_thread_adaptive_yield are now true by default
* Options.level0_stop_writes_trigger default value changes from 24 to 32.
## 4.8.0 (5/2/2016)
* options.max_open_files changes from 5000 to -1. It improves performance, but users need to set file descriptor limit to be large enough and watch memory usage for index and bloom filters.
* options.base_background_compactions changes from max_background_compactions to 1. When users set higher max_background_compactions but the write throughput is not high, the writes are less spiky to disks.

View File

@@ -1,9 +1,11 @@
# Rocksdb Change Log
## Unreleased
### Public API Change
* Options.level0_stop_writes_trigger default value changes from 24 to 32.
* New compaction filter API: CompactionFilter::FilterV2(). Allows dropping ranges of keys.
* Removed flashcache support.
### Bug fixes
* Fix the data corruption bug in the case that concurrent memtable write is enabled and 2PC is used.
* OptimizeForPointLookup() doesn't work with the default DB setting of allow_concurrent_memtable_write=true. Fix it.
* Fix a 2PC related bug where WAL file size grows too large.
* Fix the bug that if 2PC is enabled, checkpoints may lose some recent transactions.
* When file copying is needed while creating checkpoints or bulk loading files, fsync the file after copying.
## 5.0.0 (11/17/2016)
### Public API Change
@@ -13,6 +15,10 @@
* Support dynamically change `delayed_write_rate` option via SetDBOptions().
* Options::allow_concurrent_memtable_write and Options::enable_write_thread_adaptive_yield are now true by default.
* Remove Tickers::SEQUENCE_NUMBER to avoid confusion if the statistics object is shared among RocksDB instances. Alternatively, DB::GetLatestSequenceNumber() can be used to get the same value.
* Options.level0_stop_writes_trigger default value changes from 24 to 32.
* New compaction filter API: CompactionFilter::FilterV2(). Allows dropping ranges of keys.
* Removed flashcache support.
* DB::AddFile() is deprecated and is replaced with DB::IngestExternalFile(). DB::IngestExternalFile() removes all the restrictions that existed for DB::AddFile().
### New Features
* Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The option is dynamically changeable via SetDBOptions().

View File

@@ -220,10 +220,6 @@ default: all
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
-Wno-unused-parameter
ifndef DISABLE_WARNING_AS_ERROR
WARNING_FLAGS += -Werror
endif
ifdef LUA_PATH
@@ -309,6 +305,7 @@ TESTS = \
db_inplace_update_test \
db_iterator_test \
db_memtable_test \
db_merge_operator_test \
db_options_test \
db_range_del_test \
db_sst_test \
@@ -996,6 +993,9 @@ db_iterator_test: db/db_iterator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHA
db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

View File

@@ -51,12 +51,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
FBCODE_BUILD="true"
# If we're compiling with TSAN we need pic build
PIC_BUILD=$COMPILE_WITH_TSAN
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
# we need this to build with MySQL. Don't use for other purposes.
source "$PWD/build_tools/fbcode_config4.8.1.sh"
fi
fi
# Delete existing output, if it exists

View File

@@ -1,18 +1,19 @@
GCC_BASE=/mnt/gvfs/third-party2/gcc/cf7d14c625ce30bae1a4661c2319c5a283e4dd22/4.9.x/centos6-native/108cf83
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/8598c375b0e94e1448182eb3df034704144a838d/stable/centos6-native/3f16ddd
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/d6e0a7da6faba45f5e5b1638f9edd7afc2f34e7d/4.9.x/gcc-4.9-glibc-2.20/024dbc3
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/d282e6e8f3d20f4e40a516834847bdc038e07973/2.20/gcc-4.9-glibc-2.20/500e281
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/0882df3713c7a84f15abe368dc004581f20b39d7/1.2.8/gcc-5-glibc-2.23/9bc6787
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/740325875f6729f42d28deaa2147b0854f3a347e/1.0.6/gcc-5-glibc-2.23/9bc6787
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/9455f75ff7f4831dc9fda02a6a0f8c68922fad8f/1.0.0/gcc-5-glibc-2.23/9bc6787
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/f001a51b2854957676d07306ef3abf67186b5c8b/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/fc8a13ca1fffa4d0765c716c5a0b49f0c107518f/master/gcc-5-glibc-2.23/1c32b4b
NUMA_BASE=/mnt/gvfs/third-party2/numa/17c514c4d102a25ca15f4558be564eeed76f4b6a/2.0.8/gcc-5-glibc-2.23/9bc6787
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
TBB_BASE=/mnt/gvfs/third-party2/tbb/9d9a554877d0c5bef330fe818ab7178806dd316a/4.0_update2/gcc-4.9-glibc-2.20/e9936bf
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/7c111ff27e0c466235163f00f280a9d617c3d2ec/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/b7fd454c4b10c6a81015d4524ed06cdeab558490/2.26/centos6-native/da39a3e
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d7f4d4d86674a57668e3a96f76f0e17dd0eb8765/3.10.0/gcc-4.9-glibc-2.20/e9936bf
LUA_BASE=/mnt/gvfs/third-party2/lua/61e4abf5813bbc39bc4f548757ccfcadde175a48/5.2.3/gcc-4.9-glibc-2.20/690f0d7
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

View File

@@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
CFLAGS=""
# libgcc
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
# glibc
@@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
CFLAGS+=" -DLZ4"
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
CFLAGS+=" -DZSTD"
fi
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
if test -z $PIC_BUILD; then
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
else
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
fi
CFLAGS+=" -DZSTD"
# location of gflags headers and libraries
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
if test -z $PIC_BUILD; then
@@ -56,7 +60,7 @@ else
else
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
fi
CFLAGS+=" -DGFLAGS=google"
CFLAGS+=" -DGFLAGS=gflags"
# location of jemalloc
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@@ -104,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
CXX="$GCC_BASE/bin/g++"
CFLAGS+=" -B$BINUTILS/gold"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
JEMALLOC=1
else
# clang
@@ -116,8 +120,8 @@ else
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $CLANG_INCLUDE"
@@ -128,13 +132,14 @@ else
fi
CFLAGS+=" $DEPS_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
CXXFLAGS+=" $CFLAGS"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
EXEC_LDFLAGS+=" $LIBUNWIND"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
# required by libtbb
EXEC_LDFLAGS+=" -ldl"
@@ -144,12 +149,4 @@ EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GF
VALGRIND_VER="$VALGRIND_BASE/bin/"
LUA_PATH="$LUA_BASE"
if test -z $PIC_BUILD; then
LUA_LIB=" $LUA_PATH/lib/liblua.a"
else
LUA_LIB=" $LUA_PATH/lib/liblua_pic.a"
fi
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD LUA_PATH LUA_LIB
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD

View File

@@ -370,7 +370,8 @@ ColumnFamilyData::ColumnFamilyData(
column_family_set_(column_family_set),
pending_flush_(false),
pending_compaction_(false),
prev_compaction_needed_bytes_(0) {
prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc) {
Ref();
// Convert user defined table properties collector factories to internal ones.
@@ -492,6 +493,25 @@ ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}
uint64_t ColumnFamilyData::OldestLogToKeep() {
auto current_log = GetLogNumber();
if (allow_2pc_) {
auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
if (imm_prep_log > 0 && imm_prep_log < current_log) {
current_log = imm_prep_log;
}
if (mem_prep_log > 0 && mem_prep_log < current_log) {
current_log = mem_prep_log;
}
}
return current_log;
}
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;

View File

@@ -239,6 +239,9 @@ class ColumnFamilyData {
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();
// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
@@ -404,6 +407,9 @@ class ColumnFamilyData {
bool pending_compaction_;
uint64_t prev_compaction_needed_bytes_;
// if the database was opened with 2pc enabled
bool allow_2pc_;
};
// ColumnFamilySet has interesting thread-safety requirements

View File

@@ -437,9 +437,7 @@ void CompactionIterator::NextFromInput() {
bottommost_level_);
merge_out_iter_.SeekToFirst();
if (merge_helper_->FilteredUntil(&skip_until)) {
need_skip = true;
} else if (merge_out_iter_.Valid()) {
if (merge_out_iter_.Valid()) {
// NOTE: key, value, and ikey_ refer to old entries.
// These will be correctly set below.
key_ = merge_out_iter_.key();
@@ -460,6 +458,10 @@ void CompactionIterator::NextFromInput() {
// coming after the merges
has_current_user_key_ = false;
pinned_iters_mgr_.ReleasePinnedData();
if (merge_helper_->FilteredUntil(&skip_until)) {
need_skip = true;
}
}
} else {
// 1. new user key -OR-

View File

@@ -272,8 +272,8 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
return Decision::kKeep;
}
if (k == "i") {
EXPECT_EQ(ValueType::kValue, t);
EXPECT_EQ("iv95", v);
EXPECT_EQ(ValueType::kMergeOperand, t);
EXPECT_EQ("im95", v);
*skip_until = "z";
return Decision::kRemoveAndSkipUntil;
}
@@ -299,10 +299,10 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
test::KeyStr("f", 30, kTypeMerge), // skip to "g+"
test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue),
test::KeyStr("h", 91, kTypeValue), // keep
test::KeyStr("i", 95, kTypeValue), // skip to "z"
test::KeyStr("i", 95, kTypeMerge), // skip to "z"
test::KeyStr("j", 99, kTypeValue)},
{"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
"fv25", "gv90", "hv91", "iv95", "jv99"},
"fv25", "gv90", "hv91", "im95", "jv99"},
{}, {}, kMaxSequenceNumber, &merge_op, &filter);
// Compaction should output just "a", "e" and "h" keys.

View File

@@ -772,9 +772,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
key, sub_compact->current_output_file_size) &&
sub_compact->builder != nullptr) {
CompactionIterationStats range_del_out_stats;
status =
FinishCompactionOutputFile(input->status(), sub_compact,
range_del_agg.get(), &range_del_out_stats);
status = FinishCompactionOutputFile(input->status(), sub_compact,
range_del_agg.get(),
&range_del_out_stats, &key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
if (!status.ok()) {
@@ -853,9 +853,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
}
Status input_status = input->status();
c_iter->Next();
// Close output file if it is big enough
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
@@ -864,6 +861,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (sub_compact->compaction->output_level() != 0 &&
sub_compact->current_output_file_size >=
sub_compact->compaction->max_output_file_size()) {
Status input_status = input->status();
c_iter->Next();
const Slice* next_key = nullptr;
if (c_iter->Valid()) {
next_key = &c_iter->key();
@@ -879,6 +879,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// files.
sub_compact->compression_dict = std::move(compression_dict);
}
} else {
c_iter->Next();
}
}

View File

@@ -338,6 +338,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
last_stats_dump_time_microsec_(0),
next_job_id_(1),
has_unpersisted_data_(false),
unable_to_flush_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
@@ -654,6 +655,10 @@ void DBImpl::MaybeDumpStats() {
}
uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
if (!allow_2pc()) {
return 0;
}
uint64_t min_log = 0;
// we must look through the memtables for two phase transactions
@@ -698,6 +703,11 @@ void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
}
uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
if (!allow_2pc()) {
return 0;
}
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
uint64_t min_log = 0;
@@ -736,6 +746,34 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
}
}
uint64_t DBImpl::MinLogNumberToKeep() {
uint64_t log_number = versions_->MinLogNumber();
if (allow_2pc()) {
// if we are using 2pc we must consider logs containing prepared
// sections of outstanding transactions.
//
// We must check min logs with outstanding prep before we check
// logs referenced by memtables because a log referenced by the
// first data structure could transition to the second under us.
//
// TODO(horuff): iterating over all column families under db mutex.
// should find a more optimal solution
auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) {
log_number = min_log_in_prep_heap;
}
auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) {
log_number = min_log_refed_by_mem;
}
}
return log_number;
}
// * Returns the list of live files in 'sst_live'
// If it's doing full scan:
// * Returns the list of all files in the filesystem in
@@ -794,32 +832,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->manifest_file_number = versions_->manifest_file_number();
job_context->pending_manifest_file_number =
versions_->pending_manifest_file_number();
job_context->log_number = versions_->MinLogNumber();
if (allow_2pc()) {
// if we are using 2pc we must consider logs containing prepared
// sections of outstanding transactions.
//
// We must check min logs with outstanding prep before we check
// logs referenced by memtables because a log referenced by the
// first data structure could transition to the second under us.
//
// TODO(horuff): iterating over all column families under db mutex.
// should find a more optimal solution
auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
if (min_log_in_prep_heap != 0 &&
min_log_in_prep_heap < job_context->log_number) {
job_context->log_number = min_log_in_prep_heap;
}
auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < job_context->log_number) {
job_context->log_number = min_log_refed_by_mem;
}
}
job_context->log_number = MinLogNumberToKeep();
job_context->prev_log_number = versions_->prev_log_number();
@@ -2493,7 +2506,7 @@ Status DBImpl::SetDBOptions(
mutable_db_options_ = new_options;
if (total_log_size_ > GetMaxTotalWalSize()) {
FlushColumnFamilies();
MaybeFlushColumnFamilies();
}
persist_options_status = PersistOptions();
@@ -2939,6 +2952,12 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
if (cfd->IsDropped()) {
// FlushJob cannot flush a dropped CF; if we did not break here
// we would loop forever since cfd->imm()->NumNotFlushed() will never
// drop to zero
return Status::InvalidArgument("Cannot flush a dropped CF");
}
bg_cv_.Wait();
}
if (!bg_error_.ok()) {
@@ -4680,9 +4699,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
if (UNLIKELY(!single_column_family_mode_ &&
!alive_log_files_.begin()->getting_flushed &&
total_log_size_ > GetMaxTotalWalSize())) {
FlushColumnFamilies();
MaybeFlushColumnFamilies();
} else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
// Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
@@ -5000,28 +5018,40 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status;
}
void DBImpl::FlushColumnFamilies() {
void DBImpl::MaybeFlushColumnFamilies() {
mutex_.AssertHeld();
WriteContext context;
if (alive_log_files_.begin()->getting_flushed) {
return;
}
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
alive_log_files_.begin()->getting_flushed = true;
auto oldest_alive_log = alive_log_files_.begin()->number;
auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
if (allow_2pc() &&
unable_to_flush_oldest_log_ &&
oldest_log_with_uncommited_prep > 0 &&
oldest_log_with_uncommited_prep <= oldest_alive_log) {
// we already attempted to flush all column families dependent on
// the oldest alive log but the log still contained uncommitted transactions.
// the oldest alive log STILL contains uncommitted transactions so there
// is still nothing that we can do.
return;
}
WriteContext context;
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize());
oldest_alive_log, total_log_size_, GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
if (cfd->OldestLogToKeep() <= oldest_alive_log) {
auto status = SwitchMemtable(cfd, &context);
if (!status.ok()) {
break;
@@ -5031,6 +5061,26 @@ void DBImpl::FlushColumnFamilies() {
}
}
MaybeScheduleFlushOrCompaction();
// we only mark this log as getting flushed if we have successfully
// flushed all data in this log. If this log contains outstanding prepared
// transactions then we cannot flush this log until those transactions are committed.
unable_to_flush_oldest_log_ = false;
if (allow_2pc()) {
if (oldest_log_with_uncommited_prep == 0 ||
oldest_log_with_uncommited_prep > oldest_alive_log) {
// this log contains no outstanding prepared transactions
alive_log_files_.begin()->getting_flushed = true;
} else {
Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log,
"Unable to release oldest log due to uncommited transaction");
unable_to_flush_oldest_log_ = true;
}
} else {
alive_log_files_.begin()->getting_flushed = true;
}
}
uint64_t DBImpl::GetMaxTotalWalSize() const {
@@ -6493,14 +6543,23 @@ Status DBImpl::IngestExternalFile(
num_running_ingest_file_++;
// We cannot ingest a file into a dropped CF
if (cfd->IsDropped()) {
status = Status::InvalidArgument(
"Cannot ingest an external file into a dropped CF");
}
// Figure out if we need to flush the memtable first
if (status.ok()) {
bool need_flush = false;
status = ingestion_job.NeedsFlush(&need_flush);
if (status.ok() && need_flush) {
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */);
mutex_.Lock();
}
}
// Run the ingestion job
if (status.ok()) {
@@ -6541,9 +6600,35 @@ Status DBImpl::IngestExternalFile(
// Cleanup
ingestion_job.Cleanup(status);
if (status.ok()) {
NotifyOnExternalFileIngested(cfd, ingestion_job);
}
return status;
}
void DBImpl::NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.empty()) {
return;
}
for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
ExternalFileIngestionInfo info;
info.cf_name = cfd->GetName();
info.external_file_path = f.external_file_path;
info.internal_file_path = f.internal_file_path;
info.global_seqno = f.assigned_seqno;
info.table_properties = f.table_properties;
for (auto listener : immutable_db_options_.listeners) {
listener->OnExternalFileIngested(this, info);
}
}
#endif
}
void DBImpl::WaitForIngestFile() {
mutex_.AssertHeld();
while (num_running_ingest_file_ > 0) {

View File

@@ -23,6 +23,7 @@
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/dbformat.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
@@ -307,6 +308,16 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false);
void TEST_MaybeFlushColumnFamilies();
bool TEST_UnableToFlushOldestLog() {
return unable_to_flush_oldest_log_;
}
bool TEST_IsLogGettingFlushed() {
return alive_log_files_.begin()->getting_flushed;
}
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true,
ColumnFamilyHandle* cfh = nullptr);
@@ -379,6 +390,8 @@
// schedule a purge
void ScheduleBgLogWriterClose(JobContext* job_context);
uint64_t MinLogNumberToKeep();
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
@@ -556,6 +569,9 @@
void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
const MemTableInfo& mem_table_info);
void NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
@@ -728,7 +744,7 @@ class DBImpl : public DB {
// REQUIRES: mutex locked
Status PersistOptions();
void FlushColumnFamilies();
void MaybeFlushColumnFamilies();
uint64_t GetMaxTotalWalSize() const;
@@ -987,6 +1003,15 @@
// Used when disableWAL is true.
bool has_unpersisted_data_;
// if an attempt was made to flush all column families that
// the oldest log depends on but uncommitted data in the oldest
// log prevents the log from being released.
// We must attempt to free the dependent memtables again
// at a later time after the transaction in the oldest
// log is fully committed.
bool unable_to_flush_oldest_log_;
static const int KEEP_LOG_FILE_NUM = 1000;
// MSVC version 1800 still does not have constexpr for ::max()
static const uint64_t kNoTimeOut = port::kMaxUint64;

View File

@@ -19,6 +19,11 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
}
void DBImpl::TEST_MaybeFlushColumnFamilies() {
InstrumentedMutexLock l(&mutex_);
MaybeFlushColumnFamilies();
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd;

View File

@@ -518,6 +518,7 @@ void DBIter::MergeValuesNewToOld() {
iter_->IsValuePinned() /* operand_pinned */);
ParsedInternalKey ikey;
Status s;
for (iter_->Next(); iter_->Valid(); iter_->Next()) {
if (!ParseKey(&ikey)) {
// skip corrupted key
@@ -538,9 +539,12 @@
// final result in saved_value_. We are done!
// ignore corruption if there is any.
const Slice val = iter_->value();
MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
s = MergeHelper::TimedFullMerge(
merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_, &pinned_value_);
if (!s.ok()) {
status_ = s;
}
// iter_ is positioned after put
iter_->Next();
return;
@@ -559,9 +563,12 @@
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
if (!s.ok()) {
status_ = s;
}
}
void DBIter::Prev() {
@@ -742,6 +749,7 @@ bool DBIter::FindValueForCurrentKey() {
FindParseableKey(&ikey, kReverse);
}
Status s;
switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeSingleDeletion:
@@ -753,16 +761,16 @@
if (last_not_merge_type == kTypeDeletion ||
last_not_merge_type == kTypeSingleDeletion ||
last_not_merge_type == kTypeRangeDeletion) {
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
&pinned_value_);
&saved_value_, logger_, statistics_,
env_, &pinned_value_);
} else {
assert(last_not_merge_type == kTypeValue);
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
&pinned_value_,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetKey(), &pinned_value_,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
env_, &pinned_value_);
}
break;
case kTypeValue:
@@ -773,6 +781,9 @@
break;
}
valid_ = true;
if (!s.ok()) {
status_ = s;
}
return true;
}
@@ -818,13 +829,15 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
FindParseableKey(&ikey, kForward);
}
Status s;
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(ikey)) {
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
&pinned_value_);
// Make iter_ valid and point to saved_key_
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
@@ -832,14 +845,20 @@
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
valid_ = true;
if (!s.ok()) {
status_ = s;
}
return true;
}
const Slice& val = iter_->value();
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
valid_ = true;
if (!s.ok()) {
status_ = s;
}
return true;
}

View File

@@ -0,0 +1,98 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <string>
#include <vector>
#include "db/db_test_util.h"
#include "db/forward_iterator.h"
#include "port/stack_trace.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
// Test merge operator functionality.
class DBMergeOperatorTest : public DBTestBase {
public:
DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
};
// A test merge operator that mimics Put but also fails if one of the merge operands is
// "corrupted".
class TestPutOperator : public MergeOperator {
public:
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
if (merge_in.existing_value != nullptr &&
*(merge_in.existing_value) == "corrupted") {
return false;
}
for (auto value : merge_in.operand_list) {
if (value == "corrupted") {
return false;
}
}
merge_out->existing_operand = merge_in.operand_list.back();
return true;
}
virtual const char* Name() const override { return "TestPutOperator"; }
};
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new TestPutOperator());
Reopen(options);
ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Merge("k1", "corrupted"));
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
}
TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new TestPutOperator());
options.max_successive_merges = 3;
Reopen(options);
ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Merge("k1", "v2"));
// Will trigger a merge when hitting max_successive_merges and the merge
// will fail. The delta will be inserted nevertheless.
ASSERT_OK(Merge("k1", "corrupted"));
// Data should stay unmerged after the error.
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
}
TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new TestPutOperator());
DestroyAndReopen(options);
ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Merge("k1", "corrupted"));
ASSERT_OK(Put("k2", "v2"));
VerifyDBFromMap({{"k1", ""}, {"k2", "v2"}}, nullptr, false,
{{"k1", Status::Corruption()}});
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
DestroyAndReopen(options);
ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Merge("k2", "corrupted"));
VerifyDBFromMap({{"k1", "v1"}, {"k2", ""}}, nullptr, false,
{{"k2", Status::Corruption()}});
VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@@ -112,6 +112,59 @@ TEST_F(DBRangeDelTest, CompactionOutputFilesExactlyFilled) {
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
// Ensures range deletion spanning multiple compaction output files that are
// cut by max_compaction_bytes will have non-overlapping key-ranges.
// https://github.com/facebook/rocksdb/issues/1778
const int kNumFiles = 2, kNumPerFile = 1 << 8, kBytesPerVal = 1 << 12;
Options opts = CurrentOptions();
opts.comparator = test::Uint64Comparator();
opts.disable_auto_compactions = true;
opts.level0_file_num_compaction_trigger = kNumFiles;
opts.max_compaction_bytes = kNumPerFile * kBytesPerVal;
opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
// Want max_compaction_bytes to trigger the end of compaction output file, not
// target_file_size_base, so make the latter much bigger
opts.target_file_size_base = 100 * opts.max_compaction_bytes;
Reopen(opts);
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
// It spans the whole key-range, thus will be included in all output files
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(0),
GetNumericStr(kNumFiles * kNumPerFile - 1)));
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
std::vector<std::string> values;
// Write 1MB (256 values, each 4K)
for (int j = 0; j < kNumPerFile; j++) {
values.push_back(RandomString(&rnd, kBytesPerVal));
ASSERT_OK(Put(GetNumericStr(kNumPerFile * i + j), values[j]));
}
// extra entry to trigger SpecialSkipListFactory's flush
ASSERT_OK(Put(GetNumericStr(kNumPerFile), ""));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GE(NumTableFilesAtLevel(1), 2);
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
for (size_t i = 0; i < files[1].size() - 1; ++i) {
ASSERT_TRUE(InternalKeyComparator(opts.comparator)
.Compare(files[1][i].largest, files[1][i + 1].smallest) <
0);
}
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, FlushRangeDelsSameStartKey) {
db_->Put(WriteOptions(), "b1", "val");
ASSERT_OK(

View File

@@ -2212,6 +2212,18 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, OptimizeForPointLookup) {
Options options = CurrentOptions();
Close();
options.OptimizeForPointLookup(2);
ASSERT_OK(DB::Open(options, dbname_, &db_));
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
Flush();
ASSERT_EQ("v1", Get("foo"));
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

View File

@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "db/forward_iterator.h"
namespace rocksdb {
@@ -501,6 +502,15 @@ Status DBTestBase::Put(int cf, const Slice& k, const Slice& v,
}
}
Status DBTestBase::Merge(const Slice& k, const Slice& v, WriteOptions wo) {
return db_->Merge(wo, k, v);
}
Status DBTestBase::Merge(int cf, const Slice& k, const Slice& v,
WriteOptions wo) {
return db_->Merge(wo, handles_[cf], k, v);
}
Status DBTestBase::Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
@@ -1089,11 +1099,18 @@ std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
}
void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
size_t* total_reads_res, bool tailing_iter) {
size_t* total_reads_res, bool tailing_iter,
std::map<std::string, Status> status) {
size_t total_reads = 0;
for (auto& kv : true_data) {
Status s = status[kv.first];
if (s.ok()) {
ASSERT_EQ(Get(kv.first), kv.second);
} else {
std::string value;
ASSERT_EQ(s, db_->Get(ReadOptions(), kv.first, &value));
}
total_reads++;
}
@@ -1106,21 +1123,40 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
// Verify Iterator::Next()
iter_cnt = 0;
auto data_iter = true_data.begin();
Status s;
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
ASSERT_EQ(iter->key().ToString(), data_iter->first);
Status current_status = status[data_iter->first];
if (!current_status.ok()) {
s = current_status;
}
ASSERT_EQ(iter->status(), s);
if (current_status.ok()) {
ASSERT_EQ(iter->value().ToString(), data_iter->second);
}
iter_cnt++;
total_reads++;
}
ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
<< true_data.size();
delete iter;
// Verify Iterator::Prev()
// Use a new iterator to make sure its status is clean.
iter = db_->NewIterator(ro);
iter_cnt = 0;
s = Status::OK();
auto data_rev = true_data.rbegin();
for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
ASSERT_EQ(iter->key().ToString(), data_rev->first);
Status current_status = status[data_rev->first];
if (!current_status.ok()) {
s = current_status;
}
ASSERT_EQ(iter->status(), s);
if (current_status.ok()) {
ASSERT_EQ(iter->value().ToString(), data_rev->second);
}
iter_cnt++;
total_reads++;
}
@@ -1134,7 +1170,6 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
ASSERT_EQ(kv.second, iter->value().ToString());
total_reads++;
}
delete iter;
}
@@ -1176,6 +1211,25 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
}
}
void DBTestBase::VerifyDBInternal(
std::vector<std::pair<std::string, std::string>> true_data) {
Arena arena;
InternalKeyComparator icmp(last_options_.comparator);
RangeDelAggregator range_del_agg(icmp, {});
auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg);
iter->SeekToFirst();
for (auto p : true_data) {
ASSERT_TRUE(iter->Valid());
ParsedInternalKey ikey;
ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
ASSERT_EQ(p.first, ikey.user_key);
ASSERT_EQ(p.second, iter->value());
iter->Next();
};
ASSERT_FALSE(iter->Valid());
iter->~InternalIterator();
}
#ifndef ROCKSDB_LITE
uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily(

View File

@@ -699,6 +699,12 @@ class DBTestBase : public testing::Test {
Status Put(int cf, const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions());
Status Merge(const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions());
Status Merge(int cf, const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions());
Status Delete(const std::string& k);
Status Delete(int cf, const std::string& k);
@@ -827,7 +833,11 @@
void VerifyDBFromMap(std::map<std::string, std::string> true_data,
size_t* total_reads_res = nullptr,
bool tailing_iter = false);
bool tailing_iter = false,
std::map<std::string, Status> status = {});
void VerifyDBInternal(
std::vector<std::pair<std::string, std::string>> true_data);
#ifndef ROCKSDB_LITE
uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,

View File

@@ -38,6 +38,15 @@ Status ExternalSstFileIngestionJob::Prepare(
files_to_ingest_.push_back(file_to_ingest);
}
for (const IngestedFileInfo& f : files_to_ingest_) {
if (f.cf_id !=
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
f.cf_id != cfd_->GetID()) {
return Status::InvalidArgument(
"External file column family id dont match");
}
}
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
auto num_files = files_to_ingest_.size();
if (num_files == 0) {
@@ -87,10 +96,12 @@ Status ExternalSstFileIngestionJob::Prepare(
status = env_->LinkFile(path_outside_db, path_inside_db);
if (status.IsNotSupported()) {
// Original file is on a different FS, use copy instead of hard linking
status = CopyFile(env_, path_outside_db, path_inside_db, 0);
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
}
} else {
status = CopyFile(env_, path_outside_db, path_inside_db, 0);
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
}
TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
if (!status.ok()) {
@@ -302,8 +313,14 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
file_to_ingest->num_entries = props->num_entries;
ParsedInternalKey key;
std::unique_ptr<InternalIterator> iter(
table_reader->NewIterator(ReadOptions()));
ReadOptions ro;
// While reading the external file we may cache the blocks we read into
// the block cache. If we later change the global seqno of this file, we will
// have blocks in the cache that include keys with the wrong seqno.
// We need to disable fill_cache so that we read from the file without
// updating the block cache.
ro.fill_cache = false;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(ro));
// Get first (smallest) key from file
iter->SeekToFirst();
@@ -325,6 +342,10 @@
}
file_to_ingest->largest_user_key = key.user_key.ToString();
file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
file_to_ingest->table_properties = *props;
return status;
}

View File

@@ -36,6 +36,10 @@ struct IngestedFileInfo {
uint64_t file_size;
// total number of keys in external file
uint64_t num_entries;
// Id of column family this file should be ingested into
uint32_t cf_id;
// TableProperties read from external file
TableProperties table_properties;
// Version of external file
int version;
@@ -96,6 +100,10 @@
VersionEdit* edit() { return &edit_; }
const autovector<IngestedFileInfo>& files_to_ingest() const {
return files_to_ingest_;
}
private:
// Open the external file and populate `file_to_ingest` with all the
// external information we need to ingest this file.

View File

@@ -15,7 +15,7 @@ namespace rocksdb {
class ExternalSSTFileTest : public DBTestBase {
public:
ExternalSSTFileTest() : DBTestBase("/external_sst_file_test") {
sst_files_dir_ = test::TmpDir(env_) + "/sst_files/";
sst_files_dir_ = dbname_ + "/sst_files/";
DestroyAndRecreateExternalSSTFilesDir();
}
@@ -28,7 +28,8 @@ class ExternalSSTFileTest : public DBTestBase {
const Options options,
std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
// Generate a file id if not provided
if (file_id == -1) {
file_id = last_file_id_ + 1;
@@ -51,7 +52,8 @@
data.resize(uniq_iter - data.begin());
}
std::string file_path = sst_files_dir_ + ToString(file_id);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator,
cfh);
Status s = sst_file_writer.Open(file_path);
if (!s.ok()) {
@@ -69,8 +71,12 @@
if (s.ok()) {
IngestExternalFileOptions ifo;
ifo.allow_global_seqno = allow_global_seqno;
if (cfh) {
s = db_->IngestExternalFile(cfh, {file_path}, ifo);
} else {
s = db_->IngestExternalFile({file_path}, ifo);
}
}
if (s.ok() && true_data) {
for (auto& entry : data) {
@@ -84,25 +90,29 @@
Status GenerateAndAddExternalFile(
const Options options, std::vector<std::pair<int, std::string>> data,
int file_id = -1, bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& entry : data) {
file_data.emplace_back(Key(entry.first), entry.second);
}
return GenerateAndAddExternalFile(options, file_data, file_id,
allow_global_seqno, sort_data, true_data);
allow_global_seqno, sort_data, true_data,
cfh);
}
Status GenerateAndAddExternalFile(
const Options options, std::vector<int> keys, int file_id = -1,
bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& k : keys) {
file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
}
return GenerateAndAddExternalFile(options, file_data, file_id,
allow_global_seqno, sort_data, true_data);
allow_global_seqno, sort_data, true_data,
cfh);
}
Status DeprecatedAddFile(const std::vector<std::string>& files,
@@ -1780,6 +1790,151 @@ TEST_F(ExternalSSTFileTest, DirtyExit) {
ASSERT_NOK(sst_file_writer->Finish());
}
TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"koko", "toto"}, options);
SstFileWriter sfw_default(EnvOptions(), options, options.comparator,
handles_[0]);
SstFileWriter sfw_cf1(EnvOptions(), options, options.comparator, handles_[1]);
SstFileWriter sfw_cf2(EnvOptions(), options, options.comparator, handles_[2]);
SstFileWriter sfw_unknown(EnvOptions(), options, options.comparator);
// default_cf.sst
const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst";
ASSERT_OK(sfw_default.Open(cf_default_sst));
ASSERT_OK(sfw_default.Add("K1", "V1"));
ASSERT_OK(sfw_default.Add("K2", "V2"));
ASSERT_OK(sfw_default.Finish());
// cf1.sst
const std::string cf1_sst = sst_files_dir_ + "/cf1.sst";
ASSERT_OK(sfw_cf1.Open(cf1_sst));
ASSERT_OK(sfw_cf1.Add("K3", "V1"));
ASSERT_OK(sfw_cf1.Add("K4", "V2"));
ASSERT_OK(sfw_cf1.Finish());
// cf_unknown.sst
const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst";
ASSERT_OK(sfw_unknown.Open(unknown_sst));
ASSERT_OK(sfw_unknown.Add("K5", "V1"));
ASSERT_OK(sfw_unknown.Add("K6", "V2"));
ASSERT_OK(sfw_unknown.Finish());
IngestExternalFileOptions ifo;
// SST CF don't match
ASSERT_NOK(db_->IngestExternalFile(handles_[0], {cf1_sst}, ifo));
// SST CF don't match
ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf1_sst}, ifo));
// SST CF match
ASSERT_OK(db_->IngestExternalFile(handles_[1], {cf1_sst}, ifo));
// SST CF don't match
ASSERT_NOK(db_->IngestExternalFile(handles_[1], {cf_default_sst}, ifo));
// SST CF don't match
ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf_default_sst}, ifo));
// SST CF match
ASSERT_OK(db_->IngestExternalFile(handles_[0], {cf_default_sst}, ifo));
// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
// Cannot ingest a file into a dropped CF
ASSERT_OK(db_->DropColumnFamily(handles_[1]));
ASSERT_NOK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
// CF was not dropped, OK to ingest
ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
}
class TestIngestExternalFileListener : public EventListener {
public:
void OnExternalFileIngested(DB* db,
const ExternalFileIngestionInfo& info) override {
ingested_files.push_back(info);
}
std::vector<ExternalFileIngestionInfo> ingested_files;
};
TEST_F(ExternalSSTFileTest, IngestionListener) {
Options options = CurrentOptions();
TestIngestExternalFileListener* listener =
new TestIngestExternalFileListener();
options.listeners.emplace_back(listener);
CreateAndReopenWithCF({"koko", "toto"}, options);
// Ingest into default cf
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[0]));
ASSERT_EQ(listener->ingested_files.size(), 1);
ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"default");
// Ingest into cf1
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[1]));
ASSERT_EQ(listener->ingested_files.size(), 2);
ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
1);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"koko");
// Ingest into cf2
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[2]));
ASSERT_EQ(listener->ingested_files.size(), 3);
ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
2);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"toto");
}
TEST_F(ExternalSSTFileTest, SnapshotInconsistencyBug) {
Options options = CurrentOptions();
DestroyAndReopen(options);
const int kNumKeys = 10000;
// Insert keys using normal path and take a snapshot
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(i), Key(i) + "_V1"));
}
const Snapshot* snap = db_->GetSnapshot();
// Overwrite all keys using IngestExternalFile
std::string sst_file_path = sst_files_dir_ + "file1.sst";
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
ASSERT_OK(sst_file_writer.Open(sst_file_path));
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(sst_file_writer.Add(Key(i), Key(i) + "_V2"));
}
ASSERT_OK(sst_file_writer.Finish());
IngestExternalFileOptions ifo;
ifo.move_files = true;
ASSERT_OK(db_->IngestExternalFile({sst_file_path}, ifo));
for (int i = 0; i < kNumKeys; i++) {
ASSERT_EQ(Get(Key(i), snap), Key(i) + "_V1");
ASSERT_EQ(Get(Key(i)), Key(i) + "_V2");
}
db_->ReleaseSnapshot(snap);
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

View File

@@ -277,7 +277,8 @@ Status FlushJob::WriteLevel0Table() {
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
static_cast<int>(memtables.size()), &arena));
std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
&cfd_->internal_comparator(), &range_del_iters[0],
&cfd_->internal_comparator(),
range_del_iters.empty() ? nullptr : &range_del_iters[0],
static_cast<int>(range_del_iters.size())));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",

View File

@@ -221,6 +221,7 @@ static const std::string num_live_versions = "num-live-versions";
static const std::string current_version_number =
"current-super-version-number";
static const std::string estimate_live_data_size = "estimate-live-data-size";
static const std::string min_log_number_to_keep = "min-log-number-to-keep";
static const std::string base_level = "base-level";
static const std::string total_sst_files_size = "total-sst-files-size";
static const std::string estimate_pending_comp_bytes =
@@ -285,6 +286,8 @@ const std::string DB::Properties::kCurrentSuperVersionNumber =
rocksdb_prefix + current_version_number;
const std::string DB::Properties::kEstimateLiveDataSize =
rocksdb_prefix + estimate_live_data_size;
const std::string DB::Properties::kMinLogNumberToKeep =
rocksdb_prefix + min_log_number_to_keep;
const std::string DB::Properties::kTotalSstFilesSize =
rocksdb_prefix + total_sst_files_size;
const std::string DB::Properties::kBaseLevel = rocksdb_prefix + base_level;
@@ -368,6 +371,8 @@ const std::unordered_map<std::string, DBPropertyInfo>
nullptr}},
{DB::Properties::kEstimateLiveDataSize,
{true, nullptr, &InternalStats::HandleEstimateLiveDataSize, nullptr}},
{DB::Properties::kMinLogNumberToKeep,
{false, nullptr, &InternalStats::HandleMinLogNumberToKeep, nullptr}},
{DB::Properties::kBaseLevel,
{false, nullptr, &InternalStats::HandleBaseLevel, nullptr}},
{DB::Properties::kTotalSstFilesSize,
@@ -705,6 +710,12 @@ bool InternalStats::HandleEstimateLiveDataSize(uint64_t* value, DBImpl* db,
return true;
}
bool InternalStats::HandleMinLogNumberToKeep(uint64_t* value, DBImpl* db,
Version* version) {
*value = db->MinLogNumberToKeep();
return true;
}
void InternalStats::DumpDBStats(std::string* value) {
char buf[1000];
// DB-level stats, only available from default column family

View File

@@ -401,6 +401,7 @@ class InternalStats {
Version* version);
bool HandleEstimateLiveDataSize(uint64_t* value, DBImpl* db,
Version* version);
bool HandleMinLogNumberToKeep(uint64_t* value, DBImpl* db, Version* version);
// Total number of background errors encountered. Every time a flush task
// or compaction task fails, this counter is incremented. The failure can

View File

@@ -221,6 +221,8 @@ class MemTableList {
// PickMemtablesToFlush() is called.
void FlushRequested() { flush_requested_ = true; }
bool HasFlushRequested() { return flush_requested_; }
// Copying allowed
// MemTableList(const MemTableList&);
// void operator=(const MemTableList&);

View File

@@ -317,7 +317,7 @@ void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
while (w != pg->last_writer) {
// Writers that won't write don't get sequence allotment
if (!w->CallbackFailed()) {
if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) {
sequence += WriteBatchInternal::Count(w->batch);
}
w = w->link_newer;

View File

@@ -500,6 +500,10 @@ class DB {
// live data in bytes.
static const std::string kEstimateLiveDataSize;
// "rocksdb.min-log-number-to-keep" - return the minmum log number of the
// log files that should be kept.
static const std::string kMinLogNumberToKeep;
// "rocksdb.total-sst-files-size" - returns total size (bytes) of all SST
// files.
// WARNING: may slow down online queries if there are too many files.
@@ -565,6 +569,7 @@ class DB {
// "rocksdb.num-live-versions"
// "rocksdb.current-super-version-number"
// "rocksdb.estimate-live-data-size"
// "rocksdb.min-log-number-to-keep"
// "rocksdb.total-sst-files-size"
// "rocksdb.base-level"
// "rocksdb.estimate-pending-compaction-bytes"

View File

@@ -170,6 +170,20 @@ struct MemTableInfo {
};
struct ExternalFileIngestionInfo {
// The name of the column family
std::string cf_name;
// Path of the file outside the DB
std::string external_file_path;
// Path of the file inside the DB
std::string internal_file_path;
// The global sequence number assigned to keys in this file
SequenceNumber global_seqno;
// Table properties of the file being ingested
TableProperties table_properties;
};
// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
@@ -291,6 +305,15 @@ class EventListener {
virtual void OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle* handle) {
}
// A call-back function for RocksDB which will be called after an external
// file is ingested using IngestExternalFile.
//
// Note that this function will run on the same thread as
// IngestExternalFile(); if this function is blocked, IngestExternalFile()
// will be blocked from finishing.
virtual void OnExternalFileIngested(
DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}
virtual ~EventListener() {}
};
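A minimal sketch of a user-defined listener built on this callback; the class name and logging are illustrative, not part of the change:

#include <cstdio>
#include "rocksdb/db.h"
#include "rocksdb/listener.h"

// Sketch: log every ingested external file. The callback runs on the
// thread that called IngestExternalFile(), so it should return quickly.
class IngestionLogger : public rocksdb::EventListener {
 public:
  void OnExternalFileIngested(
      rocksdb::DB* /*db*/,
      const rocksdb::ExternalFileIngestionInfo& info) override {
    fprintf(stderr, "ingested %s into CF %s (global seqno %llu)\n",
            info.internal_file_path.c_str(), info.cf_name.c_str(),
            static_cast<unsigned long long>(info.global_seqno));
  }
};
// Registered the same way as in the test above:
//   options.listeners.emplace_back(new IngestionLogger());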

View File

@@ -7,6 +7,7 @@
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
namespace rocksdb {
@@ -43,8 +44,12 @@ struct ExternalSstFileInfo {
// All keys in files generated by SstFileWriter will have sequence number = 0
class SstFileWriter {
public:
// User can pass `column_family` to specify that the generated file will
// be ingested into this column_family. Note that passing nullptr means that
// the column_family is unknown.
SstFileWriter(const EnvOptions& env_options, const Options& options,
const Comparator* user_comparator);
const Comparator* user_comparator,
ColumnFamilyHandle* column_family = nullptr);
~SstFileWriter();
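Putting the new parameter together with ingestion, a hedged end-to-end sketch (the file path, key, and function name are placeholders, not from this changeset):

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/sst_file_writer.h"

// Sketch: write an SST file bound to a column family, then ingest it into
// that same column family. Ingesting it into a different CF would fail,
// as exercised by the FileWithCFInfo test above.
rocksdb::Status WriteAndIngest(rocksdb::DB* db,
                               rocksdb::ColumnFamilyHandle* cfh,
                               const rocksdb::Options& options) {
  rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options,
                                options.comparator, cfh);
  rocksdb::Status s = writer.Open("/tmp/cf_bound.sst");
  if (s.ok()) s = writer.Add("key1", "value1");
  if (s.ok()) s = writer.Finish();
  if (s.ok()) {
    rocksdb::IngestExternalFileOptions ifo;
    s = db->IngestExternalFile(cfh, {"/tmp/cf_bound.sst"}, ifo);
  }
  return s;
}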

View File

@@ -6,7 +6,7 @@
#define ROCKSDB_MAJOR 5
#define ROCKSDB_MINOR 0
#define ROCKSDB_PATCH 0
#define ROCKSDB_PATCH 2
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

src.mk
View File

@@ -233,6 +233,8 @@ MAIN_SOURCES = \
db/db_inplace_update_test.cc \
db/db_iterator_test.cc \
db/db_log_iter_test.cc \
db/db_memtable_test.cc \
db/db_merge_operator_test.cc \
db/db_options_test.cc \
db/db_range_del_test.cc \
db/db_sst_test.cc \

View File

@@ -21,11 +21,12 @@ const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const Options& options,
const Comparator* _user_comparator)
const Comparator* _user_comparator, ColumnFamilyHandle* _cfh)
: env_options(_env_options),
ioptions(options),
mutable_cf_options(options),
internal_comparator(_user_comparator) {}
internal_comparator(_user_comparator),
cfh(_cfh) {}
std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<TableBuilder> builder;
@@ -34,14 +35,16 @@ struct SstFileWriter::Rep {
MutableCFOptions mutable_cf_options;
InternalKeyComparator internal_comparator;
ExternalSstFileInfo file_info;
std::string column_family_name;
InternalKey ikey;
std::string column_family_name;
ColumnFamilyHandle* cfh;
};
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
const Options& options,
const Comparator* user_comparator)
: rep_(new Rep(env_options, options, user_comparator)) {}
const Comparator* user_comparator,
ColumnFamilyHandle* column_family)
: rep_(new Rep(env_options, options, user_comparator, column_family)) {}
SstFileWriter::~SstFileWriter() {
if (rep_->builder) {
@@ -89,6 +92,18 @@ Status SstFileWriter::Open(const std::string& file_path) {
user_collector_factories[i]));
}
int unknown_level = -1;
uint32_t cf_id;
if (r->cfh != nullptr) {
// User explicitly specified that this file will be ingested into cfh,
// so we can persist this information in the file.
cf_id = r->cfh->GetID();
r->column_family_name = r->cfh->GetName();
} else {
r->column_family_name = "";
cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
}
TableBuilderOptions table_builder_options(
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
compression_type, r->ioptions.compression_opts,
@@ -100,9 +115,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
table_builder_options,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
r->file_writer.get()));
table_builder_options, cf_id, r->file_writer.get()));
r->file_info.file_path = file_path;
r->file_info.file_size = 0;

View File

@@ -16,7 +16,7 @@ namespace rocksdb {
// Utility function to copy a file up to a specified length
Status CopyFile(Env* env, const std::string& source,
const std::string& destination, uint64_t size) {
const std::string& destination, uint64_t size, bool use_fsync) {
const EnvOptions soptions;
Status s;
unique_ptr<SequentialFileReader> src_reader;
@@ -62,6 +62,7 @@ Status CopyFile(Env* env, const std::string& source,
}
size -= slice.size();
}
dest_writer->Sync(use_fsync);
return Status::OK();
}

View File

@@ -12,9 +12,11 @@
#include "util/db_options.h"
namespace rocksdb {
// use_fsync maps to options.use_fsync, which determines the way that
// the file is synced after copying.
extern Status CopyFile(Env* env, const std::string& source,
const std::string& destination, uint64_t size = 0);
const std::string& destination, uint64_t size,
bool use_fsync);
extern Status CreateFile(Env* env, const std::string& destination,
const std::string& contents);
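A hedged sketch of calling the updated helper (paths and the wrapper name are placeholders); passing size = 0 copies the whole file, which is what the checkpoint code below does for non-manifest files:

#include "rocksdb/env.h"
#include "util/file_util.h"

// Sketch: copy an entire file (size == 0 means copy to end of file) and
// fsync the destination before returning, per options.use_fsync semantics.
rocksdb::Status BackupOneFile(rocksdb::Env* env) {
  return rocksdb::CopyFile(env, "/db/000123.sst", "/backup/000123.sst",
                           /*size=*/0, /*use_fsync=*/true);
}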

View File

@@ -706,7 +706,7 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForPointLookup(
block_based_options.block_cache =
NewLRUCache(static_cast<size_t>(block_cache_size_mb * 1024 * 1024));
table_factory.reset(new BlockBasedTableFactory(block_based_options));
memtable_factory.reset(NewHashLinkListRepFactory());
memtable_prefix_bloom_size_ratio = 0.02;
return this;
}
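The practical effect, in a hedged sketch: the point-lookup preset now layers a memtable prefix bloom filter on top of the default skip-list memtable instead of swapping in the hash-linked-list memtable that was incompatible with parallel memtable insert. The DB path and function name below are placeholders:

#include "rocksdb/db.h"

// Sketch: apply the fixed preset; with the memtable bloom filter approach,
// this preset stays compatible with concurrent memtable writes.
rocksdb::Status OpenPointLookupDB(rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.OptimizeForPointLookup(/*block_cache_size_mb=*/64);
  return rocksdb::DB::Open(options, "/tmp/point_lookup_db", db);
}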

View File

@@ -6,6 +6,7 @@
#include <assert.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

View File

@@ -10,6 +10,7 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <unordered_map>
#include <vector>

View File

@@ -62,6 +62,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
DBOptions db_options = db_->GetDBOptions();
uint64_t min_log_num = port::kMaxUint64;
uint64_t sequence_number = db_->GetLatestSequenceNumber();
bool same_fs = true;
VectorLogPtr live_wal_files;
@@ -78,6 +80,35 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
if (s.ok()) {
// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size);
if (s.ok() && db_options.allow_2pc) {
// If 2PC is enabled, we need to get minimum log number after the flush.
// Need to refetch the live files to recapture the snapshot.
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
&min_log_num)) {
db_->EnableFileDeletions(false);
return Status::InvalidArgument(
"2PC enabled but cannot fine the min log number to keep.");
}
// We need to refetch live files with flush to handle this case:
// A previous 000001.log contains the prepare record of transaction txn1.
// The current log file is 000002.log, and sequence_number points to this
// file.
// After calling GetLiveFiles(), 000003.log is created.
// Then txn1 is committed. The commit record is written to 000003.log.
// Now we fetch min_log_num, which will be 3.
// Then only 000002.log and 000003.log will be copied, and 000001.log will
// be skipped. 000003.log contains the commit record of txn1, but we don't
// have the respective prepare record for it.
// In order to avoid this situation, we need to force a flush to make sure
// all transactions committed before getting min_log_num will be flushed
// to SST files.
// We cannot get min_log_num before calling GetLiveFiles() for the
// first time, because if we do that, all the log files will be included,
// far more than needed.
s = db_->GetLiveFiles(live_files, &manifest_file_size, /* flush */ true);
}
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
}
@@ -91,7 +122,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}
size_t wal_size = live_wal_files.size();
Log(db_->GetOptions().info_log,
Log(db_options.info_log,
"Started the snapshot process -- creating snapshot in directory %s",
checkpoint_dir.c_str());
@@ -130,7 +161,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
// * if it's kDescriptorFile, limit the size to manifest_file_size
// * always copy if cross-device link
if ((type == kTableFile) && same_fs) {
Log(db_->GetOptions().info_log, "Hard Linking %s", src_fname.c_str());
Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());
s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname,
full_private_path + src_fname);
if (s.IsNotSupported()) {
@@ -139,39 +170,41 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}
}
if ((type != kTableFile) || (!same_fs)) {
Log(db_->GetOptions().info_log, "Copying %s", src_fname.c_str());
Log(db_options.info_log, "Copying %s", src_fname.c_str());
s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname,
full_private_path + src_fname,
(type == kDescriptorFile) ? manifest_file_size : 0);
(type == kDescriptorFile) ? manifest_file_size : 0,
db_options.use_fsync);
}
}
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
s = CreateFile(db_->GetEnv(), full_private_path + current_fname,
manifest_fname.substr(1) + "\n");
}
Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt,
Log(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
live_wal_files.size());
// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(live_wal_files[i]->StartSequence() >= sequence_number)) {
(live_wal_files[i]->StartSequence() >= sequence_number ||
live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) {
Log(db_->GetOptions().info_log, "Copying %s",
Log(db_options.info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
s = CopyFile(db_->GetEnv(),
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes());
live_wal_files[i]->SizeFileBytes(), db_options.use_fsync);
break;
}
if (same_fs) {
// we only care about live log files
Log(db_->GetOptions().info_log, "Hard Linking %s",
Log(db_options.info_log, "Hard Linking %s",
live_wal_files[i]->PathName().c_str());
s = db_->GetEnv()->LinkFile(
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName());
if (s.IsNotSupported()) {
same_fs = false;
@@ -179,11 +212,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}
}
if (!same_fs) {
Log(db_->GetOptions().info_log, "Copying %s",
Log(db_options.info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
s = CopyFile(db_->GetEnv(),
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(), 0);
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(), 0,
db_options.use_fsync);
}
}
}
@@ -205,27 +239,26 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
if (!s.ok()) {
// clean all the files we might have created
Log(db_->GetOptions().info_log, "Snapshot failed -- %s",
s.ToString().c_str());
Log(db_options.info_log, "Snapshot failed -- %s", s.ToString().c_str());
// we have to delete the dir and all its children
std::vector<std::string> subchildren;
db_->GetEnv()->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
std::string subchild_path = full_private_path + "/" + subchild;
Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
Log(db_->GetOptions().info_log, "Delete file %s -- %s",
subchild_path.c_str(), s1.ToString().c_str());
Log(db_options.info_log, "Delete file %s -- %s", subchild_path.c_str(),
s1.ToString().c_str());
}
// finally delete the private dir
Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
Log(db_->GetOptions().info_log, "Delete dir %s -- %s",
full_private_path.c_str(), s1.ToString().c_str());
Log(db_options.info_log, "Delete dir %s -- %s", full_private_path.c_str(),
s1.ToString().c_str());
return s;
}
// here we know that we succeeded and installed the new snapshot
Log(db_->GetOptions().info_log, "Snapshot DONE. All is good");
Log(db_->GetOptions().info_log, "Snapshot sequence number: %" PRIu64,
Log(db_options.info_log, "Snapshot DONE. All is good");
Log(db_options.info_log, "Snapshot sequence number: %" PRIu64,
sequence_number);
return s;

View File

@@ -21,6 +21,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/xfunc.h"
@@ -390,6 +391,120 @@ TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing) {
snapshotDB = nullptr;
}
TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing2PC) {
const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot";
const std::string dbname = test::TmpDir() + "/transaction_testdb";
ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));
ASSERT_OK(DestroyDB(dbname, CurrentOptions()));
env_->DeleteDir(kSnapshotName);
env_->DeleteDir(dbname);
Close();
Options options = CurrentOptions();
// allow_2pc is implicitly set with tx prepare
// options.allow_2pc = true;
TransactionDBOptions txn_db_options;
TransactionDB* txdb;
Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb);
assert(s.ok());
ColumnFamilyHandle* cfa;
ColumnFamilyHandle* cfb;
ColumnFamilyOptions cf_options;
ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFA", &cfa));
WriteOptions write_options;
// Insert something into CFB so lots of log files will be kept
// before creating the checkpoint.
ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFB", &cfb));
ASSERT_OK(txdb->Put(write_options, cfb, "", ""));
ReadOptions read_options;
std::string value;
TransactionOptions txn_options;
Transaction* txn = txdb->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
ASSERT_EQ(txdb->GetTransactionByName("xid"), txn);
s = txn->Put(Slice("foo"), Slice("bar"));
s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa"));
ASSERT_OK(s);
// Write a prepare record into the middle of the first WAL, then flush WALs many times
for (int i = 1; i <= 100000; i++) {
Transaction* tx = txdb->BeginTransaction(write_options, txn_options);
ASSERT_OK(tx->SetName("x"));
ASSERT_OK(tx->Put(Slice(std::to_string(i)), Slice("val")));
ASSERT_OK(tx->Put(cfa, Slice("aaa"), Slice("111")));
ASSERT_OK(tx->Prepare());
ASSERT_OK(tx->Commit());
if (i % 10000 == 0) {
txdb->Flush(FlushOptions());
}
if (i == 88888) {
ASSERT_OK(txn->Prepare());
}
}
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"},
{"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit",
"CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t([&]() {
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(txdb, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName));
delete checkpoint;
});
TEST_SYNC_POINT("DBTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit");
ASSERT_OK(txn->Commit());
TEST_SYNC_POINT(
"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit");
t.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// No more than three log files should exist.
std::vector<std::string> files;
env_->GetChildren(kSnapshotName, &files);
int num_log_files = 0;
for (auto& file : files) {
uint64_t num;
FileType type;
WalFileType log_type;
if (ParseFileName(file, &num, &type, &log_type) && type == kLogFile) {
num_log_files++;
}
}
// One flush after prepare + one outstanding file before checkpoint + one log
// file generated after checkpoint.
ASSERT_LE(num_log_files, 3);
TransactionDB* snapshotDB;
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
std::vector<rocksdb::ColumnFamilyHandle*> cf_handles;
ASSERT_OK(TransactionDB::Open(options, txn_db_options, kSnapshotName,
column_families, &cf_handles, &snapshotDB));
ASSERT_OK(snapshotDB->Get(read_options, "foo", &value));
ASSERT_EQ(value, "bar");
ASSERT_OK(snapshotDB->Get(read_options, cf_handles[1], "foocfa", &value));
ASSERT_EQ(value, "barcfa");
delete cfa;
delete cfb;
delete cf_handles[0];
delete cf_handles[1];
delete cf_handles[2];
delete snapshotDB;
snapshotDB = nullptr;
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@@ -9,6 +9,7 @@
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <functional>
#include <memory>
#include <vector>

View File

@@ -6,6 +6,7 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include <list>
#include <memory>
#include <string>

View File

@@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include "util/random.h"
#include "utilities/persistent_cache/hash_table.h"
#include "utilities/persistent_cache/lrulist.h"

View File

@@ -1178,6 +1178,108 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
delete cfb;
}
TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Status s;
ColumnFamilyHandle *cfa, *cfb;
ColumnFamilyOptions cf_options;
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
ASSERT_OK(s);
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
ASSERT_OK(s);
WriteOptions wopts;
wopts.disableWAL = false;
wopts.sync = true;
auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);
TransactionOptions topts1;
Transaction* txn1 = db->BeginTransaction(wopts, topts1);
s = txn1->SetName("xid1");
ASSERT_OK(s);
s = txn1->Put(cfa, "boys", "girls1");
ASSERT_OK(s);
Transaction* txn2 = db->BeginTransaction(wopts, topts1);
s = txn2->SetName("xid2");
ASSERT_OK(s);
s = txn2->Put(cfb, "up", "down1");
ASSERT_OK(s);
// prepare transaction in LOG A
s = txn1->Prepare();
ASSERT_OK(s);
// prepare transaction in LOG A
s = txn2->Prepare();
ASSERT_OK(s);
// regular put so that the memtable can actually be flushed for log rolling
s = db->Put(wopts, "cats", "dogs1");
ASSERT_OK(s);
auto prepare_log_no = txn1->GetLogNumber();
// roll to LOG B
s = db_impl->TEST_FlushMemTable(true);
ASSERT_OK(s);
// now we pause background work so that
// imm()s are not flushed before we can check their status
s = db_impl->PauseBackgroundWork();
ASSERT_OK(s);
ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
prepare_log_no);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
// commit in LOG B
s = txn1->Commit();
ASSERT_OK(s);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no);
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
// request a flush for all column families such that the earliest
// alive log file can be killed
db_impl->TEST_MaybeFlushColumnFamilies();
// log cannot be flushed because txn2 has not been committed
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
// assert that cfa has a flush requested
ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
// cfb should not be flushed because it has no data from LOG A
ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
// cfb now has data from LOG A
s = txn2->Commit();
ASSERT_OK(s);
db_impl->TEST_MaybeFlushColumnFamilies();
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
// we should see that cfb now has a flush requested
ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
// all data in LOG A resides in a memtable that has been
// requested for a flush
ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
delete txn1;
delete txn2;
delete cfa;
delete cfb;
}
/*
* 1) use prepare to keep first log around to determine starting sequence
* during recovery.