Merge branch 'master' into fix-range-deletion-bug
commit 5a81a48ba9
@@ -777,7 +777,7 @@ if(WITH_LIBRADOS)
 endif()

 if(WIN32)
-  set(SYSTEM_LIBS ${SYSTEM_LIBS} Shlwapi.lib Rpcrt4.lib)
+  set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib)
   set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
 else()
   set(SYSTEM_LIBS ${CMAKE_THREAD_LIBS_INIT})
HISTORY.md — 18 changed lines
@@ -1,21 +1,23 @@
 # Rocksdb Change Log
 ## Unreleased
 ### Bug Fixes
 * Fix data corruption casued by output of intra-L0 compaction on ingested file not being placed in correct order in L0.
 * Fix a data race between Version::GetColumnFamilyMetaData() and Compaction::MarkFilesBeingCompacted() for access to being_compacted (#6056). The current fix acquires the db mutex during Version::GetColumnFamilyMetaData(), which may cause regression.

 ### Public API Change
 * RocksDB release 4.1 or older will not be able to open DB generated by the new release. 4.2 was released on Feb 23, 2016.
 * TTL Compactions in Level compaction style now initiate successive cascading compactions on a key range so that it reaches the bottom level quickly on TTL expiry. `creation_time` table property for compaction output files is now set to the minimum of the creation times of all compaction inputs.
-* Changed the default value of periodic_compaction_seconds to `UINT64_MAX` which allows RocksDB to auto-tune periodic compaction scheduling. When using the default value, periodic compactions are now auto-enabled if a compaction filter is used. A value of `0` will turn off the feature completely.
 * With FIFO compaction style, options.periodic_compaction_seconds will have the same meaning as options.ttl. Whichever stricter will be used. With the default options.periodic_compaction_seconds value with options.ttl's default of 0, RocksDB will give a default of 30 days.
 * Added an API GetCreationTimeOfOldestFile(uint64_t* creation_time) to get the file_creation_time of the oldest SST file in the DB.
 * An unlikely usage of FilterPolicy is no longer supported. Calling GetFilterBitsBuilder() on the FilterPolicy returned by NewBloomFilterPolicy will now cause an assertion violation in debug builds, because RocksDB has internally migrated to a more elaborate interface that is expected to evolve further. Custom implementations of FilterPolicy should work as before, except those wrapping the return of NewBloomFilterPolicy, which will require a new override of a protected function in FilterPolicy.
 * NewBloomFilterPolicy now takes bits_per_key as a double instead of an int. This permits finer control over the memory vs. accuracy trade-off in the new Bloom filter implementation and should not change source code compatibility.
 * The option BackupableDBOptions::max_valid_backups_to_open is now only used when opening BackupEngineReadOnly. When opening a read/write BackupEngine, anything but the default value logs a warning and is treated as the default. This change ensures that backup deletion has proper accounting of shared files to ensure they are deleted when no longer referenced by a backup.

+### Default Option Changes
+* Changed the default value of periodic_compaction_seconds to `UINT64_MAX - 1` which allows RocksDB to auto-tune periodic compaction scheduling. When using the default value, periodic compactions are now auto-enabled if a compaction filter is used. A value of `0` will turn off the feature completely.
+* Changed the default value of ttl to `UINT64_MAX - 1` which allows RocksDB to auto-tune ttl value. When using the default value, TTL will be auto-enabled to 30 days, when the feature is supported. To revert the old behavior, you can explictly set it to 0.

 ### New Features
 * Universal compaction to support options.periodic_compaction_seconds. A full compaction will be triggered if any file is over the threshold.
 * `GetLiveFilesMetaData` and `GetColumnFamilyMetaData` now expose the file number of SST files as well as the oldest blob file referenced by each SST.
 * A batched MultiGet API (DB::MultiGet()) that supports retrieving keys from multiple column families.
-* Full and partitioned filters in the block-based table use an improved Bloom filter implementation, enabled with format_version 5 (or above) because previous releases cannot read this filter. This replacement is faster and more accurate, especially for high bits per key or millions of keys in a single (full) filter. For example, the new Bloom filter has a lower false positive rate at 16 bits per key than the old one at 100 bits per key.
+* Full and partitioned filters in the block-based table use an improved Bloom filter implementation, enabled with format_version 5 (or above) because previous releases cannot read this filter. This replacement is faster and more accurate, especially for high bits per key or millions of keys in a single (full) filter. For example, the new Bloom filter has the same false postive rate at 9.55 bits per key as the old one at 10 bits per key, and a lower false positive rate at 16 bits per key than the old one at 100 bits per key.
 * Added AVX2 instructions to USE_SSE builds to accelerate the new Bloom filter and XXH3-based hash function on compatible x86_64 platforms (Haswell and later, ~2014).
 * Support options.ttl with options.max_open_files = -1. File's oldest ancester time will be written to manifest. If it is availalbe, this information will be used instead of creation_time in table properties.
 * Setting options.ttl for universal compaction now has the same meaning as setting periodic_compaction_seconds.
@@ -24,7 +26,9 @@
 * For 64-bit hashing, RocksDB is standardizing on a slightly modified preview version of XXH3. This function is now used for many non-persisted hashes, along with fastrange64() in place of the modulus operator, and some benchmarks show a slight improvement.

 ### Bug Fixes
-* Fix a assertion failure in MultiGe4t() when BlockBasedTableOptions::no_block_cache is true and there is no compressed block cache
+* Fix data corruption casued by output of intra-L0 compaction on ingested file not being placed in correct order in L0.
+* Fix a data race between Version::GetColumnFamilyMetaData() and Compaction::MarkFilesBeingCompacted() for access to being_compacted (#6056). The current fix acquires the db mutex during Version::GetColumnFamilyMetaData(), which may cause regression.
+* Fix a assertion failure in MultiGet() when BlockBasedTableOptions::no_block_cache is true and there is no compressed block cache
 * If a call to BackupEngine::PurgeOldBackups or BackupEngine::DeleteBackup suffered a crash, power failure, or I/O error, files could be left over from old backups that could only be purged with a call to GarbageCollect. Any call to PurgeOldBackups, DeleteBackup, or GarbageCollect should now suffice to purge such files.
 * Fix a buffer overrun problem in BlockBasedTable::MultiGet() when compression is enabled and no compressed block cache is configured.
 * Fix a bug in DBIter that is_blob_ state isn't updated when iterating backward using seek.
Makefile — 41 changed lines
@@ -1103,14 +1103,23 @@ unity_test: db/db_test.o db/db_test_util.o $(TESTHARNESS) $(TOOLLIBOBJECTS) unit
 rocksdb.h rocksdb.cc: build_tools/amalgamate.py Makefile $(LIB_SOURCES) unity.cc
 	build_tools/amalgamate.py -I. -i./include unity.cc -x include/rocksdb/c.h -H rocksdb.h -o rocksdb.cc

-clean:
+clean: clean-ext-libraries-all clean-rocks
+
+clean-not-downloaded: clean-ext-libraries-bin clean-rocks
+
+clean-rocks:
 	rm -f $(BENCHMARKS) $(TOOLS) $(TESTS) $(PARALLEL_TEST) $(LIBRARY) $(SHARED)
 	rm -rf $(CLEAN_FILES) ios-x86 ios-arm scan_build_report
 	$(FIND) . -name "*.[oda]" -exec rm -f {} \;
 	$(FIND) . -type f -regex ".*\.\(\(gcda\)\|\(gcno\)\)" -exec rm {} \;
-	rm -rf bzip2* snappy* zlib* lz4* zstd*
 	cd java; $(MAKE) clean
+
+clean-ext-libraries-all:
+	rm -rf bzip2* snappy* zlib* lz4* zstd*
+
+clean-ext-libraries-bin:
+	find . -maxdepth 1 -type d \( -name bzip2\* -or -name snappy\* -or -name zlib\* -or -name lz4\* -or -name zstd\* \) -prune -exec rm -rf {} \;

 tags:
 	ctags -R .
 	cscope -b `$(FIND) . -name '*.cc'` `$(FIND) . -name '*.h'` `$(FIND) . -name '*.c'`
@@ -1713,7 +1722,7 @@ JAVA_INCLUDE = -I$(JAVA_HOME)/include/ -I$(JAVA_HOME)/include/linux
 ifeq ($(PLATFORM), OS_SOLARIS)
 	ARCH := $(shell isainfo -b)
 else ifeq ($(PLATFORM), OS_OPENBSD)
-	ifneq (,$(filter $(MACHINE), amd64 arm64 aarch64 sparc64))
+	ifneq (,$(filter amd64 ppc64 ppc64le arm64 aarch64 sparc64, $(MACHINE)))
 		ARCH := 64
 	else
 		ARCH := 32
@@ -1722,10 +1731,10 @@ else
 	ARCH := $(shell getconf LONG_BIT)
 endif

-ifeq (,$(filter $(MACHINE), ppc arm64 aarch64 sparc64))
-	ROCKSDBJNILIB = librocksdbjni-linux$(ARCH).so
+ifneq (,$(filter ppc% arm64 aarch64 sparc64, $(MACHINE)))
+	ROCKSDBJNILIB = librocksdbjni-linux-$(MACHINE).so
 else
-	ROCKSDBJNILIB = librocksdbjni-linux-$(MACHINE).so
+	ROCKSDBJNILIB = librocksdbjni-linux$(ARCH).so
 endif
 ROCKSDB_JAR = rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH)-linux$(ARCH).jar
 ROCKSDB_JAR_ALL = rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH).jar
@@ -1742,11 +1751,11 @@ BZIP2_DOWNLOAD_BASE ?= https://downloads.sourceforge.net/project/bzip2
 SNAPPY_VER ?= 1.1.7
 SNAPPY_SHA256 ?= 3dfa02e873ff51a11ee02b9ca391807f0c8ea0529a4924afa645fbf97163f9d4
 SNAPPY_DOWNLOAD_BASE ?= https://github.com/google/snappy/archive
-LZ4_VER ?= 1.8.3
-LZ4_SHA256 ?= 33af5936ac06536805f9745e0b6d61da606a1f8b4cc5c04dd3cbaca3b9b4fc43
+LZ4_VER ?= 1.9.2
+LZ4_SHA256 ?= 658ba6191fa44c92280d4aa2c271b0f4fbc0e34d249578dd05e50e76d0e5efcc
 LZ4_DOWNLOAD_BASE ?= https://github.com/lz4/lz4/archive
-ZSTD_VER ?= 1.4.0
-ZSTD_SHA256 ?= 63be339137d2b683c6d19a9e34f4fb684790e864fee13c7dd40e197a64c705c1
+ZSTD_VER ?= 1.4.4
+ZSTD_SHA256 ?= a364f5162c7d1a455cc915e8e3cf5f4bd8b75d09bc0f53965b0c9ca1383c52c8
 ZSTD_DOWNLOAD_BASE ?= https://github.com/facebook/zstd/archive
 CURL_SSL_OPTS ?= --tlsv1
@@ -1913,23 +1922,19 @@ rocksdbjavastaticreleasedocker: rocksdbjavastatic rocksdbjavastaticdockerx86 roc

 rocksdbjavastaticdockerx86:
 	mkdir -p java/target
-	docker run --rm --name rocksdb_linux_x86-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos6_x86-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh
+	docker run --rm --name rocksdb_linux_x86-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host:ro --volume /rocksdb-local-build --volume `pwd`/java/target:/rocksdb-java-target --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos6_x86-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh

 rocksdbjavastaticdockerx86_64:
 	mkdir -p java/target
-	docker run --rm --name rocksdb_linux_x64-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos6_x64-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh
+	docker run --rm --name rocksdb_linux_x64-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host:ro --volume /rocksdb-local-build --volume `pwd`/java/target:/rocksdb-java-target --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos6_x64-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh

 rocksdbjavastaticdockerppc64le:
 	mkdir -p java/target
-	docker run --rm --name rocksdb_linux_ppc64le-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos7_ppc64le-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh
+	docker run --rm --name rocksdb_linux_ppc64le-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host:ro --volume /rocksdb-local-build --volume `pwd`/java/target:/rocksdb-java-target --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos7_ppc64le-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh

 rocksdbjavastaticdockerarm64v8:
 	mkdir -p java/target
-	DOCKER_LINUX_ARM64V8_CONTAINER=`docker ps -aqf name=rocksdb_linux_arm64v8-be`; \
-	if [ -z "$$DOCKER_LINUX_ARM64V8_CONTAINER" ]; then \
-		docker container create --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host --name rocksdb_linux_arm64v8-be evolvedbinary/rocksjava:centos7_arm64v8-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh; \
-	fi
-	docker start -a rocksdb_linux_arm64v8-be
+	docker run --rm --name rocksdb_linux_arm64v8-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host:ro --volume /rocksdb-local-build --volume `pwd`/java/target:/rocksdb-java-target --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos7_arm64v8-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh

 rocksdbjavastaticpublish: rocksdbjavastaticrelease rocksdbjavastaticpublishcentral
@@ -188,6 +188,11 @@ Status CheckCFPathsSupported(const DBOptions& db_options,
   return Status::OK();
 }

+namespace {
+const uint64_t kDefaultTtl = 0xfffffffffffffffe;
+const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
+};  // namespace
+
 ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                     const ColumnFamilyOptions& src) {
   ColumnFamilyOptions result = src;
@@ -343,8 +348,20 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
     result.max_compaction_bytes = result.target_file_size_base * 25;
   }

-  const uint64_t kDefaultPeriodicCompSecs = 0xffffffffffffffff;
-  const uint64_t kDefaultTtlSecs = 30 * 24 * 60 * 60;
+  bool is_block_based_table =
+      (result.table_factory->Name() == BlockBasedTableFactory().Name());
+
+  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
+  if (result.ttl == kDefaultTtl) {
+    if (is_block_based_table &&
+        result.compaction_style != kCompactionStyleFIFO) {
+      result.ttl = kAdjustedTtl;
+    } else {
+      result.ttl = 0;
+    }
+  }
+
+  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

   // Turn on periodic compactions and set them to occur once every 30 days if
   // compaction filters are used and periodic_compaction_seconds is set to the
@@ -352,16 +369,19 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
   if (result.compaction_style != kCompactionStyleFIFO) {
     if ((result.compaction_filter != nullptr ||
          result.compaction_filter_factory != nullptr) &&
-        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
-      result.periodic_compaction_seconds = kDefaultTtlSecs;
+        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
+        is_block_based_table) {
+      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
     }
   } else {
     // result.compaction_style == kCompactionStyleFIFO
     if (result.ttl == 0) {
-      if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
-        result.periodic_compaction_seconds = kDefaultTtlSecs;
-      }
-      result.ttl = result.periodic_compaction_seconds;
+      if (is_block_based_table) {
+        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
+          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
+        }
+        result.ttl = result.periodic_compaction_seconds;
+      }
     } else if (result.periodic_compaction_seconds != 0) {
       result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
     }
@@ -379,6 +399,10 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
     }
   }

+  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
+    result.periodic_compaction_seconds = 0;
+  }
+
   return result;
 }
@@ -1209,7 +1233,7 @@ Status ColumnFamilyData::ValidateOptions(
     return s;
   }

-  if (cf_options.ttl > 0) {
+  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
     if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
       return Status::NotSupported(
           "TTL is only supported in Block-Based Table format. ");
@@ -1217,7 +1241,7 @@ Status ColumnFamilyData::ValidateOptions(
   }

   if (cf_options.periodic_compaction_seconds > 0 &&
-      cf_options.periodic_compaction_seconds < port::kMaxUint64) {
+      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
     if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
       return Status::NotSupported(
           "Periodic Compaction is only supported in "
@@ -57,6 +57,8 @@ class CompactionPickerTest : public testing::Test {
       log_buffer_(InfoLogLevel::INFO_LEVEL, &logger_),
       file_num_(1),
       vstorage_(nullptr) {
+    mutable_cf_options_.ttl = 0;
+    mutable_cf_options_.periodic_compaction_seconds = 0;
     // ioptions_.compaction_pri = kMinOverlappingRatio has its own set of
     // tests to cover.
     ioptions_.compaction_pri = kByCompensatedSize;
@@ -3911,7 +3911,7 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
   for (CompactionFilterType comp_filter_type :
        {kUseCompactionFilter, kUseCompactionFilterFactory}) {
     // Assert that periodic compactions are not enabled.
-    ASSERT_EQ(port::kMaxUint64, options.periodic_compaction_seconds);
+    ASSERT_EQ(port::kMaxUint64 - 1, options.periodic_compaction_seconds);

     if (comp_filter_type == kUseCompactionFilter) {
       options.compaction_filter = &test_compaction_filter;
@@ -635,6 +635,21 @@ TEST_F(DBOptionsTest, SanitizeUniversalTTLCompaction) {
   ASSERT_EQ(100, dbfull()->GetOptions().periodic_compaction_seconds);
 }

+TEST_F(DBOptionsTest, SanitizeTtlDefault) {
+  Options options;
+  Reopen(options);
+  ASSERT_EQ(30 * 24 * 60 * 60, dbfull()->GetOptions().ttl);
+
+  options.compaction_style = kCompactionStyleLevel;
+  options.ttl = 0;
+  Reopen(options);
+  ASSERT_EQ(0, dbfull()->GetOptions().ttl);
+
+  options.ttl = 100;
+  Reopen(options);
+  ASSERT_EQ(100, dbfull()->GetOptions().ttl);
+}
+
 TEST_F(DBOptionsTest, SanitizeFIFOPeriodicCompaction) {
   Options options;
   options.compaction_style = kCompactionStyleFIFO;
@@ -3311,6 +3311,22 @@ TEST_F(DBTest, FIFOCompactionStyleWithCompactionAndDelete) {
   }
 }

-// Check that FIFO-with-TTL is not supported with max_open_files != -1.
 TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) {
   Options options;
   options.compaction_style = kCompactionStyleFIFO;
   options.create_if_missing = true;
   options.ttl = 600;  // seconds

+  // TTL is now supported with max_open_files != -1.
+  options.max_open_files = 100;
+  options = CurrentOptions(options);
+  ASSERT_OK(TryReopen(options));
+
   options.max_open_files = -1;
   ASSERT_OK(TryReopen(options));
 }

 // Check that FIFO-with-TTL is supported only with BlockBasedTableFactory.
 TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) {
   Options options;
@@ -4812,6 +4828,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
 // Even more FIFOCompactionTests are at DBTest.FIFOCompaction* .
 TEST_F(DBTest, DynamicFIFOCompactionOptions) {
   Options options;
+  options.ttl = 0;
   options.create_if_missing = true;
   DestroyAndReopen(options);
@@ -6165,6 +6182,19 @@ TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
   }
 }

 TEST_F(DBTest, CreateColumnFamilyShouldFailOnIncompatibleOptions) {
   Options options = CurrentOptions();
   options.max_open_files = 100;
   Reopen(options);

   ColumnFamilyOptions cf_options(options);
   // ttl is now supported when max_open_files is -1.
   cf_options.ttl = 3600;
   ColumnFamilyHandle* handle;
   ASSERT_OK(db_->CreateColumnFamily(cf_options, "pikachu", &handle));
   delete handle;
 }

 #ifndef ROCKSDB_LITE
 TEST_F(DBTest, RowCache) {
   Options options = CurrentOptions();
@@ -2166,9 +2166,10 @@ TEST_F(DBTestUniversalCompaction2, PeriodicCompactionDefault) {
   ASSERT_EQ(30 * 24 * 60 * 60,
             dbfull()->GetOptions().periodic_compaction_seconds);

+  options.ttl = 60 * 24 * 60 * 60;
   options.compaction_filter = nullptr;
   Reopen(options);
-  ASSERT_EQ(options.periodic_compaction_seconds,
+  ASSERT_EQ(60 * 24 * 60 * 60,
             dbfull()->GetOptions().periodic_compaction_seconds);
 }
@@ -2442,8 +2442,7 @@ void VersionStorageInfo::ComputeCompactionScore(
   if (mutable_cf_options.ttl > 0) {
     ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
   }
-  if (mutable_cf_options.periodic_compaction_seconds > 0 &&
-      mutable_cf_options.periodic_compaction_seconds < port::kMaxUint64) {
+  if (mutable_cf_options.periodic_compaction_seconds > 0) {
     ComputeFilesMarkedForPeriodicCompaction(
         immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
   }
@@ -2514,12 +2513,12 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
   }
   const uint64_t current_time = static_cast<uint64_t>(temp_current_time);

-  assert(periodic_compaction_seconds <= current_time);
-  // Disable periodic compaction if periodic_compaction_seconds > current_time.
-  // This also help handle the underflow case.
+  // If periodic_compaction_seconds > current_time, no file possibly qualifies
+  // periodic compaction.
   if (periodic_compaction_seconds > current_time) {
     return;
   }

   const uint64_t allowed_time_limit =
       current_time - periodic_compaction_seconds;
@@ -654,11 +654,15 @@ struct AdvancedColumnFamilyOptions {
   // unit: seconds. Ex: 1 day = 1 * 24 * 60 * 60
   // In FIFO, this option will have the same meaning as
   // periodic_compaction_seconds. Whichever stricter will be used.
+  // 0 means disabling.
+  // UINT64_MAX - 1 (0xfffffffffffffffe) is special flag to allow RocksDB to
+  // pick default.
   //
-  // Default: 0 (disabled)
+  // Default: 30 days for leveled compaction + block based table. disable
+  // otherwise.
   //
   // Dynamically changeable through SetOptions() API
-  uint64_t ttl = 0;
+  uint64_t ttl = 0xfffffffffffffffe;

   // Files older than this value will be picked up for compaction, and
   // re-written to the same level as they were before.
@@ -676,7 +680,7 @@ struct AdvancedColumnFamilyOptions {
   //
   // Values:
   // 0: Turn off Periodic compactions.
-  // UINT64_MAX (i.e 0xffffffffffffffff): Let RocksDB control this feature
+  // UINT64_MAX - 1 (i.e 0xfffffffffffffffe): Let RocksDB control this feature
   // as needed. For now, RocksDB will change this value to 30 days
   // (i.e 30 * 24 * 60 * 60) so that every file goes through the compaction
   // process at least once every 30 days if not compacted sooner.
@@ -684,10 +688,10 @@ struct AdvancedColumnFamilyOptions {
   // when this value is left default, and ttl is left to 0, 30 days will be
   // used. Otherwise, min(ttl, periodic_compaction_seconds) will be used.
   //
-  // Default: UINT64_MAX (allow RocksDB to auto-tune)
+  // Default: UINT64_MAX - 1 (allow RocksDB to auto-tune)
   //
   // Dynamically changeable through SetOptions() API
-  uint64_t periodic_compaction_seconds = 0xffffffffffffffff;
+  uint64_t periodic_compaction_seconds = 0xfffffffffffffffe;

   // If this option is set then 1 in N blocks are compressed
   // using a fast (lz4) and slow (zstd) compression algorithm.
@@ -151,8 +151,12 @@ class FilterPolicy {
 // Return a new filter policy that uses a bloom filter with approximately
 // the specified number of bits per key.
 //
-// bits_per_key: bits per key in bloom filter. A good value for bits_per_key
-// is 10, which yields a filter with ~ 1% false positive rate.
+// bits_per_key: average bits allocated per key in bloom filter. A good
+// choice is 9.9, which yields a filter with ~ 1% false positive rate.
+// When format_version < 5, the value will be rounded to the nearest
+// integer. Recommend using no more than three decimal digits after the
+// decimal point, as in 6.667.
 //
 // use_block_based_builder: use deprecated block based filter (true) rather
 // than full or partitioned filter (false).
 //
@@ -167,5 +171,5 @@ class FilterPolicy {
 // FilterPolicy (like NewBloomFilterPolicy) that does not ignore
 // trailing spaces in keys.
 extern const FilterPolicy* NewBloomFilterPolicy(
-    int bits_per_key, bool use_block_based_builder = false);
+    double bits_per_key, bool use_block_based_builder = false);
 }  // namespace rocksdb
@@ -32,7 +32,7 @@ export PATH=$JAVA_HOME:/usr/local/bin:$PATH

 # build rocksdb
 cd /rocksdb
-scl enable devtoolset-2 'make jclean clean'
+scl enable devtoolset-2 'make clean-not-downloaded'
 scl enable devtoolset-2 'PORTABLE=1 make -j8 rocksdbjavastatic'
 cp /rocksdb/java/target/librocksdbjni-* /rocksdb-build
 cp /rocksdb/java/target/rocksdbjni-* /rocksdb-build
@@ -4,26 +4,31 @@
 set -e
 #set -x

-rm -rf /rocksdb-local
-cp -r /rocksdb-host /rocksdb-local
-cd /rocksdb-local
+# just in-case this is run outside Docker
+mkdir -p /rocksdb-local-build
+
+rm -rf /rocksdb-local-build/*
+cp -r /rocksdb-host/* /rocksdb-local-build
+cd /rocksdb-local-build

-# Use scl devtoolset if available (i.e. CentOS <7)
+# Use scl devtoolset if available
 if hash scl 2>/dev/null; then
   if scl --list | grep -q 'devtoolset-7'; then
-    scl enable devtoolset-7 'make jclean clean'
+    # CentOS 7+
+    scl enable devtoolset-7 'make clean-not-downloaded'
     scl enable devtoolset-7 'PORTABLE=1 make -j2 rocksdbjavastatic'
   elif scl --list | grep -q 'devtoolset-2'; then
-    scl enable devtoolset-2 'make jclean clean'
+    # CentOS 5 or 6
+    scl enable devtoolset-2 'make clean-not-downloaded'
     scl enable devtoolset-2 'PORTABLE=1 make -j2 rocksdbjavastatic'
   else
     echo "Could not find devtoolset"
     exit 1;
   fi
 else
-  make jclean clean
+  make clean-not-downloaded
   PORTABLE=1 make -j2 rocksdbjavastatic
 fi

-cp java/target/librocksdbjni-linux*.so java/target/rocksdbjni-*-linux*.jar /rocksdb-host/java/target
+cp java/target/librocksdbjni-linux*.so java/target/rocksdbjni-*-linux*.jar /rocksdb-java-target
@@ -19,10 +19,10 @@
 /*
  * Class:     org_rocksdb_BloomFilter
  * Method:    createBloomFilter
- * Signature: (IZ)J
+ * Signature: (DZ)J
  */
 jlong Java_org_rocksdb_BloomFilter_createNewBloomFilter(
-    JNIEnv* /*env*/, jclass /*jcls*/, jint bits_per_key,
+    JNIEnv* /*env*/, jclass /*jcls*/, jdouble bits_per_key,
     jboolean use_block_base_builder) {
   auto* sptr_filter = new std::shared_ptr<const rocksdb::FilterPolicy>(
       rocksdb::NewBloomFilterPolicy(bits_per_key, use_block_base_builder));
@@ -20,7 +20,7 @@ package org.rocksdb;
  */
 public class BloomFilter extends Filter {

-  private static final int DEFAULT_BITS_PER_KEY = 10;
+  private static final double DEFAULT_BITS_PER_KEY = 10.0;
   private static final boolean DEFAULT_MODE = true;

   /**
@@ -39,7 +39,7 @@ public class BloomFilter extends Filter {
    *
    * <p>
    * bits_per_key: bits per key in bloom filter. A good value for bits_per_key
-   * is 10, which yields a filter with ~ 1% false positive rate.
+   * is 9.9, which yields a filter with ~ 1% false positive rate.
    * </p>
    * <p>
    * Callers must delete the result after any database that is using the
@@ -47,7 +47,7 @@ public class BloomFilter extends Filter {
    *
    * @param bitsPerKey number of bits to use
    */
-  public BloomFilter(final int bitsPerKey) {
+  public BloomFilter(final double bitsPerKey) {
     this(bitsPerKey, DEFAULT_MODE);
   }

@@ -70,10 +70,10 @@ public class BloomFilter extends Filter {
    * @param bitsPerKey number of bits to use
    * @param useBlockBasedMode use block based mode or full filter mode
    */
-  public BloomFilter(final int bitsPerKey, final boolean useBlockBasedMode) {
+  public BloomFilter(final double bitsPerKey, final boolean useBlockBasedMode) {
     super(createNewBloomFilter(bitsPerKey, useBlockBasedMode));
   }

-  private native static long createNewBloomFilter(final int bitsKeyKey,
+  private native static long createNewBloomFilter(final double bitsKeyKey,
       final boolean useBlockBasedMode);
 }
@@ -20,7 +20,6 @@ namespace rocksdb {
 // of DB. Raw pointers defined in this struct do not have ownership to the data
 // they point to. Options contains std::shared_ptr to these data.
 struct ImmutableCFOptions {
-  ImmutableCFOptions();
   explicit ImmutableCFOptions(const Options& options);

   ImmutableCFOptions(const ImmutableDBOptions& db_options,
@@ -23,6 +23,7 @@
 #include "rocksdb/memtablerep.h"
 #include "rocksdb/utilities/leveldb_options.h"
 #include "rocksdb/utilities/object_registry.h"
+#include "table/block_based/filter_policy_internal.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 #include "util/random.h"
@@ -515,13 +516,15 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
   BlockBasedTableOptions table_opt;
   BlockBasedTableOptions new_opt;
   // make sure default values are overwritten by something else
-  ASSERT_OK(GetBlockBasedTableOptionsFromString(table_opt,
-            "cache_index_and_filter_blocks=1;index_type=kHashSearch;"
-            "checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
-            "block_cache=1M;block_cache_compressed=1k;block_size=1024;"
-            "block_size_deviation=8;block_restart_interval=4;"
-            "filter_policy=bloomfilter:4:true;whole_key_filtering=1;",
-            &new_opt));
+  ASSERT_OK(GetBlockBasedTableOptionsFromString(
+      table_opt,
+      "cache_index_and_filter_blocks=1;index_type=kHashSearch;"
+      "checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
+      "block_cache=1M;block_cache_compressed=1k;block_size=1024;"
+      "block_size_deviation=8;block_restart_interval=4;"
+      "format_version=5;whole_key_filtering=1;"
+      "filter_policy=bloomfilter:4.567:false;",
+      &new_opt));
   ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
   ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
   ASSERT_EQ(new_opt.checksum, ChecksumType::kxxHash);
@@ -534,7 +537,13 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
   ASSERT_EQ(new_opt.block_size, 1024UL);
   ASSERT_EQ(new_opt.block_size_deviation, 8);
   ASSERT_EQ(new_opt.block_restart_interval, 4);
+  ASSERT_EQ(new_opt.format_version, 5U);
   ASSERT_EQ(new_opt.whole_key_filtering, true);
   ASSERT_TRUE(new_opt.filter_policy != nullptr);
+  const BloomFilterPolicy& bfp =
+      dynamic_cast<const BloomFilterPolicy&>(*new_opt.filter_policy);
+  EXPECT_EQ(bfp.GetMillibitsPerKey(), 4567);
+  EXPECT_EQ(bfp.GetWholeBitsPerKey(), 5);

   // unknown option
   ASSERT_NOK(GetBlockBasedTableOptionsFromString(table_opt,
@ -510,8 +510,8 @@ std::string ParseBlockBasedTableOption(const std::string& name,
    if (pos == std::string::npos) {
      return "Invalid filter policy config, missing bits_per_key";
    }
    int bits_per_key =
        ParseInt(trim(value.substr(kName.size(), pos - kName.size())));
    double bits_per_key =
        ParseDouble(trim(value.substr(kName.size(), pos - kName.size())));
    bool use_block_based_builder =
        ParseBoolean("use_block_based_builder", trim(value.substr(pos + 1)));
    new_options->filter_policy.reset(
@ -27,9 +27,10 @@ namespace {
// See description in FastLocalBloomImpl
class FastLocalBloomBitsBuilder : public BuiltinFilterBitsBuilder {
 public:
  FastLocalBloomBitsBuilder(const int bits_per_key, const int num_probes)
      : bits_per_key_(bits_per_key), num_probes_(num_probes) {
    assert(bits_per_key_);
  FastLocalBloomBitsBuilder(const int millibits_per_key)
      : millibits_per_key_(millibits_per_key),
        num_probes_(FastLocalBloomImpl::ChooseNumProbes(millibits_per_key_)) {
    assert(millibits_per_key >= 1000);
  }

  // No Copy allowed
@ -77,14 +78,15 @@ class FastLocalBloomBitsBuilder : public BuiltinFilterBitsBuilder {

  int CalculateNumEntry(const uint32_t bytes) override {
    uint32_t bytes_no_meta = bytes >= 5u ? bytes - 5u : 0;
    return static_cast<int>(uint64_t{8} * bytes_no_meta / bits_per_key_);
    return static_cast<int>(uint64_t{8000} * bytes_no_meta /
                            millibits_per_key_);
  }

  uint32_t CalculateSpace(const int num_entry) override {
    uint32_t num_cache_lines = 0;
    if (bits_per_key_ > 0 && num_entry > 0) {
    if (millibits_per_key_ > 0 && num_entry > 0) {
      num_cache_lines = static_cast<uint32_t>(
          (int64_t{num_entry} * bits_per_key_ + 511) / 512);
          (int64_t{num_entry} * millibits_per_key_ + 511999) / 512000);
    }
    return num_cache_lines * 64 + /*metadata*/ 5;
  }
@ -136,7 +138,7 @@ class FastLocalBloomBitsBuilder : public BuiltinFilterBitsBuilder {
    }
  }

  int bits_per_key_;
  int millibits_per_key_;
  int num_probes_;
  std::vector<uint64_t> hash_entries_;
};
@ -187,7 +189,7 @@ using LegacyBloomImpl = LegacyLocalityBloomImpl</*ExtraRotates*/ false>;

class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {
 public:
  explicit LegacyBloomBitsBuilder(const int bits_per_key, const int num_probes);
  explicit LegacyBloomBitsBuilder(const int bits_per_key);

  // No Copy allowed
  LegacyBloomBitsBuilder(const LegacyBloomBitsBuilder&) = delete;
@ -208,7 +210,6 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {
  }

 private:
  friend class FullFilterBlockTest_DuplicateEntries_Test;
  int bits_per_key_;
  int num_probes_;
  std::vector<uint32_t> hash_entries_;
@ -228,9 +229,9 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {
  void AddHash(uint32_t h, char* data, uint32_t num_lines, uint32_t total_bits);
};

LegacyBloomBitsBuilder::LegacyBloomBitsBuilder(const int bits_per_key,
                                               const int num_probes)
    : bits_per_key_(bits_per_key), num_probes_(num_probes) {
LegacyBloomBitsBuilder::LegacyBloomBitsBuilder(const int bits_per_key)
    : bits_per_key_(bits_per_key),
      num_probes_(LegacyNoLocalityBloomImpl::ChooseNumProbes(bits_per_key_)) {
  assert(bits_per_key_);
}
@ -412,12 +413,24 @@ const std::vector<BloomFilterPolicy::Mode> BloomFilterPolicy::kAllUserModes = {
    kAuto,
};

BloomFilterPolicy::BloomFilterPolicy(int bits_per_key, Mode mode)
    : bits_per_key_(bits_per_key), mode_(mode) {
  // We intentionally round down to reduce probing cost a little bit
  num_probes_ = static_cast<int>(bits_per_key_ * 0.69);  // 0.69 =~ ln(2)
  if (num_probes_ < 1) num_probes_ = 1;
  if (num_probes_ > 30) num_probes_ = 30;
BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)
    : mode_(mode) {
  // Sanitize bits_per_key
  if (bits_per_key < 1.0) {
    bits_per_key = 1.0;
  } else if (!(bits_per_key < 100.0)) {  // including NaN
    bits_per_key = 100.0;
  }

  // Includes a nudge toward rounding up, to ensure on all platforms
  // that doubles specified with three decimal digits after the decimal
  // point are interpreted accurately.
  millibits_per_key_ = static_cast<int>(bits_per_key * 1000.0 + 0.500001);

  // For better or worse, this is a rounding up of a nudged rounding up,
  // e.g. 7.4999999999999 will round up to 8, but that provides more
  // predictability against small arithmetic errors in floating point.
  whole_bits_per_key_ = (millibits_per_key_ + 500) / 1000;
}

BloomFilterPolicy::~BloomFilterPolicy() {}
@ -433,7 +446,7 @@ void BloomFilterPolicy::CreateFilter(const Slice* keys, int n,
  assert(mode_ == kDeprecatedBlock);

  // Compute bloom filter size (in both bits and bytes)
  uint32_t bits = static_cast<uint32_t>(n * bits_per_key_);
  uint32_t bits = static_cast<uint32_t>(n * whole_bits_per_key_);

  // For small n, we can see a very high false positive rate. Fix it
  // by enforcing a minimum bloom filter length.
@ -442,12 +455,15 @@ void BloomFilterPolicy::CreateFilter(const Slice* keys, int n,
  uint32_t bytes = (bits + 7) / 8;
  bits = bytes * 8;

  int num_probes =
      LegacyNoLocalityBloomImpl::ChooseNumProbes(whole_bits_per_key_);

  const size_t init_size = dst->size();
  dst->resize(init_size + bytes, 0);
  dst->push_back(static_cast<char>(num_probes_));  // Remember # of probes
  dst->push_back(static_cast<char>(num_probes));  // Remember # of probes
  char* array = &(*dst)[init_size];
  for (int i = 0; i < n; i++) {
    LegacyNoLocalityBloomImpl::AddHash(BloomHash(keys[i]), bits, num_probes_,
    LegacyNoLocalityBloomImpl::AddHash(BloomHash(keys[i]), bits, num_probes,
                                       array);
  }
}
@ -470,7 +486,7 @@ bool BloomFilterPolicy::KeyMayMatch(const Slice& key,
      // Consider it a match.
      return true;
    }
    // NB: using k not num_probes_
    // NB: using stored k not num_probes for whole_bits_per_key_
    return LegacyNoLocalityBloomImpl::HashMayMatch(BloomHash(key), bits, k,
                                                   array);
  }
@ -504,9 +520,9 @@ FilterBitsBuilder* BloomFilterPolicy::GetFilterBitsBuilderInternal(
      case kDeprecatedBlock:
        return nullptr;
      case kFastLocalBloom:
        return new FastLocalBloomBitsBuilder(bits_per_key_, num_probes_);
        return new FastLocalBloomBitsBuilder(millibits_per_key_);
      case kLegacyBloom:
        return new LegacyBloomBitsBuilder(bits_per_key_, num_probes_);
        return new LegacyBloomBitsBuilder(whole_bits_per_key_);
    }
  }
  assert(false);
@ -649,7 +665,7 @@ FilterBitsReader* BloomFilterPolicy::GetBloomBitsReader(
  return new AlwaysTrueFilter();
}

const FilterPolicy* NewBloomFilterPolicy(int bits_per_key,
const FilterPolicy* NewBloomFilterPolicy(double bits_per_key,
                                         bool use_block_based_builder) {
  BloomFilterPolicy::Mode m;
  if (use_block_based_builder) {
@ -91,7 +91,7 @@ class BloomFilterPolicy : public FilterPolicy {
  // tests should prefer using NewBloomFilterPolicy (user-exposed).
  static const std::vector<Mode> kAllUserModes;

  explicit BloomFilterPolicy(int bits_per_key, Mode mode);
  explicit BloomFilterPolicy(double bits_per_key, Mode mode);

  ~BloomFilterPolicy() override;

@ -111,6 +111,11 @@ class BloomFilterPolicy : public FilterPolicy {
  // chosen for this BloomFilterPolicy. Not compatible with CreateFilter.
  FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;

  // Essentially for testing only: configured millibits/key
  int GetMillibitsPerKey() const { return millibits_per_key_; }
  // Essentially for testing only: legacy whole bits/key
  int GetWholeBitsPerKey() const { return whole_bits_per_key_; }

 protected:
  // To use this function, call FilterBuildingContext::GetBuilder().
  //
@ -120,8 +125,16 @@ class BloomFilterPolicy : public FilterPolicy {
      const FilterBuildingContext&) const override;

 private:
  int bits_per_key_;
  int num_probes_;
  // Newer filters support fractional bits per key. For predictable behavior
  // of 0.001-precision values across floating point implementations, we
  // round to thousandths of a bit (on average) per key.
  int millibits_per_key_;

  // Older filters round to whole number bits per key. (There *should* be no
  // compatibility issue with fractional bits per key, but preserving old
  // behavior with format_version < 5 just in case.)
  int whole_bits_per_key_;

  // Selected mode (a specific implementation or way of selecting an
  // implementation) for building new SST filters.
  Mode mode_;
@ -54,7 +54,7 @@ with open('${sorted_input_data}', 'w') as f:
EOF

declare -a backward_compatible_checkout_objs=("2.2.fb.branch" "2.3.fb.branch" "2.4.fb.branch" "2.5.fb.branch" "2.6.fb.branch" "2.7.fb.branch" "2.8.1.fb" "3.0.fb.branch" "3.1.fb" "3.2.fb" "3.3.fb" "3.4.fb" "3.5.fb" "3.6.fb" "3.7.fb" "3.8.fb" "3.9.fb")
declare -a forward_compatible_checkout_objs=("3.10.fb" "3.11.fb" "3.12.fb" "3.13.fb" "4.0.fb" "4.1.fb" "4.2.fb" "4.3.fb" "4.4.fb" "4.5.fb" "4.6.fb" "4.7.fb" "4.8.fb" "4.9.fb" "4.10.fb" "4.11.fb" "4.12.fb" "4.13.fb" "5.0.fb" "5.1.fb" "5.2.fb" "5.3.fb" "5.4.fb" "5.5.fb" "5.6.fb" "5.7.fb" "5.8.fb" "5.9.fb" "5.10.fb")
declare -a forward_compatible_checkout_objs=("4.2.fb" "4.3.fb" "4.4.fb" "4.5.fb" "4.6.fb" "4.7.fb" "4.8.fb" "4.9.fb" "4.10.fb" "4.11.fb" "4.12.fb" "4.13.fb" "5.0.fb" "5.1.fb" "5.2.fb" "5.3.fb" "5.4.fb" "5.5.fb" "5.6.fb" "5.7.fb" "5.8.fb" "5.9.fb" "5.10.fb")
declare -a forward_compatible_with_options_checkout_objs=("5.11.fb" "5.12.fb" "5.13.fb" "5.14.fb" "5.15.fb" "5.16.fb" "5.17.fb" "5.18.fb" "6.0.fb" "6.1.fb" "6.2.fb" "6.3.fb" "6.4.fb" "6.5.fb")
declare -a checkout_objs=(${backward_compatible_checkout_objs[@]} ${forward_compatible_checkout_objs[@]} ${forward_compatible_with_options_checkout_objs[@]})
declare -a extern_sst_ingestion_compatible_checkout_objs=("5.14.fb" "5.15.fb" "5.16.fb" "5.17.fb" "5.18.fb" "6.0.fb" "6.1.fb" "6.2.fb" "6.3.fb" "6.4.fb" "6.5.fb")
@ -72,6 +72,50 @@ namespace rocksdb {
//
class FastLocalBloomImpl {
 public:
  static inline int ChooseNumProbes(int millibits_per_key) {
    // Since this implementation can (with AVX2) make up to 8 probes
    // for the same cost, we pick the most accurate num_probes, based
    // on actual tests of the implementation. Note that for higher
    // bits/key, the best choice for cache-local Bloom can be notably
    // smaller than standard bloom, e.g. 9 instead of 11 @ 16 b/k.
    if (millibits_per_key <= 2080) {
      return 1;
    } else if (millibits_per_key <= 3580) {
      return 2;
    } else if (millibits_per_key <= 5100) {
      return 3;
    } else if (millibits_per_key <= 6640) {
      return 4;
    } else if (millibits_per_key <= 8300) {
      return 5;
    } else if (millibits_per_key <= 10070) {
      return 6;
    } else if (millibits_per_key <= 11720) {
      return 7;
    } else if (millibits_per_key <= 14001) {
      // Would be something like <= 13800 but sacrificing *slightly* for
      // more settings using <= 8 probes.
      return 8;
    } else if (millibits_per_key <= 16050) {
      return 9;
    } else if (millibits_per_key <= 18300) {
      return 10;
    } else if (millibits_per_key <= 22001) {
      return 11;
    } else if (millibits_per_key <= 25501) {
      return 12;
    } else if (millibits_per_key > 50000) {
      // Top out at 24 probes (three sets of 8)
      return 24;
    } else {
      // Roughly optimal choices for remaining range
      // e.g.
      // 28000 -> 12, 28001 -> 13
      // 50000 -> 23, 50001 -> 24
      return (millibits_per_key - 1) / 2000 - 1;
    }
  }
  static inline void AddHash(uint32_t h1, uint32_t h2, uint32_t len_bytes,
                             int num_probes, char *data) {
    uint32_t bytes_to_cache_line = fastrange32(len_bytes >> 6, h1) << 6;
@ -228,6 +272,14 @@ class FastLocalBloomImpl {
//
class LegacyNoLocalityBloomImpl {
 public:
  static inline int ChooseNumProbes(int bits_per_key) {
    // We intentionally round down to reduce probing cost a little bit
    int num_probes = static_cast<int>(bits_per_key * 0.69);  // 0.69 =~ ln(2)
    if (num_probes < 1) num_probes = 1;
    if (num_probes > 30) num_probes = 30;
    return num_probes;
  }
  static inline void AddHash(uint32_t h, uint32_t total_bits, int num_probes,
                             char *data) {
    const uint32_t delta = (h >> 17) | (h << 15);  // Rotate right 17 bits
@ -16,6 +16,7 @@ int main() {
#else

#include <array>
#include <cmath>
#include <vector>

#include "logging/logging.h"
@ -69,7 +70,7 @@ class BlockBasedBloomTest : public testing::Test {
    filter_.clear();
  }

  void ResetPolicy(int bits_per_key) {
  void ResetPolicy(double bits_per_key) {
    policy_.reset(new BloomFilterPolicy(bits_per_key,
                                        BloomFilterPolicy::kDeprecatedBlock));
    Reset();
@ -229,6 +230,22 @@ TEST_F(BlockBasedBloomTest, Schema) {
  Build();
  ASSERT_EQ(BloomHash(FilterData()), 3057004015U);

  // With new fractional bits_per_key, check that we are rounding to
  // whole bits per key for old Bloom filters.
  ResetPolicy(9.5);  // Treated as 10
  for (int key = 1; key < 88; key++) {
    Add(Key(key, buffer));
  }
  Build();
  ASSERT_EQ(BloomHash(FilterData()), /*SAME*/ 3057004015U);

  ResetPolicy(10.499);  // Treated as 10
  for (int key = 1; key < 88; key++) {
    Add(Key(key, buffer));
  }
  Build();
  ASSERT_EQ(BloomHash(FilterData()), /*SAME*/ 3057004015U);

  ResetPolicy();
}
@ -250,7 +267,12 @@ class FullBloomTest : public testing::TestWithParam<BloomFilterPolicy::Mode> {

  BuiltinFilterBitsBuilder* GetBuiltinFilterBitsBuilder() {
    // Throws on bad cast
    return &dynamic_cast<BuiltinFilterBitsBuilder&>(*bits_builder_.get());
    return &dynamic_cast<BuiltinFilterBitsBuilder&>(*bits_builder_);
  }

  const BloomFilterPolicy* GetBloomFilterPolicy() {
    // Throws on bad cast
    return &dynamic_cast<const BloomFilterPolicy&>(*policy_);
  }

  void Reset() {
@ -260,7 +282,7 @@ class FullBloomTest : public testing::TestWithParam<BloomFilterPolicy::Mode> {
    filter_size_ = 0;
  }

  void ResetPolicy(int bits_per_key) {
  void ResetPolicy(double bits_per_key) {
    policy_.reset(new BloomFilterPolicy(bits_per_key, GetParam()));
    Reset();
  }
@ -366,14 +388,57 @@ class FullBloomTest : public testing::TestWithParam<BloomFilterPolicy::Mode> {
};

TEST_P(FullBloomTest, FilterSize) {
  auto bits_builder = GetBuiltinFilterBitsBuilder();
  for (int n = 1; n < 100; n++) {
    auto space = bits_builder->CalculateSpace(n);
    auto n2 = bits_builder->CalculateNumEntry(space);
    ASSERT_GE(n2, n);
    auto space2 = bits_builder->CalculateSpace(n2);
    ASSERT_EQ(space, space2);
  // In addition to checking the consistency of space computation, we are
  // checking that denoted and computed doubles are interpreted as expected
  // as bits_per_key values.
  bool some_computed_less_than_denoted = false;
  // Note: enforced minimum is 1 bit per key (1000 millibits), and enforced
  // maximum is 100 bits per key (100000 millibits).
  for (auto bpk :
       std::vector<std::pair<double, int> >{{-HUGE_VAL, 1000},
                                            {-INFINITY, 1000},
                                            {0.0, 1000},
                                            {1.234, 1234},
                                            {3.456, 3456},
                                            {9.5, 9500},
                                            {10.0, 10000},
                                            {10.499, 10499},
                                            {21.345, 21345},
                                            {99.999, 99999},
                                            {1234.0, 100000},
                                            {HUGE_VAL, 100000},
                                            {INFINITY, 100000},
                                            {NAN, 100000}}) {
    ResetPolicy(bpk.first);
    auto bfp = GetBloomFilterPolicy();
    EXPECT_EQ(bpk.second, bfp->GetMillibitsPerKey());
    EXPECT_EQ((bpk.second + 500) / 1000, bfp->GetWholeBitsPerKey());

    double computed = bpk.first;
    // This transforms e.g. 9.5 -> 9.499999999999998, which we still
    // round to 10 for whole bits per key.
    computed += 0.5;
    computed /= 1234567.0;
    computed *= 1234567.0;
    computed -= 0.5;
    some_computed_less_than_denoted |= (computed < bpk.first);
    ResetPolicy(computed);
    bfp = GetBloomFilterPolicy();
    EXPECT_EQ(bpk.second, bfp->GetMillibitsPerKey());
    EXPECT_EQ((bpk.second + 500) / 1000, bfp->GetWholeBitsPerKey());

    auto bits_builder = GetBuiltinFilterBitsBuilder();
    for (int n = 1; n < 100; n++) {
      auto space = bits_builder->CalculateSpace(n);
      auto n2 = bits_builder->CalculateNumEntry(space);
      EXPECT_GE(n2, n);
      auto space2 = bits_builder->CalculateSpace(n2);
      EXPECT_EQ(space, space2);
    }
  }
  // Check that the compiler hasn't optimized our computation into nothing
  EXPECT_TRUE(some_computed_less_than_denoted);
  ResetPolicy();
}
TEST_P(FullBloomTest, FullEmptyFilter) {

@ -546,32 +611,43 @@ TEST_P(FullBloomTest, Schema) {
    EXPECT_EQ("34,74,130,236,643,882,962,1015,1035,1110", FirstFPs(10));
  }

  ResetPolicy(14);  // num_probes = 9
  // This used to be 9 probes, but 8 is a better choice for speed,
  // especially with SIMD groups of 8 probes, with essentially no
  // change in FP rate.
  // FP rate @ 9 probes, old Bloom: 0.4321%
  // FP rate @ 9 probes, new Bloom: 0.1846%
  // FP rate @ 8 probes, new Bloom: 0.1843%
  ResetPolicy(14);  // num_probes = 8 (new), 9 (old)
  for (int key = 0; key < 2087; key++) {
    Add(Key(key, buffer));
  }
  Build();
  EXPECT_EQ(GetNumProbesFromFilterData(), 9);
  EXPECT_EQ(GetNumProbesFromFilterData(), SelectByImpl(9, 8));
  EXPECT_EQ(
      BloomHash(FilterData()),
      SelectByImpl(SelectByCacheLineSize(178861123, 379087593, 2574136516U),
                   3129678118U));
                   3709876890U));
  if (GetParam() == BloomFilterPolicy::kFastLocalBloom) {
    EXPECT_EQ("130,989,2002,3225,3543,4522,4863,5256,5277", FirstFPs(9));
    EXPECT_EQ("130,240,522,565,989,2002,2526,3147,3543", FirstFPs(9));
  }

  ResetPolicy(16);  // num_probes = 11
  // This used to be 11 probes, but 9 is a better choice for speed
  // AND accuracy.
  // FP rate @ 11 probes, old Bloom: 0.3571%
  // FP rate @ 11 probes, new Bloom: 0.0884%
  // FP rate @ 9 probes, new Bloom: 0.0843%
  ResetPolicy(16);  // num_probes = 9 (new), 11 (old)
  for (int key = 0; key < 2087; key++) {
    Add(Key(key, buffer));
  }
  Build();
  EXPECT_EQ(GetNumProbesFromFilterData(), 11);
  EXPECT_EQ(GetNumProbesFromFilterData(), SelectByImpl(11, 9));
  EXPECT_EQ(
      BloomHash(FilterData()),
      SelectByImpl(SelectByCacheLineSize(1129406313, 3049154394U, 1727750964),
                   1262483504));
                   1087138490));
  if (GetParam() == BloomFilterPolicy::kFastLocalBloom) {
    EXPECT_EQ("240,945,2660,3299,4031,4282,5173,6197,8715", FirstFPs(9));
    EXPECT_EQ("3299,3611,3916,6620,7822,8079,8482,8942,10167", FirstFPs(9));
  }

  ResetPolicy(10);  // num_probes = 6, but different memory ratio vs. 9
@ -616,6 +692,39 @@ TEST_P(FullBloomTest, Schema) {
    EXPECT_EQ("16,126,133,422,466,472,813,1002,1035,1159", FirstFPs(10));
  }

  // With new fractional bits_per_key, check that we are rounding to
  // whole bits per key for old Bloom filters but fractional for
  // new Bloom filter.
  ResetPolicy(9.5);
  for (int key = 1; key < 2088; key++) {
    Add(Key(key, buffer));
  }
  Build();
  EXPECT_EQ(GetNumProbesFromFilterData(), 6);
  EXPECT_EQ(BloomHash(FilterData()),
            SelectByImpl(/*SAME*/ SelectByCacheLineSize(2885052954U, 769447944,
                                                        4175124908U),
                         /*CHANGED*/ 3166884174U));
  if (GetParam() == BloomFilterPolicy::kFastLocalBloom) {
    EXPECT_EQ(/*CHANGED*/ "126,156,367,444,458,791,813,976,1015,1035",
              FirstFPs(10));
  }

  ResetPolicy(10.499);
  for (int key = 1; key < 2088; key++) {
    Add(Key(key, buffer));
  }
  Build();
  EXPECT_EQ(GetNumProbesFromFilterData(), SelectByImpl(6, 7));
  EXPECT_EQ(BloomHash(FilterData()),
            SelectByImpl(/*SAME*/ SelectByCacheLineSize(2885052954U, 769447944,
                                                        4175124908U),
                         /*CHANGED*/ 4098502778U));
  if (GetParam() == BloomFilterPolicy::kFastLocalBloom) {
    EXPECT_EQ(/*CHANGED*/ "16,236,240,472,1015,1045,1111,1409,1465,1612",
              FirstFPs(10));
  }

  ResetPolicy();
}
@ -52,7 +52,7 @@ DEFINE_uint32(vary_key_size_log2_interval, 5,

DEFINE_uint32(batch_size, 8, "Number of keys to group in each batch");

DEFINE_uint32(bits_per_key, 10, "Bits per key setting for filters");
DEFINE_double(bits_per_key, 10.0, "Bits per key setting for filters");

DEFINE_double(m_queries, 200, "Millions of queries for each test mode");
@ -568,6 +568,14 @@ Status BackupEngineImpl::Initialize() {
  // we might need to clean up from previous crash or I/O errors
  might_need_garbage_collect_ = true;

  if (options_.max_valid_backups_to_open != port::kMaxInt32) {
    options_.max_valid_backups_to_open = port::kMaxInt32;
    ROCKS_LOG_WARN(
        options_.info_log,
        "`max_valid_backups_to_open` is not set to the default value. Ignoring "
        "its value since BackupEngine is not read-only.");
  }

  // gather the list of directories that we need to create
  std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
      directories;
@ -1044,29 +1052,21 @@ Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
  // After removing meta file, best effort deletion even with errors.
  // (Don't delete other files if we can't delete the meta file right
  // now.)

  if (options_.max_valid_backups_to_open == port::kMaxInt32) {
    std::vector<std::string> to_delete;
    for (auto& itr : backuped_file_infos_) {
      if (itr.second->refs == 0) {
        Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                       itr.first.c_str(), s.ToString().c_str());
        to_delete.push_back(itr.first);
        if (!s.ok()) {
          // Trying again later might work
          might_need_garbage_collect_ = true;
        }
  std::vector<std::string> to_delete;
  for (auto& itr : backuped_file_infos_) {
    if (itr.second->refs == 0) {
      Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
      ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
                     s.ToString().c_str());
      to_delete.push_back(itr.first);
      if (!s.ok()) {
        // Trying again later might work
        might_need_garbage_collect_ = true;
      }
    }
    for (auto& td : to_delete) {
      backuped_file_infos_.erase(td);
    }
  } else {
    ROCKS_LOG_WARN(
        options_.info_log,
        "DeleteBackup cleanup is limited since `max_valid_backups_to_open` "
        "constrains how many backups the engine knows about");
  }
  for (auto& td : to_delete) {
    backuped_file_infos_.erase(td);
  }

  // take care of private dirs -- GarbageCollect() will take care of them
@ -1569,64 +1569,54 @@ Status BackupEngineImpl::GarbageCollect() {
  might_need_garbage_collect_ = false;

  ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
  if (options_.max_valid_backups_to_open != port::kMaxInt32) {
    ROCKS_LOG_WARN(
        options_.info_log,
        "Garbage collection is limited since `max_valid_backups_to_open` "
        "constrains how many backups the engine knows about");
  }

  if (options_.max_valid_backups_to_open == port::kMaxInt32) {
    // delete obsolete shared files
    // we cannot do this when BackupEngine has `max_valid_backups_to_open` set
    // as those engines don't know about all shared files.
    for (bool with_checksum : {false, true}) {
      std::vector<std::string> shared_children;
      {
        std::string shared_path;
        if (with_checksum) {
          shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
        } else {
          shared_path = GetAbsolutePath(GetSharedFileRel());
        }
        auto s = backup_env_->FileExists(shared_path);
        if (s.ok()) {
          s = backup_env_->GetChildren(shared_path, &shared_children);
        } else if (s.IsNotFound()) {
          s = Status::OK();
        }
  // delete obsolete shared files
  for (bool with_checksum : {false, true}) {
    std::vector<std::string> shared_children;
    {
      std::string shared_path;
      if (with_checksum) {
        shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
      } else {
        shared_path = GetAbsolutePath(GetSharedFileRel());
      }
      auto s = backup_env_->FileExists(shared_path);
      if (s.ok()) {
        s = backup_env_->GetChildren(shared_path, &shared_children);
      } else if (s.IsNotFound()) {
        s = Status::OK();
      }
      if (!s.ok()) {
        overall_status = s;
        // Trying again later might work
        might_need_garbage_collect_ = true;
      }
    }
    for (auto& child : shared_children) {
      if (child == "." || child == "..") {
        continue;
      }
      std::string rel_fname;
      if (with_checksum) {
        rel_fname = GetSharedFileWithChecksumRel(child);
      } else {
        rel_fname = GetSharedFileRel(child);
      }
      auto child_itr = backuped_file_infos_.find(rel_fname);
      // if it's not refcounted, delete it
      if (child_itr == backuped_file_infos_.end() ||
          child_itr->second->refs == 0) {
        // this might be a directory, but DeleteFile will just fail in that
        // case, so we're good
        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                       rel_fname.c_str(), s.ToString().c_str());
        backuped_file_infos_.erase(rel_fname);
        if (!s.ok()) {
          overall_status = s;
          // Trying again later might work
          might_need_garbage_collect_ = true;
        }
      }
    for (auto& child : shared_children) {
      if (child == "." || child == "..") {
        continue;
      }
      std::string rel_fname;
      if (with_checksum) {
        rel_fname = GetSharedFileWithChecksumRel(child);
      } else {
        rel_fname = GetSharedFileRel(child);
      }
      auto child_itr = backuped_file_infos_.find(rel_fname);
      // if it's not refcounted, delete it
      if (child_itr == backuped_file_infos_.end() ||
          child_itr->second->refs == 0) {
        // this might be a directory, but DeleteFile will just fail in that
        // case, so we're good
        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                       rel_fname.c_str(), s.ToString().c_str());
        backuped_file_infos_.erase(rel_fname);
        if (!s.ok()) {
          // Trying again later might work
          might_need_garbage_collect_ = true;
        }
      }
    }
  }
}
@ -1645,8 +1635,7 @@ Status BackupEngineImpl::GarbageCollect() {
      if (child == "." || child == "..") {
        continue;
      }
      // it's ok to do this when BackupEngine has `max_valid_backups_to_open` set
      // as the engine always knows all valid backup numbers.

      BackupID backup_id = 0;
      bool tmp_dir = child.find(".tmp") != std::string::npos;
      sscanf(child.c_str(), "%u", &backup_id);
@ -1687,12 +1687,50 @@ TEST_F(BackupableDBTest, LimitBackupsOpened) {
  CloseDBAndBackupEngine();

  backupable_options_->max_valid_backups_to_open = 2;
  OpenDBAndBackupEngine();
  backupable_options_->destroy_old_data = false;
  BackupEngineReadOnly* read_only_backup_engine;
  ASSERT_OK(BackupEngineReadOnly::Open(backup_chroot_env_.get(),
                                       *backupable_options_,
                                       &read_only_backup_engine));

  std::vector<BackupInfo> backup_infos;
  backup_engine_->GetBackupInfo(&backup_infos);
  read_only_backup_engine->GetBackupInfo(&backup_infos);
  ASSERT_EQ(2, backup_infos.size());
  ASSERT_EQ(2, backup_infos[0].backup_id);
  ASSERT_EQ(4, backup_infos[1].backup_id);
  delete read_only_backup_engine;
}

TEST_F(BackupableDBTest, IgnoreLimitBackupsOpenedWhenNotReadOnly) {
  // Verify the specified max_valid_backups_to_open is ignored if the engine
  // is not read-only.
  //
  // Setup:
  // - backups 1, 2, and 4 are valid
  // - backup 3 is corrupt
  // - max_valid_backups_to_open == 2
  //
  // Expectation: the engine opens backups 4, 2, and 1 since those are latest
  // non-corrupt backups, by ignoring max_valid_backups_to_open == 2.
  const int kNumKeys = 5000;
  OpenDBAndBackupEngine(true);
  for (int i = 1; i <= 4; ++i) {
    FillDB(db_.get(), kNumKeys * i, kNumKeys * (i + 1));
    ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
    if (i == 3) {
      ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/3", 3));
    }
  }
  CloseDBAndBackupEngine();

  backupable_options_->max_valid_backups_to_open = 2;
  OpenDBAndBackupEngine();
  std::vector<BackupInfo> backup_infos;
  backup_engine_->GetBackupInfo(&backup_infos);
  ASSERT_EQ(3, backup_infos.size());
  ASSERT_EQ(1, backup_infos[0].backup_id);
  ASSERT_EQ(2, backup_infos[1].backup_id);
  ASSERT_EQ(4, backup_infos[2].backup_id);
  CloseDBAndBackupEngine();
  DestroyDB(dbname_, options_);
}
@@ -1718,33 +1756,6 @@ TEST_F(BackupableDBTest, CreateWhenLatestBackupCorrupted) {
  ASSERT_EQ(2, backup_infos[0].backup_id);
}

TEST_F(BackupableDBTest, WriteOnlyEngine) {
  // Verify we can open a backup engine and create new ones even if reading old
  // backups would fail with IOError. IOError is a more serious condition than
  // corruption and would cause the engine to fail opening. So the only way to
  // avoid is by not reading old backups at all, i.e., respecting
  // `max_valid_backups_to_open == 0`.
  const int kNumKeys = 5000;
  OpenDBAndBackupEngine(true /* destroy_old_data */);
  FillDB(db_.get(), 0 /* from */, kNumKeys);
  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
  CloseDBAndBackupEngine();

  backupable_options_->max_valid_backups_to_open = 0;
  // cause any meta-file reads to fail with IOError during Open
  test_backup_env_->SetDummySequentialFile(true);
  test_backup_env_->SetDummySequentialFileFailReads(true);
  OpenDBAndBackupEngine();
  test_backup_env_->SetDummySequentialFileFailReads(false);
  test_backup_env_->SetDummySequentialFile(false);

  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
  std::vector<BackupInfo> backup_infos;
  backup_engine_->GetBackupInfo(&backup_infos);
  ASSERT_EQ(1, backup_infos.size());
  ASSERT_EQ(2, backup_infos[0].backup_id);
}

TEST_F(BackupableDBTest, WriteOnlyEngineNoSharedFileDeletion) {
  // Verifies a write-only BackupEngine does not delete files belonging to valid
  // backups when GarbageCollect, PurgeOldBackups, or DeleteBackup are called.

@@ -84,7 +84,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
      statistics_(db_options_.statistics.get()),
      next_file_number_(1),
      flush_sequence_(0),
      epoch_of_(0),
      closed_(true),
      open_file_count_(0),
      total_blob_size_(0),

@@ -584,14 +583,24 @@ Status BlobDBImpl::GetBlobFileReader(
  return s;
}

std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
    bool has_ttl, const ExpirationRange& expiration_range,
    const std::string& reason) {
  assert(has_ttl == (expiration_range.first || expiration_range.second));

  uint64_t file_num = next_file_number_++;
  auto bfile = std::make_shared<BlobFile>(this, blob_dir_, file_num,
                                          db_options_.info_log.get());

  const uint32_t column_family_id =
      static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  auto blob_file = std::make_shared<BlobFile>(
      this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
      bdb_options_.compression, has_ttl, expiration_range);

  ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
                  bfile->PathName().c_str(), reason.c_str());
                  blob_file->PathName().c_str(), reason.c_str());
  LogFlush(db_options_.info_log);
  return bfile;

  return blob_file;
}

Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {

@@ -687,142 +696,123 @@ Status BlobDBImpl::CheckOrCreateWriterLocked(
  return s;
}

Status BlobDBImpl::CreateBlobFileAndWriter(
    bool has_ttl, const ExpirationRange& expiration_range,
    const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
    std::shared_ptr<Writer>* writer) {
  assert(has_ttl == (expiration_range.first || expiration_range.second));
  assert(blob_file);
  assert(writer);

  *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
  assert(*blob_file);

  // file not visible, hence no lock
  Status s = CheckOrCreateWriterLocked(*blob_file, writer);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to get writer for blob file: %s, error: %s",
                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
    return s;
  }

  assert(*writer);

  s = (*writer)->WriteHeader((*blob_file)->header_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to write header to new blob file: %s"
                    " status: '%s'",
                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
    return s;
  }

  (*blob_file)->SetFileSize(BlobLogHeader::kSize);
  total_blob_size_ += BlobLogHeader::kSize;

  return s;
}

Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
  assert(blob_file != nullptr);
  assert(blob_file);

  {
    ReadLock rl(&mutex_);
    if (open_non_ttl_file_ != nullptr) {

    if (open_non_ttl_file_) {
      assert(!open_non_ttl_file_->Immutable());
      *blob_file = open_non_ttl_file_;
      return Status::OK();
    }
  }

  // CHECK again
  // Check again
  WriteLock wl(&mutex_);
  if (open_non_ttl_file_ != nullptr) {

  if (open_non_ttl_file_) {
    assert(!open_non_ttl_file_->Immutable());
    *blob_file = open_non_ttl_file_;
    return Status::OK();
  }

  *blob_file = NewBlobFile("SelectBlobFile");
  assert(*blob_file != nullptr);

  // file not visible, hence no lock
  std::shared_ptr<Writer> writer;
  Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
  const Status s = CreateBlobFileAndWriter(
      /* has_ttl */ false, ExpirationRange(),
      /* reason */ "SelectBlobFile", blob_file, &writer);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to get writer from blob file: %s, error: %s",
                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
    return s;
  }

  (*blob_file)->file_size_ = BlobLogHeader::kSize;
  (*blob_file)->header_.compression = bdb_options_.compression;
  (*blob_file)->header_.has_ttl = false;
  (*blob_file)->header_.column_family_id =
      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  (*blob_file)->header_valid_ = true;
  (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
  (*blob_file)->SetHasTTL(false);
  (*blob_file)->SetCompression(bdb_options_.compression);

  s = writer->WriteHeader((*blob_file)->header_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to write header to new blob file: %s"
                    " status: '%s'",
                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
    return s;
  }

  blob_files_.insert(
      std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
  blob_files_.insert(std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
      (*blob_file)->BlobFileNumber(), *blob_file));
  open_non_ttl_file_ = *blob_file;
  total_blob_size_ += BlobLogHeader::kSize;

  return s;
}

Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
                                     std::shared_ptr<BlobFile>* blob_file) {
  assert(blob_file != nullptr);
  assert(blob_file);
  assert(expiration != kNoExpiration);
  uint64_t epoch_read = 0;

  {
    ReadLock rl(&mutex_);

    *blob_file = FindBlobFileLocked(expiration);
    epoch_read = epoch_of_.load();
    if (*blob_file != nullptr) {
      assert(!(*blob_file)->Immutable());
      return Status::OK();
    }
  }

  // Check again
  WriteLock wl(&mutex_);

  *blob_file = FindBlobFileLocked(expiration);
  if (*blob_file != nullptr) {
    assert(!(*blob_file)->Immutable());
    return Status::OK();
  }

  uint64_t exp_low =
  const uint64_t exp_low =
      (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
  uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
  ExpirationRange expiration_range = std::make_pair(exp_low, exp_high);
  const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
  const ExpirationRange expiration_range(exp_low, exp_high);

  *blob_file = NewBlobFile("SelectBlobFileTTL");
  assert(*blob_file != nullptr);
  std::ostringstream oss;
  oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';

  ROCKS_LOG_INFO(db_options_.info_log,
                 "New blob file TTL range: %s %" PRIu64 " %" PRIu64,
                 (*blob_file)->PathName().c_str(), exp_low, exp_high);
  LogFlush(db_options_.info_log);

  // we don't need to take lock as no other thread is seeing bfile yet
  std::shared_ptr<Writer> writer;
  Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
  const Status s =
      CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
                              /* reason */ oss.str(), blob_file, &writer);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "Failed to get writer from blob file with TTL: %s, error: %s",
        (*blob_file)->PathName().c_str(), s.ToString().c_str());
    return s;
  }

  (*blob_file)->header_.expiration_range = expiration_range;
  (*blob_file)->header_.compression = bdb_options_.compression;
  (*blob_file)->header_.has_ttl = true;
  (*blob_file)->header_.column_family_id =
      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  (*blob_file)->header_valid_ = true;
  (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
  (*blob_file)->SetHasTTL(true);
  (*blob_file)->SetCompression(bdb_options_.compression);
  (*blob_file)->file_size_ = BlobLogHeader::kSize;

  // set the first value of the range, since that is
  // concrete at this time. also necessary to add to open_ttl_files_
  (*blob_file)->expiration_range_ = expiration_range;

  WriteLock wl(&mutex_);
  // in case the epoch has shifted in the interim, then check
  // check condition again - should be rare.
  if (epoch_of_.load() != epoch_read) {
    std::shared_ptr<BlobFile> blob_file2 = FindBlobFileLocked(expiration);
    if (blob_file2 != nullptr) {
      *blob_file = std::move(blob_file2);
      return Status::OK();
    }
  }

  s = writer->WriteHeader((*blob_file)->header_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to write header to new blob file: %s"
                    " status: '%s'",
                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
    return s;
  }

  blob_files_.insert(
      std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
  blob_files_.insert(std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
      (*blob_file)->BlobFileNumber(), *blob_file));
  open_ttl_files_.insert(*blob_file);
  total_blob_size_ += BlobLogHeader::kSize;
  epoch_of_++;

  return s;
}

@@ -1261,15 +1251,18 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {

Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
                                PinnableSlice* value, uint64_t* expiration) {
  assert(value != nullptr);
  assert(value);

  BlobIndex blob_index;
  Status s = blob_index.DecodeFrom(index_entry);
  if (!s.ok()) {
    return s;
  }

  if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
    return Status::NotFound("Key expired");
  }

  if (expiration != nullptr) {
    if (blob_index.HasTTL()) {
      *expiration = blob_index.expiration();

@@ -1277,13 +1270,65 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
      *expiration = kNoExpiration;
    }
  }

  if (blob_index.IsInlined()) {
    // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
    // memory buffer to avoid extra copy.
    value->PinSelf(blob_index.value());
    return Status::OK();
  }
  if (blob_index.size() == 0) {

  CompressionType compression_type = kNoCompression;
  s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
                         blob_index.size(), value, &compression_type);
  if (!s.ok()) {
    return s;
  }

  if (compression_type != kNoCompression) {
    BlockContents contents;
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());

    {
      StopWatch decompression_sw(env_, statistics_,
                                 BLOB_DB_DECOMPRESSION_MICROS);
      UncompressionContext context(compression_type);
      UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
                             compression_type);
      s = UncompressBlockContentsForCompressionType(
          info, value->data(), value->size(), &contents,
          kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
    }

    if (!s.ok()) {
      if (debug_level_ >= 2) {
        ROCKS_LOG_ERROR(
            db_options_.info_log,
            "Uncompression error during blob read from file: %" PRIu64
            " blob_offset: %" PRIu64 " blob_size: %" PRIu64
            " key: %s status: '%s'",
            blob_index.file_number(), blob_index.offset(), blob_index.size(),
            key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
      }

      return Status::Corruption("Unable to uncompress blob.");
    }

    value->PinSelf(contents.data);
  }

  return Status::OK();
}

Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
                                      uint64_t offset, uint64_t size,
                                      PinnableSlice* value,
                                      CompressionType* compression_type) {
  assert(value);
  assert(compression_type);
  assert(*compression_type == kNoCompression);

  if (!size) {
    value->PinSelf("");
    return Status::OK();
  }

@@ -1291,47 +1336,46 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
  // offset has to have certain min, as we will read CRC
  // later from the Blob Header, which needs to be also a
  // valid offset.
  if (blob_index.offset() <
  if (offset <
      (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
    if (debug_level_ >= 2) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Invalid blob index file_number: %" PRIu64
                      " blob_offset: %" PRIu64 " blob_size: %" PRIu64
                      " key: %s",
                      blob_index.file_number(), blob_index.offset(),
                      blob_index.size(), key.data());
                      file_number, offset, size,
                      key.ToString(/* output_hex */ true).c_str());
    }

    return Status::NotFound("Invalid blob offset");
  }

  std::shared_ptr<BlobFile> bfile;
  std::shared_ptr<BlobFile> blob_file;

  {
    ReadLock rl(&mutex_);
    auto hitr = blob_files_.find(blob_index.file_number());
    auto it = blob_files_.find(file_number);

    // file was deleted
    if (hitr == blob_files_.end()) {
    if (it == blob_files_.end()) {
      return Status::NotFound("Blob Not Found as blob file missing");
    }

    bfile = hitr->second;
    blob_file = it->second;
  }

  if (blob_index.size() == 0 && value != nullptr) {
    value->PinSelf("");
    return Status::OK();
  }
  *compression_type = blob_file->compression();

  // takes locks when called
  std::shared_ptr<RandomAccessFileReader> reader;
  s = GetBlobFileReader(bfile, &reader);
  Status s = GetBlobFileReader(blob_file, &reader);
  if (!s.ok()) {
    return s;
  }

  assert(blob_index.offset() > key.size() + sizeof(uint32_t));
  uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
  uint64_t record_size = sizeof(uint32_t) + key.size() + blob_index.size();
  assert(offset >= key.size() + sizeof(uint32_t));
  const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
  const uint64_t record_size = sizeof(uint32_t) + key.size() + size;

  // Allocate the buffer. This is safe in C++11
  std::string buffer_str(static_cast<size_t>(record_size), static_cast<char>(0));

@@ -1339,42 +1383,44 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,

  // A partial blob record contain checksum, key and value.
  Slice blob_record;

  {
    StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
    s = reader->Read(record_offset, static_cast<size_t>(record_size), &blob_record, buffer);
    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
  }

  if (!s.ok()) {
    ROCKS_LOG_DEBUG(db_options_.info_log,
                    "Failed to read blob from blob file %" PRIu64
                    ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
                    ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
                    bfile->BlobFileNumber(), blob_index.offset(),
                    blob_index.size(), key.size(), s.ToString().c_str());
    ROCKS_LOG_DEBUG(
        db_options_.info_log,
        "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
        ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
        file_number, offset, size, key.size(), s.ToString().c_str());
    return s;
  }

  if (blob_record.size() != record_size) {
    ROCKS_LOG_DEBUG(
        db_options_.info_log,
        "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
        ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
        ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
        bfile->BlobFileNumber(), blob_index.offset(), blob_index.size(),
        key.size(), blob_record.size(), record_size);
        file_number, offset, size, key.size(), blob_record.size(), record_size);

    return Status::Corruption("Failed to retrieve blob from blob index.");
  }

  Slice crc_slice(blob_record.data(), sizeof(uint32_t));
  Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
                   static_cast<size_t>(blob_index.size()));
  uint32_t crc_exp;
                   static_cast<size_t>(size));

  uint32_t crc_exp = 0;
  if (!GetFixed32(&crc_slice, &crc_exp)) {
    ROCKS_LOG_DEBUG(db_options_.info_log,
                    "Unable to decode CRC from blob file %" PRIu64
                    ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
                    ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
                    bfile->BlobFileNumber(), blob_index.offset(),
                    blob_index.size(), key.size(), s.ToString().c_str());
    ROCKS_LOG_DEBUG(
        db_options_.info_log,
        "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
        ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
        file_number, offset, size, key.size(), s.ToString().c_str());
    return Status::Corruption("Unable to decode checksum.");
  }

@@ -1383,34 +1429,20 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
  crc = crc32c::Mask(crc);  // Adjust for storage
  if (crc != crc_exp) {
    if (debug_level_ >= 2) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Blob crc mismatch file: %s blob_offset: %" PRIu64
                      " blob_size: %" PRIu64 " key: %s status: '%s'",
                      bfile->PathName().c_str(), blob_index.offset(),
                      blob_index.size(), key.data(), s.ToString().c_str());
      ROCKS_LOG_ERROR(
          db_options_.info_log,
          "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
          " blob_size: %" PRIu64 " key: %s status: '%s'",
          file_number, offset, size,
          key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
    }

    return Status::Corruption("Corruption. Blob CRC mismatch");
  }

  if (bfile->compression() == kNoCompression) {
    value->PinSelf(blob_value);
  } else {
    BlockContents contents;
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
    {
      StopWatch decompression_sw(env_, statistics_,
                                 BLOB_DB_DECOMPRESSION_MICROS);
      UncompressionContext context(bfile->compression());
      UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
                             bfile->compression());
      s = UncompressBlockContentsForCompressionType(
          info, blob_value.data(), blob_value.size(), &contents,
          kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
    }
    value->PinSelf(contents.data);
  }
  value->PinSelf(blob_value);

  return s;
  return Status::OK();
}

Status BlobDBImpl::Get(const ReadOptions& read_options,

@@ -1954,7 +1986,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
    // new file
    std::string reason("GC of ");
    reason += bfptr->PathName();
    newfile = NewBlobFile(reason);
    newfile = NewBlobFile(bfptr->HasTTL(), bfptr->expiration_range_, reason);

    s = CheckOrCreateWriterLocked(newfile, &new_writer);
    if (!s.ok()) {

@@ -1963,14 +1995,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
                      newfile->PathName().c_str(), s.ToString().c_str());
      break;
    }
    // Can't use header beyond this point
    newfile->header_ = std::move(header);
    newfile->header_valid_ = true;
    newfile->file_size_ = BlobLogHeader::kSize;
    newfile->SetColumnFamilyId(bfptr->column_family_id());
    newfile->SetHasTTL(bfptr->HasTTL());
    newfile->SetCompression(bfptr->compression());
    newfile->expiration_range_ = bfptr->expiration_range_;

    s = new_writer->WriteHeader(newfile->header_);
    if (!s.ok()) {

@@ -237,6 +237,11 @@ class BlobDBImpl : public BlobDB {
  Status GetBlobValue(const Slice& key, const Slice& index_entry,
                      PinnableSlice* value, uint64_t* expiration = nullptr);

  Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
                            uint64_t offset, uint64_t size,
                            PinnableSlice* value,
                            CompressionType* compression_type);

  Slice GetCompressedSlice(const Slice& raw,
                           std::string* compression_output) const;

@@ -264,14 +269,22 @@ class BlobDBImpl : public BlobDB {
                   const Slice& value, uint64_t expiration,
                   std::string* index_entry);

  // find an existing blob log file based on the expiration unix epoch
  // if such a file does not exist, return nullptr
  // Create a new blob file and associated writer.
  Status CreateBlobFileAndWriter(bool has_ttl,
                                 const ExpirationRange& expiration_range,
                                 const std::string& reason,
                                 std::shared_ptr<BlobFile>* blob_file,
                                 std::shared_ptr<Writer>* writer);

  // Get the open non-TTL blob log file, or create a new one if no such file
  // exists.
  Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);

  // Get the open TTL blob log file for a certain expiration, or create a new
  // one if no such file exists.
  Status SelectBlobFileTTL(uint64_t expiration,
                           std::shared_ptr<BlobFile>* blob_file);

  // find an existing blob log file to append the value to
  Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);

  std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;

  // periodic sanity check. Bunch of checks

@@ -300,7 +313,9 @@ class BlobDBImpl : public BlobDB {
  void StartBackgroundTasks();

  // add a new Blob File
  std::shared_ptr<BlobFile> NewBlobFile(const std::string& reason);
  std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl,
                                        const ExpirationRange& expiration_range,
                                        const std::string& reason);

  // collect all the blob log files from the blob directory
  Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);

@@ -434,9 +449,6 @@ class BlobDBImpl : public BlobDB {
  // The largest sequence number that has been flushed.
  SequenceNumber flush_sequence_;

  // epoch or version of the open files.
  std::atomic<uint64_t> epoch_of_;

  // opened non-TTL blob file.
  std::shared_ptr<BlobFile> open_non_ttl_file_;

@@ -10,7 +10,6 @@
#include <cinttypes>

#include <algorithm>
#include <limits>
#include <memory>

#include "db/column_family.h"

@@ -25,45 +24,24 @@ namespace rocksdb {

namespace blob_db {

BlobFile::BlobFile()
    : parent_(nullptr),
      file_number_(0),
      info_log_(nullptr),
      column_family_id_(std::numeric_limits<uint32_t>::max()),
      compression_(kNoCompression),
      has_ttl_(false),
      blob_count_(0),
      file_size_(0),
      closed_(false),
      immutable_sequence_(0),
      obsolete_(false),
      obsolete_sequence_(0),
      expiration_range_({0, 0}),
      last_access_(-1),
      last_fsync_(0),
      header_valid_(false),
      footer_valid_(false) {}

BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
                   Logger* info_log)
    : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {}

BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
                   Logger* info_log, uint32_t column_family_id,
                   CompressionType compression, bool has_ttl,
                   const ExpirationRange& expiration_range)
    : parent_(p),
      path_to_dir_(bdir),
      file_number_(fn),
      info_log_(info_log),
      column_family_id_(std::numeric_limits<uint32_t>::max()),
      compression_(kNoCompression),
      has_ttl_(false),
      blob_count_(0),
      file_size_(0),
      closed_(false),
      immutable_sequence_(0),
      obsolete_(false),
      obsolete_sequence_(0),
      expiration_range_({0, 0}),
      last_access_(-1),
      last_fsync_(0),
      header_valid_(false),
      footer_valid_(false) {}
      column_family_id_(column_family_id),
      compression_(compression),
      has_ttl_(has_ttl),
      expiration_range_(expiration_range),
      header_(column_family_id, compression, has_ttl, expiration_range),
      header_valid_(true) {}

BlobFile::~BlobFile() {
  if (obsolete_) {

@@ -6,6 +6,7 @@
#ifndef ROCKSDB_LITE

#include <atomic>
#include <limits>
#include <memory>
#include <unordered_set>

@@ -29,7 +30,7 @@ class BlobFile {

 private:
  // access to parent
  const BlobDBImpl* parent_;
  const BlobDBImpl* parent_{nullptr};

  // path to blob directory
  std::string path_to_dir_;

@@ -37,49 +38,50 @@ class BlobFile {
  // the id of the file.
  // the above 2 are created during file creation and never changed
  // after that
  uint64_t file_number_;
  uint64_t file_number_{0};

  // The file numbers of the SST files whose oldest blob file reference
  // points to this blob file.
  std::unordered_set<uint64_t> linked_sst_files_;

  // Info log.
  Logger* info_log_;
  Logger* info_log_{nullptr};

  // Column family id.
  uint32_t column_family_id_;
  uint32_t column_family_id_{std::numeric_limits<uint32_t>::max()};

  // Compression type of blobs in the file
  CompressionType compression_;
  CompressionType compression_{kNoCompression};

  // If true, the keys in this file all has TTL. Otherwise all keys don't
  // have TTL.
  bool has_ttl_;
  bool has_ttl_{false};

  // TTL range of blobs in the file.
  ExpirationRange expiration_range_;

  // number of blobs in the file
  std::atomic<uint64_t> blob_count_;
  std::atomic<uint64_t> blob_count_{0};

  // size of the file
  std::atomic<uint64_t> file_size_;
  std::atomic<uint64_t> file_size_{0};

  BlobLogHeader header_;

  // closed_ = true implies the file is no more mutable
  // no more blobs will be appended and the footer has been written out
  std::atomic<bool> closed_;
  std::atomic<bool> closed_{false};

  // The latest sequence number when the file was closed/made immutable.
  SequenceNumber immutable_sequence_;
  SequenceNumber immutable_sequence_{0};

  // has a pass of garbage collection successfully finished on this file
  // obsolete_ still needs to do iterator/snapshot checks
  std::atomic<bool> obsolete_;
  std::atomic<bool> obsolete_{false};

  // The last sequence number by the time the file marked as obsolete.
  // Data in this file is visible to a snapshot taken before the sequence.
  SequenceNumber obsolete_sequence_;

  ExpirationRange expiration_range_;
  SequenceNumber obsolete_sequence_{0};

  // Sequential/Append writer for blobs
  std::shared_ptr<Writer> log_writer_;

@@ -92,29 +94,30 @@ class BlobFile {
  mutable port::RWMutex mutex_;

  // time when the random access reader was last created.
  std::atomic<std::int64_t> last_access_;
  std::atomic<std::int64_t> last_access_{-1};

  // last time file was fsync'd/fdatasyncd
  std::atomic<uint64_t> last_fsync_;
  std::atomic<uint64_t> last_fsync_{0};

  bool header_valid_;
  bool header_valid_{false};

  bool footer_valid_;
  bool footer_valid_{false};

 public:
  BlobFile();
  BlobFile() = default;

  BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
           Logger* info_log);

  BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
           Logger* info_log, uint32_t column_family_id,
           CompressionType compression, bool has_ttl,
           const ExpirationRange& expiration_range);

  ~BlobFile();

  uint32_t column_family_id() const;

  void SetColumnFamilyId(uint32_t cf_id) {
    column_family_id_ = cf_id;
  }

  // Returns log file's absolute pathname.
  std::string PathName() const;

@@ -203,10 +206,6 @@ class BlobFile {

  CompressionType compression() const { return compression_; }

  void SetCompression(CompressionType c) {
    compression_ = c;
  }

  std::shared_ptr<Writer> GetWriter() const { return log_writer_; }

  // Read blob file header and footer. Return corruption if file header is

@@ -43,11 +43,19 @@ using ExpirationRange = std::pair<uint64_t, uint64_t>;
struct BlobLogHeader {
  static constexpr size_t kSize = 30;

  BlobLogHeader() = default;
  BlobLogHeader(uint32_t _column_family_id, CompressionType _compression,
                bool _has_ttl, const ExpirationRange& _expiration_range)
      : column_family_id(_column_family_id),
        compression(_compression),
        has_ttl(_has_ttl),
        expiration_range(_expiration_range) {}

  uint32_t version = kVersion1;
  uint32_t column_family_id = 0;
  CompressionType compression = kNoCompression;
  bool has_ttl = false;
  ExpirationRange expiration_range = std::make_pair(0, 0);
  ExpirationRange expiration_range;

  void EncodeTo(std::string* dst);