Compare commits


19 Commits

Author SHA1 Message Date
sdong
32fda17a2e Disable error as warning 2019-11-05 11:05:04 -08:00
sdong
2594970c39 Add one more #include<functional> 2019-11-05 11:05:03 -08:00
sdong
1b4234f331 Add some include<functional> 2019-10-31 14:23:01 -07:00
sdong
54ceceafb3 [FB Internal] Point to the latest tool chain. 2019-10-31 14:22:10 -07:00
sdong
9ec602f5f7 [fb only] revert unintended change of USE_SSE
The previous change that uses gcc-5 set USE_SSE to the wrong flag by mistake. Fix it.
2017-07-17 22:22:50 -07:00
sdong
6d1896f4ca [FB Only] use gcc-5 2017-07-17 21:56:58 -07:00
sdong
98084910a7 Release 4.11.2 2016-09-15 18:11:36 -07:00
Andrew Kryczka
fbd1c98255 Fix recovery for WALs without data for all CFs
Summary:
If one or more CFs had no data in the WAL, the log number used
by FindObsoleteFiles() wasn't updated. We need to treat this case the same as
if the data for that WAL had been flushed.
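The recovery rule above can be sketched as follows. This is an illustration with made-up names (`CFRecoveryState`, `MinLogToKeep`), not RocksDB's actual recovery code: a WAL that contained no data for a CF advances that CF's log number as if its (empty) data had been flushed, so the minimum live log number can move past the WAL and FindObsoleteFiles() can delete it.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative stand-in for per-CF recovery bookkeeping.
struct CFRecoveryState {
  bool had_data_in_wal;
  uint64_t flushed_log_number;  // WALs below this are obsolete for this CF
};

// Compute the lowest WAL number that must be kept after recovering
// `wal_number`. The fix: a CF with no data in the WAL is treated the same
// as a CF whose data was flushed.
uint64_t MinLogToKeep(std::vector<CFRecoveryState>& cfs, uint64_t wal_number) {
  for (auto& cf : cfs) {
    if (!cf.had_data_in_wal) {
      // Advance past this WAL, as if the (empty) data had been flushed.
      cf.flushed_log_number = std::max(cf.flushed_log_number, wal_number + 1);
    }
  }
  uint64_t min_log = UINT64_MAX;
  for (const auto& cf : cfs) {
    min_log = std::min(min_log, cf.flushed_log_number);
  }
  return min_log;
}
```

Without the `!cf.had_data_in_wal` branch, the empty-CF's stale log number would pin the WAL forever.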

Test Plan: new unit test

Reviewers: IslamAbdelRahman, yiwu, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D63963
2016-09-15 18:03:33 -07:00
Andrew Kryczka
c743289d81 Fix GetSortedWalFiles when log recycling enabled
Summary:
Previously the sequence number was mistakenly passed in an argument
where the log number should go. This caused the reader to assume the old WAL
format was used, which is incompatible with the WAL recycling format.
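This bug class (a sequence number passed where a log number belongs) compiles silently when both are plain `uint64_t`. One common defensive technique, shown here purely as an illustration and not as what RocksDB does, is to wrap each id in a distinct struct so the swap becomes a compile-time error:

```cpp
#include <cstdint>

// Distinct wrapper types for the two kinds of 64-bit ids.
struct SequenceNumber { uint64_t v; };
struct LogNumber     { uint64_t v; };

// A reader constructed from a LogNumber cannot accidentally be handed a
// SequenceNumber: the mismatched call simply does not compile.
struct LogReader {
  uint64_t log_number;
  explicit LogReader(LogNumber n) : log_number(n.v) {}
};

LogReader MakeReader(LogNumber n) { return LogReader(n); }
// MakeReader(SequenceNumber{7});  // compile-time error, catching the swap
```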

Test Plan:
new unit test, verified it fails before this change and passes
afterwards.

Reviewers: yiwu, lightmark, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D63987
2016-09-15 18:03:24 -07:00
Mike Kolupaev
2be165b291 Fix a crash when compaction fails to open a file
Summary:
We've got a crash with this stack trace:

  Program terminated with signal SIGTRAP, Trace/breakpoint trap.

  #0  0x00007fc85f2f4009 in raise () from /usr/local/fbcode/gcc-4.9-glibc-2.20-fb/lib/libpthread.so.0
  #1  0x00000000005c8f61 in facebook::logdevice::handle_sigsegv(int) () at logdevice/server/sigsegv.cpp:159
  #2  0x00007fc85f2f4150 in <signal handler called> () at /usr/local/fbcode/gcc-4.9-glibc-2.20-fb/lib/libpthread.so.0
  #3  0x00000000031ed80c in rocksdb::NewReadaheadRandomAccessFile() at util/file_reader_writer.cc:383
  #4  0x00000000031ed80c in rocksdb::NewReadaheadRandomAccessFile() at util/file_reader_writer.cc:472
  #5  0x00000000031558e7 in rocksdb::TableCache::GetTableReader() at db/table_cache.cc:99
  #6  0x0000000003156329 in rocksdb::TableCache::NewIterator() at db/table_cache.cc:198
  #7  0x0000000003166568 in rocksdb::VersionSet::MakeInputIterator() at db/version_set.cc:3345
  #8  0x000000000324a94f in rocksdb::CompactionJob::ProcessKeyValueCompaction(rocksdb::CompactionJob::SubcompactionState*) () at db/compaction_job.cc:650
  #9  0x000000000324c2f6 in rocksdb::CompactionJob::Run() () at db/compaction_job.cc:530
  #10 0x00000000030f5ae5 in rocksdb::DBImpl::BackgroundCompaction() at db/db_impl.cc:3269
  #11 0x0000000003108d36 in rocksdb::DBImpl::BackgroundCallCompaction(void*) () at db/db_impl.cc:2970
  #12 0x00000000029a2a9a in facebook::logdevice::RocksDBEnv::callback(void*) () at logdevice/server/locallogstore/RocksDBEnv.cpp:26
  #13 0x00000000029a2a9a in facebook::logdevice::RocksDBEnv::callback(void*) () at logdevice/server/locallogstore/RocksDBEnv.cpp:30
  #14 0x00000000031e7521 in rocksdb::ThreadPool::BGThread() at util/threadpool.cc:230
  #15 0x00000000031e7663 in rocksdb::BGThreadWrapper(void*) () at util/threadpool.cc:254
  #16 0x00007fc85f2ea7f1 in start_thread () at /usr/local/fbcode/gcc-4.9-glibc-2.20-fb/lib/libpthread.so.0
  #17 0x00007fc85e8fb46d in clone () at /usr/local/fbcode/gcc-4.9-glibc-2.20-fb/lib/libc.so.6

From looking at the code, this is probably what happened:
 - `TableCache::GetTableReader()` called `Env::NewRandomAccessFile()`, which dispatched to `PosixEnv::NewRandomAccessFile()`, where an `open()` call probably failed, so `NewRandomAccessFile()` left a nullptr in the resulting file,
 - `TableCache::GetTableReader()` called `NewReadaheadRandomAccessFile()` with that `nullptr` file,
 - it tried to call a method on the null file and crashed.

This diff is a trivial fix to this crash.
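The shape of the crash and of the fix can be sketched with stand-in types (none of these are RocksDB's real signatures; `OpenAndWrap` is a hypothetical caller): a factory that fails leaves a null file behind, and the caller must check the Status before wrapping the file.

```cpp
#include <memory>
#include <string>
#include <utility>

// Minimal stand-ins for Env/file types, for illustration only.
struct RandomAccessFile {};

struct Status {
  bool ok_;
  static Status OK() { return {true}; }
  static Status IOError() { return {false}; }
  bool ok() const { return ok_; }
};

// Like PosixEnv::NewRandomAccessFile(): on failure, the output file is null.
Status NewRandomAccessFile(const std::string& fname,
                           std::unique_ptr<RandomAccessFile>* result) {
  if (fname.empty()) {
    result->reset();  // simulate a failed open(): leave a null file
    return Status::IOError();
  }
  result->reset(new RandomAccessFile());
  return Status::OK();
}

// The crash pattern was wrapping the (possibly null) file without checking
// the status first. The trivial fix: bail out before touching `file`.
Status OpenAndWrap(const std::string& fname,
                   std::unique_ptr<RandomAccessFile>* wrapped) {
  std::unique_ptr<RandomAccessFile> file;
  Status s = NewRandomAccessFile(fname, &file);
  if (!s.ok()) {
    return s;  // don't dereference a null file
  }
  *wrapped = std::move(file);  // readahead wrapping would happen here
  return s;
}
```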

Test Plan: `make -j check`

Reviewers: sdong, andrewkr, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D62451
2016-09-15 17:59:07 -07:00
sdong
0f4c98d305 Increase version to 4.11.1
Summary:
After back-porting two bug fixes, increase the version to 4.11.1
2016-08-30 11:55:08 -07:00
Aaron Gao
c75f4faa9d fix data race in NewIndexIterator() in block_based_table_reader.cc
Summary: Fixed the data race described in https://github.com/facebook/rocksdb/issues/1267 and added a regression test.

Test Plan:
./table_test --gtest_filter=BlockBasedTableTest.NewIndexIteratorLeak
make all check -j64
core dump before fix. ok after fix.

Reviewers: andrewkr, sdong

Reviewed By: sdong

Subscribers: igor, andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D62361
2016-08-30 11:37:23 -07:00
sdong
56f497bb21 Mitigate regression bug of options.max_successive_merges hit during DB Recovery
Summary:
After 1b8a2e8fdd, a DB pointer is passed to WriteBatchInternal::InsertInto() during DB recovery. This can cause a deadlock if options.max_successive_merges is hit: in that case DB::Get() will be called, and Get() will try to acquire the DB mutex, which is already held by DB::Open(), producing the deadlock.

This commit mitigates the problem by not passing the DB pointer unless 2PC is allowed.
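A minimal sketch of the deadlock shape and the mitigation, with illustrative names (`MiniDB`, `InsertInto`, `RecoverWithMitigation` are not RocksDB's real API): while recovery holds the DB mutex, re-entering a Get() that also takes that mutex self-deadlocks on a non-recursive mutex, so recovery only hands the insertion path a DB pointer when 2PC actually needs one.

```cpp
#include <mutex>

// Stand-in DB whose Get() takes the DB mutex, like DB::Get().
struct MiniDB {
  std::mutex db_mutex;
  bool Get() {
    std::lock_guard<std::mutex> l(db_mutex);
    return true;
  }
};

// InsertInto-style helper: with a null db pointer, the merge-limit check
// (which would call Get()) is skipped entirely.
bool InsertInto(MiniDB* db, bool max_successive_merges_hit) {
  if (db != nullptr && max_successive_merges_hit) {
    return db->Get();  // would deadlock if the caller holds db_mutex
  }
  return true;  // recovery path: no re-entrant Get()
}

bool RecoverWithMitigation(MiniDB& db, bool two_pc_allowed) {
  std::lock_guard<std::mutex> l(db.db_mutex);  // Open() holds the DB mutex
  // The mitigation: pass the DB pointer only when 2PC is allowed.
  return InsertInto(two_pc_allowed ? &db : nullptr,
                    /*max_successive_merges_hit=*/true);
}
```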

Test Plan: Add a new test and run it.

Reviewers: IslamAbdelRahman, andrewkr, kradhakrishnan, horuff

Reviewed By: kradhakrishnan

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D62625
2016-08-30 11:24:55 -07:00
sdong
051c88dd79 Compaction stats printing: "batch" => "commit group"
Summary: "Batch" is ambiguous in this context: it can mean "write batch" or "commit group". Change it to "commit group" to be clear.

Test Plan: Build

Reviewers: MarkCallaghan, yhchiang, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D62055
2016-08-15 11:26:01 -07:00
Yueh-Hsuan Chiang
071893eb27 Fix a destruction order issue in ThreadStatusUpdater
Summary:
Env holds a pointer to a ThreadStatusUpdater, which is deleted when the
Env is deleted. However, if a RocksDB database is deleted after the Env,
this introduces a use-after-free of that ThreadStatusUpdater.

This patch fixes this by never deleting the ThreadStatusUpdater in Env,
which is generally safe because Env is a singleton in most cases.
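The destruction-order fix can be illustrated with toy classes (these are not RocksDB's actual `Env`/`ThreadStatusUpdater` definitions): the Env deliberately leaks its updater, so anything torn down after the Env still sees live memory. The one-time leak is benign for a process-lifetime singleton.

```cpp
#include <cstddef>

// Toy stand-ins for the two objects involved.
struct ThreadStatusUpdater {
  bool alive = true;
};

struct Env {
  ThreadStatusUpdater* updater = new ThreadStatusUpdater();
  ~Env() {
    // Intentionally NOT `delete updater;` -- a DB deleted after the Env
    // may still dereference it. Leaking once is acceptable for a singleton.
  }
};

ThreadStatusUpdater* g_updater = nullptr;

// Simulate the problematic teardown order: Env first, updater users later.
void SimulateShutdownOrder() {
  {
    Env env;
    g_updater = env.updater;
  }  // Env destroyed here...
  // ...but the updater outlives it, so late users don't touch freed memory.
}
```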

Test Plan: thread_list_test

Reviewers: andrewkr, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D59187
2016-08-15 10:41:53 -07:00
sdong
7d7234a9c8 Change HISTORY.md for release 4.11
Summary:
Need to change HISTORY.md for 4.11.
4.10 was not updated either, so update it at the same time.

Test Plan: Not needed.

Reviewers: kradhakrishnan, andrewkr, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61665
2016-08-10 13:19:48 -07:00
sdong
d641bb8fb0 read_options.background_purge_on_iterator_cleanup to cover forward iterator and log file closing too.
Summary: With read_options.background_purge_on_iterator_cleanup=true, file deletion and closing could still happen in the forward iterator or during WAL file closing. Cover those cases too.

Test Plan: I am adding unit tests.

Reviewers: andrewkr, IslamAbdelRahman, yiwu

Reviewed By: yiwu

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61503
2016-08-10 13:19:10 -07:00
sdong
e98b76dd9b A utility function to help users migrate DB after options change
Summary: Add a utility function that triggers the necessary full compaction and puts the output on the correct level, by comparing the new options against the old ones.

Test Plan: Add unit tests for it.

Reviewers: andrewkr, igor, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: muthu, sumeet, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60783
2016-08-05 16:09:33 -07:00
Yi Wu
4001e799ab Ignore write stall triggers when auto-compaction is disabled
Summary:
My understanding is that the purpose of write stall triggers is to wait for auto-compaction to catch up. Without auto-compaction, we don't need to stall writes.

Also with this diff, flush/compaction conditions are recalculated on dynamic option change. Previously the conditions were recalculated only when write stall options were changed.
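The gating this change adds can be sketched as follows. The field names (`disable_auto_compactions`, `level0_stop_writes_trigger`) come from the diff in this change; `ShouldStopWrites` itself is a simplified stand-in for one branch of `RecalculateWriteStallConditions()`, not the real function:

```cpp
// Simplified mutable CF options: only the two fields this sketch needs.
struct MutableCFOptions {
  bool disable_auto_compactions = false;
  int level0_stop_writes_trigger = 24;
};

// Every stop/slowdown trigger is now gated on auto-compaction being
// enabled: stalling writes only makes sense when compaction can catch up.
bool ShouldStopWrites(const MutableCFOptions& opts, int l0_files) {
  return !opts.disable_auto_compactions &&
         l0_files >= opts.level0_stop_writes_trigger;
}
```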

Test Plan: See the new test. Removed two tests that are no longer valid.

Reviewers: IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D61437
2016-08-05 16:09:12 -07:00
36 changed files with 1033 additions and 202 deletions

CMakeLists.txt

@@ -264,6 +264,7 @@ set(SOURCES
         utilities/merge_operators/put.cc
         utilities/merge_operators/max.cc
         utilities/merge_operators/uint64add.cc
+        utilities/option_change_migration/option_change_migration.cc
         utilities/options/options_util.cc
         utilities/persistent_cache/persistent_cache_tier.cc
         utilities/persistent_cache/volatile_tier_impl.cc
@@ -432,6 +433,7 @@ set(TESTS
         utilities/geodb/geodb_test.cc
         utilities/memory/memory_test.cc
         utilities/merge_operators/string_append/stringappend_test.cc
+        utilities/option_change_migration/option_change_migration_test.cc
         utilities/options/options_util_test.cc
         utilities/persistent_cache/hash_table_test.cc
         utilities/persistent_cache/persistent_cache_test.cc

HISTORY.md

@@ -1,10 +1,27 @@
 # Rocksdb Change Log
-## Unreleased
+## 4.11.2 (9/15/2016)
+### Bug fixes
+* Segfault when failing to open an SST file for read-ahead iterators.
+* WAL without data for all CFs is not deleted after recovery.
+
+## 4.11.1 (8/30/2016)
+### Bug Fixes
+* Mitigate the regression bug of deadlock condition during recovery when options.max_successive_merges hits.
+* Fix data race condition related to hash index in block based table when putting indexes in the block cache.
+
+## 4.11.0 (8/1/2016)
+### Public API Change
+* options.memtable_prefix_bloom_huge_page_tlb_size => memtable_huge_page_size. When it is set, RocksDB will try to allocate memory from huge page for memtable too, rather than just memtable bloom filter.
+### New Features
+* A tool to migrate DB after options change. See include/rocksdb/utilities/option_change_migration.h.
+* Add ReadOptions.background_purge_on_iterator_cleanup. If true, we avoid file deletion when destorying iterators.
+
+## 4.10.0 (7/5/2016)
 ### Public API Change
 * options.memtable_prefix_bloom_bits changes to options.memtable_prefix_bloom_bits_ratio and deprecate options.memtable_prefix_bloom_probes
 * enum type CompressionType and PerfLevel changes from char to unsigned char. Value of all PerfLevel shift by one.
 * Deprecate options.filter_deletes.
-* options.memtable_prefix_bloom_huge_page_tlb_size => memtable_huge_page_size. When it is set, RocksDB will try to allocate memory from huge page for memtable too, rather than just memtable bloom filter.
 ### New Features
 * Add avoid_flush_during_recovery option.

Makefile

@@ -216,10 +216,6 @@ default: all
 WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
   -Wno-unused-parameter
-ifndef DISABLE_WARNING_AS_ERROR
-	WARNING_FLAGS += -Werror
-endif
 CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
 CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
@@ -374,6 +370,7 @@ TESTS = \
 	heap_test \
 	compact_on_deletion_collector_test \
 	compaction_job_stats_test \
+	option_change_migration_test \
 	transaction_test \
 	ldb_cmd_test \
 	iostats_context_test \
@@ -865,7 +862,7 @@ arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
 autovector_test: util/autovector_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
-column_family_test: db/column_family_test.o $(LIBOBJECTS) $(TESTHARNESS)
+column_family_test: db/column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 table_properties_collector_test: db/table_properties_collector_test.o $(LIBOBJECTS) $(TESTHARNESS)
@@ -886,6 +883,9 @@ cache_test: util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
 coding_test: util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
+option_change_migration_test: utilities/option_change_migration/option_change_migration_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
 stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)

build_tools/build_detect_platform

@@ -52,12 +52,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
     FBCODE_BUILD="true"
     # If we're compiling with TSAN we need pic build
     PIC_BUILD=$COMPILE_WITH_TSAN
-    if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
-      source "$PWD/build_tools/fbcode_config.sh"
-    else
-      # we need this to build with MySQL. Don't use for other purposes.
-      source "$PWD/build_tools/fbcode_config4.8.1.sh"
-    fi
+    source "$PWD/build_tools/fbcode_config.sh"
 fi
 # Delete existing output, if it exists

build_tools/dependencies.sh

@@ -1,16 +1,19 @@
-GCC_BASE=/mnt/gvfs/third-party2/gcc/ebc96bc2fb751b5a0300b8d91a95bdf24ac1d88b/4.9.x/centos6-native/108cf83
-CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/b91de48a4974ec839946d824402b098d43454cef/stable/centos6-native/7aaccbe
-LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/53e0eac8911888a105aa98b9a35fe61cf1d8b278/4.9.x/gcc-4.9-glibc-2.20/024dbc3
-GLIBC_BASE=/mnt/gvfs/third-party2/glibc/ee36ac9a72dfac4a995f1b215bb4c0fc8a0f6f91/2.20/gcc-4.9-glibc-2.20/500e281
-SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
-ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2b24f1e99454f9ca7b0301720f94836dae1bf71b/1.2.8/gcc-5-glibc-2.23/9bc6787
-BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/af7c14c9b652cdd5ec34eadd25c3f38a9b306c5d/1.0.6/gcc-5-glibc-2.23/9bc6787
-LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
-ZSTD_BASE=/mnt/gvfs/third-party2/zstd/1408484d03b15492aa54b10356104e9dc22e1cc5/0.6.1/gcc-5-glibc-2.23/9bc6787
-GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/5a5c7a6608cb32f1e1e7f814023d5bdfbd136370/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
-JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/a4c2adecffcaa68d5585d06be2252e3efa52555b/master/gcc-5-glibc-2.23/1c32b4b
-NUMA_BASE=/mnt/gvfs/third-party2/numa/1abc0d3c01743b854676423cf2d3629912f34930/2.0.8/gcc-5-glibc-2.23/9bc6787
-LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
-KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/f24be37d170e04be6e469af487644d4d62e1c6c1/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
-BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/9d65c666b9adf8f2a989fd4b98a9a5e7d3afa233/2.26/centos6-native/da39a3e
-VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/9cee5a3628dc9d4b93897972c58eba865e25b270/3.10.0/gcc-4.9-glibc-2.20/e9936bf
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
+CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
+LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
+GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
+SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
+ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
+BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
+LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
+ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
+GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
+JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
+NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
+LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
+TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
+KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
+BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
+VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
+LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

build_tools/fbcode_config.sh

@@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
 CFLAGS=""
 # libgcc
-LIBGCC_INCLUDE="$LIBGCC_BASE/include"
+LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
 LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
 # glibc
@@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
   LZ4_INCLUDE=" -I $LZ4_BASE/include/"
   LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
   CFLAGS+=" -DLZ4"
-  ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
-  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
-  CFLAGS+=" -DZSTD"
 fi
+ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
+if test -z $PIC_BUILD; then
+  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
+else
+  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
+fi
+CFLAGS+=" -DZSTD"
 # location of gflags headers and libraries
 GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
 if test -z $PIC_BUILD; then
@@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
 else
   GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
 fi
-CFLAGS+=" -DGFLAGS=google"
+CFLAGS+=" -DGFLAGS=gflags"
 # location of jemalloc
 JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@@ -72,13 +76,22 @@ if test -z $PIC_BUILD; then
   LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
 fi
+# location of TBB
+TBB_INCLUDE=" -isystem $TBB_BASE/include/"
+if test -z $PIC_BUILD; then
+  TBB_LIBS="$TBB_BASE/lib/libtbb.a"
+else
+  TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
+fi
+CFLAGS+=" -DTBB"
 # use Intel SSE support for checksum calculations
 export USE_SSE=1
 BINUTILS="$BINUTILS_BASE/bin"
 AR="$BINUTILS/ar"
-DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE"
+DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
 STDLIBS="-L $GCC_BASE/lib64"
@@ -95,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
   CXX="$GCC_BASE/bin/g++"
   CFLAGS+=" -B$BINUTILS/gold"
-  CFLAGS+=" -isystem $GLIBC_INCLUDE"
   CFLAGS+=" -isystem $LIBGCC_INCLUDE"
+  CFLAGS+=" -isystem $GLIBC_INCLUDE"
   JEMALLOC=1
 else
   # clang
@@ -107,8 +120,8 @@
   KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
   CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
-  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
-  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
+  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
+  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
   CFLAGS+=" -isystem $GLIBC_INCLUDE"
   CFLAGS+=" -isystem $LIBGCC_INCLUDE"
   CFLAGS+=" -isystem $CLANG_INCLUDE"
@@ -119,18 +132,21 @@
 fi
 CFLAGS+=" $DEPS_INCLUDE"
-CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
+CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
 CXXFLAGS+=" $CFLAGS"
-EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB"
-EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
+EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
+EXEC_LDFLAGS+=" -B$BINUTILS/gold"
+EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
 EXEC_LDFLAGS+=" $LIBUNWIND"
-EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
+EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
+# required by libtbb
+EXEC_LDFLAGS+=" -ldl"
 PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
-EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS"
+EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
 VALGRIND_VER="$VALGRIND_BASE/bin/"
 export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD

db/column_family.cc

@@ -558,8 +558,9 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
         "(waiting for flush), max_write_buffer_number is set to %d",
         name_.c_str(), imm()->NumNotFlushed(),
         mutable_cf_options.max_write_buffer_number);
-  } else if (vstorage->l0_delay_trigger_count() >=
-             mutable_cf_options.level0_stop_writes_trigger) {
+  } else if (!mutable_cf_options.disable_auto_compactions &&
+             vstorage->l0_delay_trigger_count() >=
+                 mutable_cf_options.level0_stop_writes_trigger) {
     write_controller_token_ = write_controller->GetStopToken();
     internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
     if (compaction_picker_->IsLevel0CompactionInProgress()) {
@@ -569,7 +570,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
     Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
         "[%s] Stopping writes because we have %d level-0 files",
         name_.c_str(), vstorage->l0_delay_trigger_count());
-  } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
+  } else if (!mutable_cf_options.disable_auto_compactions &&
+             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
              compaction_needed_bytes >=
                  mutable_cf_options.hard_pending_compaction_bytes_limit) {
     write_controller_token_ = write_controller->GetStopToken();
@@ -594,7 +596,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
         name_.c_str(), imm()->NumNotFlushed(),
         mutable_cf_options.max_write_buffer_number,
         write_controller->delayed_write_rate());
-  } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
+  } else if (!mutable_cf_options.disable_auto_compactions &&
+             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
              vstorage->l0_delay_trigger_count() >=
                  mutable_cf_options.level0_slowdown_writes_trigger) {
     write_controller_token_ =
@@ -611,7 +614,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
         "rate %" PRIu64,
         name_.c_str(), vstorage->l0_delay_trigger_count(),
         write_controller->delayed_write_rate());
-  } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
+  } else if (!mutable_cf_options.disable_auto_compactions &&
+             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
              vstorage->estimated_compaction_needed_bytes() >=
                  mutable_cf_options.soft_pending_compaction_bytes_limit) {
     write_controller_token_ =

db/column_family_test.cc

@ -2413,6 +2413,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
mutable_cf_options.level0_stop_writes_trigger = 10000; mutable_cf_options.level0_stop_writes_trigger = 10000;
mutable_cf_options.soft_pending_compaction_bytes_limit = 200; mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
mutable_cf_options.disable_auto_compactions = false;
vstorage->TEST_set_estimated_compaction_needed_bytes(50); vstorage->TEST_set_estimated_compaction_needed_bytes(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2559,16 +2560,17 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
vstorage->set_l0_delay_trigger_count(50); vstorage->set_l0_delay_trigger_count(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->set_l0_delay_trigger_count(60); vstorage->set_l0_delay_trigger_count(60);
vstorage->TEST_set_estimated_compaction_needed_bytes(300); vstorage->TEST_set_estimated_compaction_needed_bytes(300);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
mutable_cf_options.disable_auto_compactions = false;
vstorage->set_l0_delay_trigger_count(70); vstorage->set_l0_delay_trigger_count(70);
vstorage->TEST_set_estimated_compaction_needed_bytes(500); vstorage->TEST_set_estimated_compaction_needed_bytes(500);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2576,7 +2578,6 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
mutable_cf_options.disable_auto_compactions = false;
vstorage->set_l0_delay_trigger_count(71); vstorage->set_l0_delay_trigger_count(71);
vstorage->TEST_set_estimated_compaction_needed_bytes(501); vstorage->TEST_set_estimated_compaction_needed_bytes(501);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2803,6 +2804,217 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
} }
#ifndef ROCKSDB_LITE
TEST_F(ColumnFamilyTest, FlushCloseWALFiles) {
SpecialEnv env(Env::Default());
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
Open();
CreateColumnFamilies({"one"});
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(0, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
// Block flush jobs from running
test::SleepingBackgroundTask sleeping_task;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::HIGH);
WriteOptions wo;
wo.sync = true;
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
ASSERT_EQ(2, env.num_open_wal_file_.load());
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
WaitForFlush(1);
ASSERT_EQ(1, env.num_open_wal_file_.load());
Reopen();
ASSERT_EQ("mirko", Get(0, "fodor"));
ASSERT_EQ("mirko", Get(1, "fodor"));
db_options_.env = env_;
Close();
}
#endif // !ROCKSDB_LITE
TEST_F(ColumnFamilyTest, IteratorCloseWALFile1) {
SpecialEnv env(Env::Default());
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
Open();
CreateColumnFamilies({"one"});
ASSERT_OK(Put(1, "fodor", "mirko"));
// Create an iterator holding the current super version.
Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]);
// A flush will make `it` hold the last reference of its super version.
Flush(1);
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(0, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
// Flush jobs will close previous WAL files after finishing. By
// blocking flush jobs from running, we trigger a condition where
// the iterator destructor should close the WAL files.
test::SleepingBackgroundTask sleeping_task;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::HIGH);
WriteOptions wo;
wo.sync = true;
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
ASSERT_EQ(2, env.num_open_wal_file_.load());
// Deleting the iterator will clear its super version, triggering
// the closing of all files
delete it;
ASSERT_EQ(1, env.num_open_wal_file_.load());
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
WaitForFlush(1);
Reopen();
ASSERT_EQ("mirko", Get(0, "fodor"));
ASSERT_EQ("mirko", Get(1, "fodor"));
db_options_.env = env_;
Close();
}
TEST_F(ColumnFamilyTest, IteratorCloseWALFile2) {
SpecialEnv env(Env::Default());
// Allow both flush and purge jobs to be scheduled.
env.SetBackgroundThreads(2, Env::HIGH);
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
Open();
CreateColumnFamilies({"one"});
ASSERT_OK(Put(1, "fodor", "mirko"));
// Create an iterator holding the current super version.
ReadOptions ro;
ro.background_purge_on_iterator_cleanup = true;
Iterator* it = db_->NewIterator(ro, handles_[1]);
// A flush will make `it` hold the last reference of its super version.
Flush(1);
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(0, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"ColumnFamilyTest::IteratorCloseWALFile2:0",
"DBImpl::BGWorkPurge:start"},
{"ColumnFamilyTest::IteratorCloseWALFile2:2",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.sync = true;
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
ASSERT_EQ(2, env.num_open_wal_file_.load());
// Deleting the iterator will clear its super version, triggering
// the closing of all files
delete it;
ASSERT_EQ(2, env.num_open_wal_file_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
ASSERT_EQ(1, env.num_open_wal_file_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
WaitForFlush(1);
ASSERT_EQ(1, env.num_open_wal_file_.load());
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Reopen();
ASSERT_EQ("mirko", Get(0, "fodor"));
ASSERT_EQ("mirko", Get(1, "fodor"));
db_options_.env = env_;
Close();
}
#ifndef ROCKSDB_LITE // TEST functions are not supported in lite
TEST_F(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
SpecialEnv env(Env::Default());
// Allow both flush and purge jobs to be scheduled.
env.SetBackgroundThreads(2, Env::HIGH);
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(3));
column_family_options_.level0_file_num_compaction_trigger = 2;
Open();
CreateColumnFamilies({"one"});
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodar2", "mirko"));
Flush(1);
// Create an iterator holding the current super version, as well as
// the SST file just flushed.
ReadOptions ro;
ro.tailing = true;
ro.background_purge_on_iterator_cleanup = true;
Iterator* it = db_->NewIterator(ro, handles_[1]);
// A flush will make `it` hold the last reference of its super version.
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodar2", "mirko"));
Flush(1);
WaitForCompaction();
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(0, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"ColumnFamilyTest::IteratorCloseWALFile2:0",
"DBImpl::BGWorkPurge:start"},
{"ColumnFamilyTest::IteratorCloseWALFile2:2",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.sync = true;
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
env.delete_count_.store(0);
ASSERT_EQ(2, env.num_open_wal_file_.load());
// Seeking the iterator will release its old super version, triggering
// the closing of all files
it->Seek("");
ASSERT_EQ(2, env.num_open_wal_file_.load());
ASSERT_EQ(0, env.delete_count_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
ASSERT_EQ(1, env.num_open_wal_file_.load());
ASSERT_EQ(1, env.delete_count_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
WaitForFlush(1);
ASSERT_EQ(1, env.num_open_wal_file_.load());
ASSERT_EQ(1, env.delete_count_.load());
delete it;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Reopen();
ASSERT_EQ("mirko", Get(0, "fodor"));
ASSERT_EQ("mirko", Get(1, "fodor"));
db_options_.env = env_;
Close();
}
#endif // !ROCKSDB_LITE
// Disable on windows because SyncWAL requires env->IsSyncThreadSafe()
// to return true which is not so in unbuffered mode.
#ifndef OS_WIN


@@ -716,6 +716,16 @@ uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
return min_log;
}
void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
if (!job_context->logs_to_free.empty()) {
for (auto l : job_context->logs_to_free) {
AddToLogsToFreeQueue(l);
}
job_context->logs_to_free.clear();
SchedulePurge();
}
}
// * Returns the list of live files in 'sst_live'
// If it's doing full scan:
// * Returns the list of all files in the filesystem in
@@ -1566,10 +1576,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// we just ignore the update.
// That's why we set ignore missing column families to true
bool has_valid_writes = false;
// If we pass DB through and options.max_successive_merges is hit
// during recovery, Get() will be issued which will try to acquire
// DB mutex and cause deadlock, as DB mutex is already held.
// The DB pointer is not needed unless 2PC is used.
// TODO(sdong) fix the allow_2pc case too.
- status = WriteBatchInternal::InsertInto(
-     &batch, column_family_memtables_.get(), &flush_scheduler_, true,
-     log_number, this, false /* concurrent_memtable_writes */,
-     next_sequence, &has_valid_writes);
+ status = WriteBatchInternal::InsertInto(
+     &batch, column_family_memtables_.get(), &flush_scheduler_, true,
+     log_number, db_options_.allow_2pc ? this : nullptr,
+     false /* concurrent_memtable_writes */, next_sequence,
+     &has_valid_writes);
// If it is the first log file and there is no column family updated
// after replaying the file, this file may be a stale file. We ignore
// sequence IDs from the file. Otherwise, if a newer stale log file that
@@ -1685,7 +1701,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// recovered and should be ignored on next reincarnation.
// Since we already recovered max_log_number, we want all logs
// with numbers `<= max_log_number` (includes this one) to be ignored
- if (flushed) {
+ if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
edit->SetLogNumber(max_log_number + 1);
}
// we must mark the next log number as used, even though it's
@@ -2358,20 +2374,6 @@ void DBImpl::NotifyOnCompactionCompleted(
#endif  // ROCKSDB_LITE
}
bool DBImpl::NeedFlushOrCompaction(const MutableCFOptions& base_options,
const MutableCFOptions& new_options) {
return (base_options.disable_auto_compactions &&
!new_options.disable_auto_compactions) ||
base_options.level0_slowdown_writes_trigger <
new_options.level0_slowdown_writes_trigger ||
base_options.level0_stop_writes_trigger <
new_options.level0_stop_writes_trigger ||
base_options.soft_pending_compaction_bytes_limit <
new_options.soft_pending_compaction_bytes_limit ||
base_options.hard_pending_compaction_bytes_limit <
new_options.hard_pending_compaction_bytes_limit;
}
Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
@@ -2385,7 +2387,6 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
return Status::InvalidArgument("empty input");
}
MutableCFOptions prev_options = *cfd->GetLatestMutableCFOptions();
MutableCFOptions new_options;
Status s;
Status persist_options_status;
@@ -2394,14 +2395,12 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
s = cfd->SetOptions(options_map);
if (s.ok()) {
new_options = *cfd->GetLatestMutableCFOptions();
- if (NeedFlushOrCompaction(prev_options, new_options)) {
-   // Trigger possible flush/compactions. This has to be before we persist
-   // options to file, otherwise there will be a deadlock with writer
-   // thread.
-   auto* old_sv =
-       InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
-   delete old_sv;
- }
+ // Trigger possible flush/compactions. This has to be before we persist
+ // options to file, otherwise there will be a deadlock with writer
+ // thread.
+ auto* old_sv =
+     InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
+ delete old_sv;
// Persist RocksDB options under the single write thread
WriteThread::Writer w;
@@ -3005,8 +3004,9 @@ void DBImpl::BGWorkCompaction(void* arg) {
void DBImpl::BGWorkPurge(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
- TEST_SYNC_POINT("DBImpl::BGWorkPurge");
+ TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
}
void DBImpl::UnscheduleCallback(void* arg) {
@@ -3021,20 +3021,32 @@ void DBImpl::UnscheduleCallback(void* arg) {
void DBImpl::BackgroundCallPurge() {
mutex_.Lock();
- while (!purge_queue_.empty()) {
-   auto purge_file = purge_queue_.begin();
-   auto fname = purge_file->fname;
-   auto type = purge_file->type;
-   auto number = purge_file->number;
-   auto path_id = purge_file->path_id;
-   auto job_id = purge_file->job_id;
-   purge_queue_.pop_front();
+ // We use one single loop to clear both queues so that after exiting the
+ // loop both queues are empty. This is stricter than what is needed, but
+ // can make it easier for us to reason about correctness.
+ while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) {
+   if (!purge_queue_.empty()) {
+     auto purge_file = purge_queue_.begin();
+     auto fname = purge_file->fname;
+     auto type = purge_file->type;
+     auto number = purge_file->number;
+     auto path_id = purge_file->path_id;
+     auto job_id = purge_file->job_id;
+     purge_queue_.pop_front();
mutex_.Unlock();
Status file_deletion_status;
DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number,
                       path_id);
mutex_.Lock();
+   } else {
+     assert(!logs_to_free_queue_.empty());
+     log::Writer* log_writer = *(logs_to_free_queue_.begin());
+     logs_to_free_queue_.pop_front();
+     mutex_.Unlock();
+     delete log_writer;
+     mutex_.Lock();
+   }
}
bg_purge_scheduled_--;
@@ -3101,6 +3113,8 @@ void DBImpl::BackgroundCallFlush() {
JobContext job_context(next_job_id_.fetch_add(1), true);
assert(bg_flush_scheduled_);
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{
InstrumentedMutexLock l(&mutex_);
@@ -3343,7 +3357,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
// compaction is not necessary. Need to make sure mutex is held
// until we make a copy in the following code
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
if (c != nullptr) {
// update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
@@ -3670,6 +3686,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
state->mu->Lock();
state->super_version->Cleanup();
state->db->FindObsoleteFiles(&job_context, false, true);
if (state->background_purge) {
state->db->ScheduleBgLogWriterClose(&job_context);
}
state->mu->Unlock();
delete state->super_version;


@@ -365,6 +365,10 @@ class DBImpl : public DB {
// compaction status.
int BGCompactionsAllowed() const;
// move logs pending closing from job_context to the DB queue and
// schedule a purge
void ScheduleBgLogWriterClose(JobContext* job_context);
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
@@ -493,6 +497,9 @@ class DBImpl : public DB {
void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
void MarkLogAsContainingPrepSection(uint64_t log);
void AddToLogsToFreeQueue(log::Writer* log_writer) {
logs_to_free_queue_.push_back(log_writer);
}
Status NewDB();
@@ -672,11 +679,6 @@ class DBImpl : public DB {
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
                       LogBuffer* log_buffer);
// Compare options before and after to see whether flush or compaction is
// needed immediately after dynamic option change.
bool NeedFlushOrCompaction(const MutableCFOptions& base_options,
const MutableCFOptions& new_options);
void PrintStatistics();
// dump rocksdb.stats to LOG
@@ -884,6 +886,9 @@ class DBImpl : public DB {
// A queue to store filenames of the files to be purged
std::deque<PurgeFileInfo> purge_queue_;
// A queue to store log writers to close
std::deque<log::Writer*> logs_to_free_queue_;
int unscheduled_flushes_;
int unscheduled_compactions_;


@@ -6,6 +6,9 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <limits>
#include <string>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/sync_point.h"
@@ -20,80 +23,101 @@ class DBOptionsTest : public DBTestBase {
// RocksDB lite doesn't support dynamic options.
#ifndef ROCKSDB_LITE
- // When write stalls, user can enable auto compaction to unblock writes.
- // However, we had an issue where the stalled write thread blocks the attempt
- // to persist auto compaction option, thus creating a deadlock. The test
- // verifies the issue is fixed.
- TEST_F(DBOptionsTest, EnableAutoCompactionToUnblockWrites) {
-   Options options;
-   options.disable_auto_compactions = true;
-   options.write_buffer_size = 1000 * 1000;  // 1M
-   options.level0_file_num_compaction_trigger = 1;
-   options.level0_slowdown_writes_trigger = 1;
-   options.level0_stop_writes_trigger = 1;
-   options.compression = kNoCompression;
-   SyncPoint::GetInstance()->LoadDependency(
-       {{"DBImpl::DelayWrite:Wait",
-         "DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"},
-        {"DBImpl::BackgroundCompaction:Finish",
-         "DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}});
-   SyncPoint::GetInstance()->EnableProcessing();
-   // Stall writes.
-   Reopen(options);
-   env_->StartThread(
-       [](void* arg) {
-         std::string value(1000, 'v');
-         auto* t = static_cast<DBOptionsTest*>(arg);
-         for (int i = 0; i < 2000; i++) {
-           ASSERT_OK(t->Put(t->Key(i), value));
-         }
-       },
-       this);
-   TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionToUnblockWrites:1");
-   ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
-   ColumnFamilyHandle* handle = dbfull()->DefaultColumnFamily();
-   // We will get a deadlock here if we hit the issue.
-   ASSERT_OK(dbfull()->EnableAutoCompaction({handle}));
-   env_->WaitForJoin();
- }
-
- // Similar to EnableAutoCompactionAfterStallDeadlock. See comments there.
- TEST_F(DBOptionsTest, ToggleStopTriggerToUnblockWrites) {
-   Options options;
-   options.disable_auto_compactions = true;
-   options.write_buffer_size = 1000 * 1000;  // 1M
-   options.level0_file_num_compaction_trigger = 1;
-   options.level0_slowdown_writes_trigger = 1;
-   options.level0_stop_writes_trigger = 1;
-   options.compression = kNoCompression;
-   SyncPoint::GetInstance()->LoadDependency(
-       {{"DBImpl::DelayWrite:Wait",
-         "DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"},
-        {"DBImpl::BackgroundCompaction:Finish",
-         "DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}});
-   SyncPoint::GetInstance()->EnableProcessing();
-   // Stall writes.
-   Reopen(options);
-   env_->StartThread(
-       [](void* arg) {
-         std::string value(1000, 'v');
-         auto* t = static_cast<DBOptionsTest*>(arg);
-         for (int i = 0; i < 2000; i++) {
-           ASSERT_OK(t->Put(t->Key(i), value));
-         }
-       },
-       this);
-   TEST_SYNC_POINT("DBOptionsTest::ToggleStopTriggerToUnblockWrites:1");
-   ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
-   // We will get a deadlock here if we hit the issue.
-   ASSERT_OK(
-       dbfull()->SetOptions({{"level0_stop_writes_trigger", "1000000"},
-                             {"level0_slowdown_writes_trigger", "1000000"}}));
-   env_->WaitForJoin();
- }
+ TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) {
+   const std::string kValue(1024, 'v');
+   for (int method_type = 0; method_type < 2; method_type++) {
+     for (int option_type = 0; option_type < 4; option_type++) {
+       Options options;
+       options.create_if_missing = true;
+       options.disable_auto_compactions = true;
+       options.write_buffer_size = 1024 * 1024;
+       options.compression = CompressionType::kNoCompression;
+       options.level0_file_num_compaction_trigger = 1;
+       options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
+       options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
+       options.hard_pending_compaction_bytes_limit =
+           std::numeric_limits<uint64_t>::max();
+       options.soft_pending_compaction_bytes_limit =
+           std::numeric_limits<uint64_t>::max();
+
+       DestroyAndReopen(options);
+       for (int i = 0; i < 1024 * 2; i++) {
+         Put(Key(i), kValue);
+       }
+       dbfull()->TEST_WaitForFlushMemTable();
+       ASSERT_EQ(2, NumTableFilesAtLevel(0));
+       uint64_t l0_size = SizeAtLevel(0);
+
+       switch (option_type) {
+         case 0:
+           // test with level0_stop_writes_trigger
+           options.level0_stop_writes_trigger = 2;
+           options.level0_slowdown_writes_trigger = 2;
+           break;
+         case 1:
+           options.level0_slowdown_writes_trigger = 2;
+           break;
+         case 2:
+           options.hard_pending_compaction_bytes_limit = l0_size;
+           options.soft_pending_compaction_bytes_limit = l0_size;
+           break;
+         case 3:
+           options.soft_pending_compaction_bytes_limit = l0_size;
+           break;
+       }
+       Reopen(options);
+       dbfull()->TEST_WaitForCompact();
+       ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
+       ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
+
+       SyncPoint::GetInstance()->LoadDependency(
+           {{"DBOptionsTest::EnableAutoCompactionAndTriggerStall:1",
+             "BackgroundCallCompaction:0"},
+            {"DBImpl::BackgroundCompaction():BeforePickCompaction",
+             "DBOptionsTest::EnableAutoCompactionAndTriggerStall:2"},
+            {"DBOptionsTest::EnableAutoCompactionAndTriggerStall:3",
+             "DBImpl::BackgroundCompaction():AfterPickCompaction"}});
+       // Block background compaction.
+       SyncPoint::GetInstance()->EnableProcessing();
+
+       switch (method_type) {
+         case 0:
+           ASSERT_OK(
+               dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
+           break;
+         case 1:
+           ASSERT_OK(dbfull()->EnableAutoCompaction(
+               {dbfull()->DefaultColumnFamily()}));
+           break;
+       }
+       TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:1");
+       // Wait for stall condition recalculate.
+       TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:2");
+
+       switch (option_type) {
+         case 0:
+           ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
+           break;
+         case 1:
+           ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
+           ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
+           break;
+         case 2:
+           ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
+           break;
+         case 3:
+           ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
+           ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
+           break;
+       }
+       TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:3");
+
+       // Background compaction executed.
+       dbfull()->TEST_WaitForCompact();
+       ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
+       ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
+     }
+   }
+ }
#endif  // ROCKSDB_LITE


@@ -1816,6 +1816,22 @@ TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
}
#endif  // ROCKSDB_LITE
TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) {
Options options;
options = CurrentOptions(options);
options.merge_operator = MergeOperators::CreatePutOperator();
DestroyAndReopen(options);
db_->Put(WriteOptions(), "foo", "bar");
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
options.max_successive_merges = 3;
Reopen(options);
}
}  // namespace rocksdb

int main(int argc, char** argv) {


@@ -30,6 +30,8 @@ SpecialEnv::SpecialEnv(Env* base)
manifest_write_error_.store(false, std::memory_order_release);
log_write_error_.store(false, std::memory_order_release);
random_file_open_counter_.store(0, std::memory_order_relaxed);
delete_count_.store(0, std::memory_order_relaxed);
num_open_wal_file_.store(0);
log_write_slowdown_ = 0;
bytes_written_ = 0;
sync_counter_ = 0;


@@ -285,7 +285,10 @@ class SpecialEnv : public EnvWrapper {
class WalFile : public WritableFile {
 public:
  WalFile(SpecialEnv* env, unique_ptr<WritableFile>&& b)
-     : env_(env), base_(std::move(b)) {}
+     : env_(env), base_(std::move(b)) {
+   env_->num_open_wal_file_.fetch_add(1);
+ }
+ virtual ~WalFile() { env_->num_open_wal_file_.fetch_add(-1); }
  Status Append(const Slice& data) override {
#if !(defined NDEBUG) || !defined(OS_WIN)
    TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
@@ -443,6 +446,11 @@ class SpecialEnv : public EnvWrapper {
       addon_time_.load();
}
virtual Status DeleteFile(const std::string& fname) override {
delete_count_.fetch_add(1);
return target()->DeleteFile(fname);
}
Random rnd_;
port::Mutex rnd_mutex_;  // Lock to protect rnd_
@@ -470,6 +478,9 @@ class SpecialEnv : public EnvWrapper {
// Slow down every log write, in micro-seconds.
std::atomic<int> log_write_slowdown_;
// Number of WAL files that are still open for write.
std::atomic<int> num_open_wal_file_;
bool count_random_reads_;
anon::AtomicCounter random_read_counter_;
std::atomic<size_t> random_read_bytes_counter_;
@@ -494,6 +505,8 @@ class SpecialEnv : public EnvWrapper {
std::atomic<int64_t> addon_time_;

std::atomic<int> delete_count_;

bool time_elapse_only_sleep_;
bool no_sleep_;


@@ -292,6 +292,44 @@ TEST_F(DBWALTest, RecoveryWithEmptyLog) {
}

#ifndef ROCKSDB_LITE
TEST_F(DBWALTest, GetSortedWalFiles) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
VectorLogPtr log_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
ASSERT_EQ(0, log_files.size());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
ASSERT_EQ(1, log_files.size());
} while (ChangeOptions());
}
TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
// Test for regression of WAL cleanup missing files that don't contain data
// for every column family.
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(Put(1, "foo", "v2"));
std::array<uint64_t, 2> earliest_log_nums;
for (int i = 0; i < 2; ++i) {
if (i > 0) {
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
}
VectorLogPtr log_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
if (log_files.size() > 0) {
earliest_log_nums[i] = log_files[0]->LogNumber();
} else {
earliest_log_nums[i] = port::kMaxUint64;
}
}
// Check at least the first WAL was cleaned up during the recovery.
ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
} while (ChangeOptions());
}
TEST_F(DBWALTest, RecoverWithLargeLog) {
do {
{


@@ -16,12 +16,17 @@
#include <vector>
#include <string>

- #include "db/dbformat.h"
#include "db/column_family.h"
+ #include "db/dbformat.h"
+ #include "db/flush_scheduler.h"
+ #include "db/internal_stats.h"
+ #include "db/job_context.h"
#include "db/log_writer.h"
#include "db/memtable_list.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
+ #include "db/write_controller.h"
+ #include "db/write_thread.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
@@ -33,11 +38,6 @@
#include "util/instrumented_mutex.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
#include "db/write_thread.h"
#include "db/job_context.h"
namespace rocksdb {


@@ -153,10 +153,14 @@ void ForwardIterator::SVCleanup() {
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(&job_context, false, true);
if (read_options_.background_purge_on_iterator_cleanup) {
  db_->ScheduleBgLogWriterClose(&job_context);
}
db_->mutex_.Unlock();
delete sv_;
if (job_context.HaveSomethingToDelete()) {
- db_->PurgeObsoleteFiles(job_context);
+ db_->PurgeObsoleteFiles(
+     job_context, read_options_.background_purge_on_iterator_cleanup);
}
job_context.Clean();
}


@@ -615,15 +615,15 @@ void InternalStats::DumpDBStats(std::string* value) {
// Data
// writes: total number of write requests.
// keys: total number of key updates issued by all the write requests
- // batches: number of group commits issued to the DB. Each group can contain
- // one or more writes.
+ // commit groups: number of group commits issued to the DB. Each group can
+ // contain one or more writes.
// so writes/keys is the average number of put in multi-put or put
- // writes/batches is the average group commit size.
+ // writes/groups is the average group commit size.
//
// The format is the same for interval stats.
snprintf(buf, sizeof(buf),
-        "Cumulative writes: %s writes, %s keys, %s batches, "
-        "%.1f writes per batch, ingest: %.2f GB, %.2f MB/s\n",
+        "Cumulative writes: %s writes, %s keys, %s commit groups, "
+        "%.1f writes per commit group, ingest: %.2f GB, %.2f MB/s\n",
         NumberToHumanString(write_other + write_self).c_str(),
         NumberToHumanString(num_keys_written).c_str(),
         NumberToHumanString(write_self).c_str(),
@@ -654,8 +654,8 @@ void InternalStats::DumpDBStats(std::string* value) {
uint64_t interval_num_keys_written =
    num_keys_written - db_stats_snapshot_.num_keys_written;
snprintf(buf, sizeof(buf),
-        "Interval writes: %s writes, %s keys, %s batches, "
-        "%.1f writes per batch, ingest: %.2f MB, %.2f MB/s\n",
+        "Interval writes: %s writes, %s keys, %s commit groups, "
+        "%.1f writes per commit group, ingest: %.2f MB, %.2f MB/s\n",
         NumberToHumanString(
             interval_write_other + interval_write_self).c_str(),
         NumberToHumanString(interval_num_keys_written).c_str(),


@@ -12,7 +12,6 @@
 #include <string>
 #include <vector>
-#include "db/column_family.h"
 #include "db/log_writer.h"
 namespace rocksdb {


@@ -95,11 +95,11 @@ Status TableCache::GetTableReader(
   unique_ptr<RandomAccessFile> file;
   Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
-  if (readahead > 0) {
-    file = NewReadaheadRandomAccessFile(std::move(file), readahead);
-  }
   RecordTick(ioptions_.statistics, NO_FILE_OPENS);
   if (s.ok()) {
+    if (readahead > 0) {
+      file = NewReadaheadRandomAccessFile(std::move(file), readahead);
+    }
     if (!sequential_mode && ioptions_.advise_random_on_open) {
       file->Hint(RandomAccessFile::RANDOM);
     }


@@ -383,7 +383,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
   Status s;
   if (type == kAliveLogFile) {
     std::string fname = LogFileName(db_options_.wal_dir, number);
-    s = ReadFirstLine(fname, sequence);
+    s = ReadFirstLine(fname, number, sequence);
     if (env_->FileExists(fname).ok() && !s.ok()) {
       // return any error that is not caused by non-existing file
       return s;
@@ -394,7 +394,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
     // check if the file got moved to archive.
     std::string archived_file =
         ArchivedLogFileName(db_options_.wal_dir, number);
-    s = ReadFirstLine(archived_file, sequence);
+    s = ReadFirstLine(archived_file, number, sequence);
     // maybe the file was deleted from archive dir. If that's the case, return
     // Status::OK(). The caller will identify this as an empty file because
     // *sequence == 0
@@ -413,6 +413,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
 // the function returns status.ok() and sequence == 0 if the file exists, but is
 // empty
 Status WalManager::ReadFirstLine(const std::string& fname,
+                                 const uint64_t number,
                                  SequenceNumber* sequence) {
   struct LogReporter : public log::Reader::Reporter {
     Env* env;
@@ -449,7 +450,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
   reporter.status = &status;
   reporter.ignore_error = !db_options_.paranoid_checks;
   log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
-                     true /*checksum*/, 0 /*initial_offset*/, *sequence);
+                     true /*checksum*/, 0 /*initial_offset*/, number);
   std::string scratch;
   Slice record;


@@ -54,9 +54,9 @@ class WalManager {
     return ReadFirstRecord(type, number, sequence);
   }
-  Status TEST_ReadFirstLine(const std::string& fname,
-                            SequenceNumber* sequence) {
-    return ReadFirstLine(fname, sequence);
+  Status TEST_ReadFirstLine(const std::string& fname, const uint64_t number,
+                            SequenceNumber* sequence) {
+    return ReadFirstLine(fname, number, sequence);
   }
 private:
@@ -71,7 +71,8 @@ class WalManager {
   Status ReadFirstRecord(const WalFileType type, const uint64_t number,
                          SequenceNumber* sequence);
-  Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
+  Status ReadFirstLine(const std::string& fname, const uint64_t number,
+                       SequenceNumber* sequence);
   // ------- state from DBImpl ------
   const DBOptions& db_options_;


@@ -119,10 +119,11 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
   ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));
   SequenceNumber s;
-  ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, &s));
+  ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
   ASSERT_EQ(s, 0U);
-  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
+  ASSERT_OK(
+      wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
   ASSERT_EQ(s, 0U);
   unique_ptr<WritableFileWriter> file_writer(


@@ -0,0 +1,19 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "rocksdb/options.h"
#include "rocksdb/status.h"
namespace rocksdb {
// Try to migrate a DB created with old_opts to use new_opts.
// Multiple column families are not supported.
// It is best-effort. No guarantee to succeed.
// A full compaction may be executed.
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
const Options& new_opts);
} // namespace rocksdb


@@ -6,7 +6,7 @@
 #define ROCKSDB_MAJOR 4
 #define ROCKSDB_MINOR 11
-#define ROCKSDB_PATCH 0
+#define ROCKSDB_PATCH 2
 // Do not use these. We made the mistake of declaring macros starting with
 // double underscore. Now we have to live with our choice. We'll deprecate these

src.mk

@@ -131,6 +131,7 @@ LIB_SOURCES = \
   utilities/merge_operators/string_append/stringappend2.cc \
   utilities/merge_operators/string_append/stringappend.cc \
   utilities/merge_operators/uint64add.cc \
+  utilities/option_change_migration/option_change_migration.cc \
   utilities/options/options_util.cc \
   utilities/persistent_cache/persistent_cache_tier.cc \
   utilities/persistent_cache/volatile_tier_impl.cc \
@@ -286,6 +287,7 @@ MAIN_SOURCES = \
   utilities/geodb/geodb_test.cc \
   utilities/memory/memory_test.cc \
   utilities/merge_operators/string_append/stringappend_test.cc \
+  utilities/option_change_migration/option_change_migration_test.cc \
   utilities/options/options_util_test.cc \
   utilities/redis/redis_lists_test.cc \
   utilities/simulator_cache/sim_cache_test.cc \


@@ -42,6 +42,7 @@
 #include "util/perf_context_imp.h"
 #include "util/stop_watch.h"
 #include "util/string_util.h"
+#include "util/sync_point.h"
 namespace rocksdb {
@@ -533,12 +534,19 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
   // We've successfully read the footer and the index block: we're
   // ready to serve requests.
+  // Better not mutate rep_ after the creation. eg. internal_prefix_transform
+  // raw pointer will be used to create HashIndexReader, whose reset may
+  // access a dangling pointer.
   Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options,
                                       internal_comparator, skip_filters);
   rep->file = std::move(file);
   rep->footer = footer;
   rep->index_type = table_options.index_type;
   rep->hash_index_allow_collision = table_options.hash_index_allow_collision;
+  // We need to wrap data with internal_prefix_transform to make sure it can
+  // handle prefix correctly.
+  rep->internal_prefix_transform.reset(
+      new InternalKeySliceTransform(rep->ioptions.prefix_extractor));
   SetupCacheKeyPrefix(rep, file_size);
   unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep));
@@ -1053,7 +1061,11 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
   } else {
     // Create index reader and put it in the cache.
     Status s;
+    TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:2");
     s = CreateIndexReader(&index_reader);
+    TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:1");
+    TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:3");
+    TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:4");
     if (s.ok()) {
       assert(index_reader != nullptr);
       s = block_cache->Insert(key, index_reader, index_reader->usable_size(),
@@ -1609,10 +1621,6 @@ Status BlockBasedTable::CreateIndexReader(
     meta_index_iter = meta_iter_guard.get();
   }
-  // We need to wrap data with internal_prefix_transform to make sure it can
-  // handle prefix correctly.
-  rep_->internal_prefix_transform.reset(
-      new InternalKeySliceTransform(rep_->ioptions.prefix_extractor));
   return HashIndexReader::Create(
       rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions,
       comparator, footer.index_handle(), meta_index_iter, index_reader,


@@ -45,6 +45,7 @@
 #include "util/random.h"
 #include "util/statistics.h"
 #include "util/string_util.h"
+#include "util/sync_point.h"
 #include "util/testharness.h"
 #include "util/testutil.h"
 #include "utilities/merge_operators.h"
@@ -2021,6 +2022,65 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
   c.ResetTableReader();
 }
TEST_F(BlockBasedTableTest, NewIndexIteratorLeak) {
// A regression test to avoid data race described in
// https://github.com/facebook/rocksdb/issues/1267
TableConstructor c(BytewiseComparator(), true /* convert_to_internal_key_ */);
std::vector<std::string> keys;
stl_wrappers::KVMap kvmap;
c.Add("a1", "val1");
Options options;
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
BlockBasedTableOptions table_options;
table_options.index_type = BlockBasedTableOptions::kHashSearch;
table_options.cache_index_and_filter_blocks = true;
table_options.block_cache = NewLRUCache(0);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
const ImmutableCFOptions ioptions(options);
c.Finish(options, ioptions, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
{
{"BlockBasedTable::NewIndexIterator::thread1:1",
"BlockBasedTable::NewIndexIterator::thread2:2"},
{"BlockBasedTable::NewIndexIterator::thread2:3",
"BlockBasedTable::NewIndexIterator::thread1:4"},
},
{
{"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
"BlockBasedTable::NewIndexIterator::thread1:1"},
{"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
"BlockBasedTable::NewIndexIterator::thread1:4"},
{"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
"BlockBasedTable::NewIndexIterator::thread2:2"},
{"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
"BlockBasedTable::NewIndexIterator::thread2:3"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
auto* reader = c.GetTableReader();
std::function<void()> func1 = [&]() {
TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker");
std::unique_ptr<InternalIterator> iter(reader->NewIterator(ro));
iter->Seek(InternalKey("a1", 0, kTypeValue).Encode());
};
std::function<void()> func2 = [&]() {
TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker");
std::unique_ptr<InternalIterator> iter(reader->NewIterator(ro));
};
auto thread1 = std::thread(func1);
auto thread2 = std::thread(func2);
thread1.join();
thread2.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
c.ResetTableReader();
}
// Plain table is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
TEST_F(PlainTableTest, BasicPlainTableProperties) {


@@ -129,9 +129,13 @@ class PosixEnv : public Env {
     for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
       thread_pools_[pool_id].JoinAllThreads();
     }
-    // All threads must be joined before the deletion of
-    // thread_status_updater_.
-    delete thread_status_updater_;
+    // Delete the thread_status_updater_ only when the current Env is not
+    // Env::Default(). This is to avoid the use-after-free error when
+    // Env::Default() is destructed while some other child threads are
+    // still trying to update thread status.
+    if (this != Env::Default()) {
+      delete thread_status_updater_;
+    }
   }
   void SetFD_CLOEXEC(int fd, const EnvOptions* options) {


@@ -6,6 +6,7 @@
 #include <assert.h>
 #include <condition_variable>
+#include <functional>
 #include <mutex>
 #include <string>
 #include <thread>


@@ -10,6 +10,7 @@
 #pragma once
 #include <atomic>
+#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <vector>


@@ -0,0 +1,153 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/utilities/option_change_migration.h"
#ifndef ROCKSDB_LITE
#include "rocksdb/db.h"
namespace rocksdb {
namespace {
// Return a version of Options `opts` that allows us to open/write into a DB
// without triggering an automatic compaction or stalling. This is guaranteed
// by disabling automatic compactions and using huge values for stalling
// triggers.
Options GetNoCompactionOptions(const Options& opts) {
Options ret_opts = opts;
ret_opts.disable_auto_compactions = true;
ret_opts.level0_slowdown_writes_trigger = 999999;
ret_opts.level0_stop_writes_trigger = 999999;
ret_opts.soft_pending_compaction_bytes_limit = 0;
ret_opts.hard_pending_compaction_bytes_limit = 0;
return ret_opts;
}
Status OpenDb(const Options& options, const std::string& dbname,
std::unique_ptr<DB>* db) {
db->reset();
DB* tmpdb;
Status s = DB::Open(options, dbname, &tmpdb);
if (s.ok()) {
db->reset(tmpdb);
}
return s;
}
Status CompactToLevel(const Options& options, const std::string& dbname,
int dest_level, bool need_reopen) {
std::unique_ptr<DB> db;
Options no_compact_opts = GetNoCompactionOptions(options);
if (dest_level == 0) {
// L0 has strict sequence ID requirements for the files in it. It's safer
// to put only one compacted file there.
// This is only used for converting to universal compaction with
// only one level. In this case, compacting to one file is also
// optimal.
no_compact_opts.target_file_size_base = 999999999999999;
}
Status s = OpenDb(no_compact_opts, dbname, &db);
if (!s.ok()) {
return s;
}
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = dest_level;
db->CompactRange(cro, nullptr, nullptr);
if (need_reopen) {
// Need to restart DB to rewrite the manifest file.
// In order to open a DB with specific num_levels, the manifest file should
// contain no record that mentions any level beyond num_levels. Issuing a
// full compaction will move all the data to a level not exceeding
// num_levels, but the manifest may still contain previous record mentioning
// a higher level. Reopening the DB will force the manifest to be rewritten
// so that those records will be cleared.
db.reset();
s = OpenDb(no_compact_opts, dbname, &db);
}
return s;
}
Status MigrateToUniversal(std::string dbname, const Options& old_opts,
const Options& new_opts) {
if (old_opts.num_levels <= new_opts.num_levels) {
return Status::OK();
} else {
bool need_compact = false;
{
std::unique_ptr<DB> db;
Options opts = GetNoCompactionOptions(old_opts);
Status s = OpenDb(opts, dbname, &db);
if (!s.ok()) {
return s;
}
ColumnFamilyMetaData metadata;
db->GetColumnFamilyMetaData(&metadata);
if (!metadata.levels.empty() &&
metadata.levels.back().level >= new_opts.num_levels) {
need_compact = true;
}
}
if (need_compact) {
return CompactToLevel(old_opts, dbname, new_opts.num_levels - 1, true);
}
return Status::OK();
}
}
Status MigrateToLevelBase(std::string dbname, const Options& old_opts,
const Options& new_opts) {
if (!new_opts.level_compaction_dynamic_level_bytes) {
if (old_opts.num_levels == 1) {
return Status::OK();
}
// Compact everything to level 1 to guarantee it can be safely opened.
Options opts = old_opts;
opts.target_file_size_base = new_opts.target_file_size_base;
// Although sometimes we can open the DB with the new options without
// error, we still want to compact the files to keep the LSM tree from
// getting stuck in a bad shape. For example, if the user changed the
// level size multiplier from 4 to 8, with the same data, we will have
// fewer levels. Unless we issue a full compaction, the LSM tree may get
// stuck with more levels than needed and it won't recover automatically.
return CompactToLevel(opts, dbname, 1, true);
} else {
// Compact everything to the last level to guarantee it can be safely
// opened.
if (old_opts.num_levels == 1) {
return Status::OK();
} else if (new_opts.num_levels > old_opts.num_levels) {
// Dynamic level mode requires data to be put in the last level first.
return CompactToLevel(new_opts, dbname, new_opts.num_levels - 1, false);
} else {
Options opts = old_opts;
opts.target_file_size_base = new_opts.target_file_size_base;
return CompactToLevel(opts, dbname, new_opts.num_levels - 1, true);
}
}
}
} // namespace
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
const Options& new_opts) {
if (new_opts.compaction_style == CompactionStyle::kCompactionStyleUniversal) {
return MigrateToUniversal(dbname, old_opts, new_opts);
} else if (new_opts.compaction_style ==
CompactionStyle::kCompactionStyleLevel) {
return MigrateToLevelBase(dbname, old_opts, new_opts);
} else {
return Status::NotSupported(
"Do not know how to migrate to this compaction style");
}
}
} // namespace rocksdb
#else
namespace rocksdb {
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
const Options& new_opts) {
return Status::NotSupported();
}
} // namespace rocksdb
#endif // ROCKSDB_LITE


@@ -0,0 +1,207 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/utilities/option_change_migration.h"
#include <set>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
namespace rocksdb {
class DBOptionChangeMigrationTest
: public DBTestBase,
public testing::WithParamInterface<
std::tuple<int, bool, bool, int, bool, bool>> {
public:
DBOptionChangeMigrationTest()
: DBTestBase("/db_option_change_migration_test") {
level1_ = std::get<0>(GetParam());
is_universal1_ = std::get<1>(GetParam());
is_dynamic1_ = std::get<2>(GetParam());
level2_ = std::get<3>(GetParam());
is_universal2_ = std::get<4>(GetParam());
is_dynamic2_ = std::get<5>(GetParam());
}
// Required if inheriting from testing::WithParamInterface<>
static void SetUpTestCase() {}
static void TearDownTestCase() {}
int level1_;
bool is_universal1_;
bool is_dynamic1_;
int level2_;
bool is_universal2_;
bool is_dynamic2_;
};
#ifndef ROCKSDB_LITE
TEST_P(DBOptionChangeMigrationTest, Migrate1) {
Options old_options = CurrentOptions();
if (is_universal1_) {
old_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
} else {
old_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
old_options.level_compaction_dynamic_level_bytes = is_dynamic1_;
}
old_options.level0_file_num_compaction_trigger = 3;
old_options.write_buffer_size = 64 * 1024;
old_options.target_file_size_base = 128 * 1024;
// Make level target of L1, L2 to be 200KB and 600KB
old_options.num_levels = level1_;
old_options.max_bytes_for_level_multiplier = 3;
old_options.max_bytes_for_level_base = 200 * 1024;
Reopen(old_options);
Random rnd(301);
int key_idx = 0;
// Generate at least 2MB of data
for (int num = 0; num < 20; num++) {
GenerateNewFile(&rnd, &key_idx);
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
// Will make sure exactly those keys are in the DB after migration.
std::set<std::string> keys;
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->SeekToFirst();
for (; it->Valid(); it->Next()) {
keys.insert(it->key().ToString());
}
}
Close();
Options new_options = old_options;
if (is_universal2_) {
new_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
} else {
new_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
new_options.level_compaction_dynamic_level_bytes = is_dynamic2_;
}
new_options.target_file_size_base = 256 * 1024;
new_options.num_levels = level2_;
new_options.max_bytes_for_level_base = 150 * 1024;
new_options.max_bytes_for_level_multiplier = 4;
ASSERT_OK(OptionChangeMigration(dbname_, old_options, new_options));
Reopen(new_options);
// Wait for compaction to finish and make sure it can reopen
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
Reopen(new_options);
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->SeekToFirst();
for (std::string key : keys) {
ASSERT_TRUE(it->Valid());
ASSERT_EQ(key, it->key().ToString());
it->Next();
}
ASSERT_TRUE(!it->Valid());
}
}
TEST_P(DBOptionChangeMigrationTest, Migrate2) {
Options old_options = CurrentOptions();
if (is_universal2_) {
old_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
} else {
old_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
old_options.level_compaction_dynamic_level_bytes = is_dynamic2_;
}
old_options.level0_file_num_compaction_trigger = 3;
old_options.write_buffer_size = 64 * 1024;
old_options.target_file_size_base = 128 * 1024;
// Make level target of L1, L2 to be 200KB and 600KB
old_options.num_levels = level2_;
old_options.max_bytes_for_level_multiplier = 3;
old_options.max_bytes_for_level_base = 200 * 1024;
Reopen(old_options);
Random rnd(301);
int key_idx = 0;
// Generate at least 2MB of data
for (int num = 0; num < 20; num++) {
GenerateNewFile(&rnd, &key_idx);
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
// Will make sure exactly those keys are in the DB after migration.
std::set<std::string> keys;
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->SeekToFirst();
for (; it->Valid(); it->Next()) {
keys.insert(it->key().ToString());
}
}
Close();
Options new_options = old_options;
if (is_universal1_) {
new_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
} else {
new_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
new_options.level_compaction_dynamic_level_bytes = is_dynamic1_;
}
new_options.target_file_size_base = 256 * 1024;
new_options.num_levels = level1_;
new_options.max_bytes_for_level_base = 150 * 1024;
new_options.max_bytes_for_level_multiplier = 4;
ASSERT_OK(OptionChangeMigration(dbname_, old_options, new_options));
Reopen(new_options);
// Wait for compaction to finish and make sure it can reopen
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
Reopen(new_options);
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->SeekToFirst();
for (std::string key : keys) {
ASSERT_TRUE(it->Valid());
ASSERT_EQ(key, it->key().ToString());
it->Next();
}
ASSERT_TRUE(!it->Valid());
}
}
INSTANTIATE_TEST_CASE_P(
DBOptionChangeMigrationTest, DBOptionChangeMigrationTest,
::testing::Values(std::make_tuple(3, false, false, 4, false, false),
std::make_tuple(3, false, true, 4, false, true),
std::make_tuple(3, false, true, 4, false, false),
std::make_tuple(3, false, false, 4, false, true),
std::make_tuple(3, true, false, 4, true, false),
std::make_tuple(1, true, false, 4, true, false),
std::make_tuple(3, false, false, 4, true, false),
std::make_tuple(3, false, false, 1, true, false),
std::make_tuple(3, false, true, 4, true, false),
std::make_tuple(3, false, true, 1, true, false),
std::make_tuple(1, true, false, 4, false, false)));
#endif // ROCKSDB_LITE
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}


@@ -7,6 +7,8 @@
 #include "utilities/persistent_cache/block_cache_tier_file.h"
 #include <unistd.h>
+#include <functional>
 #include <memory>
 #include <vector>


@@ -6,6 +6,7 @@
 #ifndef ROCKSDB_LITE
+#include <functional>
 #include <list>
 #include <memory>
 #include <string>


@@ -7,6 +7,7 @@
 #ifndef ROCKSDB_LITE
+#include <functional>
 #include "util/random.h"
 #include "utilities/persistent_cache/hash_table.h"
 #include "utilities/persistent_cache/lrulist.h"