Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
fd8247b1be | ||
|
fe352574b4 | ||
|
6fbe42a6fd | ||
|
e6c47cd6cc | ||
|
645c445978 | ||
|
f952de5be2 | ||
|
4362985805 | ||
|
85cdad116b | ||
|
07e3794972 | ||
|
1624f20934 | ||
|
1e96a70be4 | ||
|
efe9d5c823 | ||
|
a265ac75ab | ||
|
9435b2e959 | ||
|
0f8c041ea7 | ||
|
9da6019f9d | ||
|
d5e2462946 | ||
|
5b3ebdc3d1 |
20
HISTORY.md
20
HISTORY.md
@ -1,4 +1,24 @@
|
||||
# Rocksdb Change Log
|
||||
## 6.19.4 (04/23/2021)
|
||||
### Bug Fixes
|
||||
* Fixed a bug where ingested files were written with incorrect boundary key metadata. In rare cases this could have led to a level's files being wrongly ordered and queries for the boundary keys returning wrong results.
|
||||
* Fixed the false-positive alert when recovering from the WAL file. Avoid reporting "SST file is ahead of WAL" on a newly created empty column family, if the previous WAL file is corrupted.
|
||||
|
||||
### Behavior Changes
|
||||
* Due to the fix of false-postive alert of "SST file is ahead of WAL", all the CFs with no SST file (CF empty) will bypass the consistency check. We fixed a false-positive, but introduced a very rare true-negative which will be triggered in the following conditions: A CF with some delete operations in the last a few queries which will result in an empty CF (those are flushed to SST file and a compaction triggered which combines this file and all other SST files and generates an empty CF, or there is another reason to write a manifest entry for this CF after a flush that generates no SST file from an empty CF). The deletion entries are logged in a WAL and this WAL was corrupted, while the CF's log number points to the next WAL (due to the flush). Therefore, the DB can only recover to the point without these trailing deletions and cause the inconsistent DB status.
|
||||
|
||||
## 6.19.3 (04/19/2021)
|
||||
### Bug Fixes
|
||||
* Fixed a bug in handling file rename error in distributed/network file systems when the server succeeds but client returns error. The bug can cause CURRENT file to point to non-existing MANIFEST file, thus DB cannot be opened.
|
||||
|
||||
## 6.19.2 (04/08/2021)
|
||||
### Bug Fixes
|
||||
* Fixed a backward iteration bug with partitioned filter enabled: not including the prefix of the last key of the previous filter partition in current filter partition can cause wrong iteration result.
|
||||
|
||||
## 6.19.1 (04/01/2021)
|
||||
### Bug Fixes
|
||||
* Fixed crash (divide by zero) when compression dictionary is applied to a file containing only range tombstones.
|
||||
|
||||
## 6.19.0 (03/21/2021)
|
||||
### Bug Fixes
|
||||
* Fixed the truncation error found in APIs/tools when dumping block-based SST files in a human-readable format. After fix, the block-based table can be fully dumped as a readable file.
|
||||
|
10
Makefile
10
Makefile
@ -501,6 +501,12 @@ ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
|
||||
LIB_OBJECTS += $(patsubst %.cpp, $(OBJ_DIR)/%.o, $(FOLLY_SOURCES))
|
||||
endif
|
||||
|
||||
# range_tree is not compatible with non GNU libc on ppc64
|
||||
# see https://jira.percona.com/browse/PS-7559
|
||||
ifneq ($(PPC_LIBC_IS_GNU),0)
|
||||
LIB_OBJECTS += $(patsubst %.cc, $(OBJ_DIR)/%.o, $(RANGE_TREE_SOURCES))
|
||||
endif
|
||||
|
||||
GTEST = $(OBJ_DIR)/$(GTEST_DIR)/gtest/gtest-all.o
|
||||
TESTUTIL = $(OBJ_DIR)/test_util/testutil.o
|
||||
TESTHARNESS = $(OBJ_DIR)/test_util/testharness.o $(TESTUTIL) $(GTEST)
|
||||
@ -2183,8 +2189,8 @@ SNAPPY_DOWNLOAD_BASE ?= https://github.com/google/snappy/archive
|
||||
LZ4_VER ?= 1.9.3
|
||||
LZ4_SHA256 ?= 030644df4611007ff7dc962d981f390361e6c97a34e5cbc393ddfbe019ffe2c1
|
||||
LZ4_DOWNLOAD_BASE ?= https://github.com/lz4/lz4/archive
|
||||
ZSTD_VER ?= 1.4.7
|
||||
ZSTD_SHA256 ?= 085500c8d0b9c83afbc1dc0d8b4889336ad019eba930c5d6a9c6c86c20c769c8
|
||||
ZSTD_VER ?= 1.4.9
|
||||
ZSTD_SHA256 ?= acf714d98e3db7b876e5b540cbf6dee298f60eb3c0723104f6d3f065cd60d6a8
|
||||
ZSTD_DOWNLOAD_BASE ?= https://github.com/facebook/zstd/archive
|
||||
CURL_SSL_OPTS ?= --tlsv1
|
||||
|
||||
|
16
TARGETS
16
TARGETS
@ -87,10 +87,12 @@ ROCKSDB_PREPROCESSOR_FLAGS = [
|
||||
|
||||
# Added missing flags from output of build_detect_platform
|
||||
"-DROCKSDB_BACKTRACE",
|
||||
]
|
||||
|
||||
# Directories with files for #include
|
||||
"-I" + REPO_PATH + "include/",
|
||||
"-I" + REPO_PATH,
|
||||
# Directories with files for #include
|
||||
ROCKSDB_INCLUDE_PATHS = [
|
||||
"",
|
||||
"include",
|
||||
]
|
||||
|
||||
ROCKSDB_ARCH_PREPROCESSOR_FLAGS = {
|
||||
@ -428,6 +430,7 @@ cpp_library(
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
link_whole = False,
|
||||
@ -735,6 +738,7 @@ cpp_library(
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
link_whole = True,
|
||||
@ -758,6 +762,7 @@ cpp_library(
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [":rocksdb_lib"],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS + [
|
||||
("googletest", None, "gtest"),
|
||||
@ -779,6 +784,7 @@ cpp_library(
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [":rocksdb_lib"],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
link_whole = False,
|
||||
@ -806,6 +812,7 @@ cpp_library(
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = ROCKSDB_LIB_DEPS,
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
)
|
||||
@ -817,6 +824,7 @@ cpp_binary(
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [":rocksdb_test_lib"],
|
||||
) if not is_opt_mode else None
|
||||
|
||||
@ -838,6 +846,7 @@ cpp_library(
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [":rocksdb_test_lib"],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
link_whole = False,
|
||||
@ -2118,6 +2127,7 @@ ROCKS_TESTS = [
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS + extra_compiler_flags,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [":rocksdb_test_lib"] + extra_deps,
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS + [
|
||||
("googletest", None, "gtest"),
|
||||
|
@ -141,11 +141,15 @@ def generate_targets(repo_path, deps_map):
|
||||
TARGETS.add_library(
|
||||
"rocksdb_lib",
|
||||
src_mk["LIB_SOURCES"] +
|
||||
# always add range_tree, it's only excluded on ppc64, which we don't use internally
|
||||
src_mk["RANGE_TREE_SOURCES"] +
|
||||
src_mk["TOOL_LIB_SOURCES"])
|
||||
# rocksdb_whole_archive_lib
|
||||
TARGETS.add_library(
|
||||
"rocksdb_whole_archive_lib",
|
||||
src_mk["LIB_SOURCES"] +
|
||||
# always add range_tree, it's only excluded on ppc64, which we don't use internally
|
||||
src_mk["RANGE_TREE_SOURCES"] +
|
||||
src_mk["TOOL_LIB_SOURCES"],
|
||||
deps=None,
|
||||
headers=None,
|
||||
|
@ -87,6 +87,7 @@ cpp_binary(
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [":rocksdb_test_lib"],
|
||||
) if not is_opt_mode else None
|
||||
|
||||
|
@ -94,10 +94,12 @@ ROCKSDB_PREPROCESSOR_FLAGS = [
|
||||
|
||||
# Added missing flags from output of build_detect_platform
|
||||
"-DROCKSDB_BACKTRACE",
|
||||
]
|
||||
|
||||
# Directories with files for #include
|
||||
"-I" + REPO_PATH + "include/",
|
||||
"-I" + REPO_PATH,
|
||||
# Directories with files for #include
|
||||
ROCKSDB_INCLUDE_PATHS = [
|
||||
"",
|
||||
"include",
|
||||
]
|
||||
|
||||
ROCKSDB_ARCH_PREPROCESSOR_FLAGS = {{
|
||||
@ -145,6 +147,7 @@ cpp_library(
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [{deps}],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS{extra_external_deps},
|
||||
link_whole = {link_whole},
|
||||
@ -161,6 +164,7 @@ cpp_library(
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = ROCKSDB_LIB_DEPS,
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
)
|
||||
@ -173,6 +177,7 @@ cpp_binary(
|
||||
arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [{deps}],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
)
|
||||
@ -203,6 +208,7 @@ ROCKS_TESTS = [
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS + extra_compiler_flags,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
include_paths = ROCKSDB_INCLUDE_PATHS,
|
||||
deps = [":rocksdb_test_lib"] + extra_deps,
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS + [
|
||||
("googletest", None, "gtest"),
|
||||
|
@ -668,6 +668,23 @@ else
|
||||
fi
|
||||
fi
|
||||
|
||||
if test -n "`echo $TARGET_ARCHITECTURE | grep ^ppc64`"; then
|
||||
# check for GNU libc on ppc64
|
||||
$CXX -x c++ - -o /dev/null 2>/dev/null <<EOF
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <gnu/libc-version.h>
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
printf("GNU libc version: %s\n", gnu_get_libc_version());
|
||||
return 0;
|
||||
}
|
||||
EOF
|
||||
if [ "$?" != 0 ]; then
|
||||
PPC_LIBC_IS_GNU=0
|
||||
fi
|
||||
fi
|
||||
|
||||
if test "$TRY_SSE_ETC"; then
|
||||
# The USE_SSE flag now means "attempt to compile with widely-available
|
||||
# Intel architecture extensions utilized by specific optimizations in the
|
||||
@ -861,3 +878,6 @@ echo "LUA_PATH=$LUA_PATH" >> "$OUTPUT"
|
||||
if test -n "$USE_FOLLY_DISTRIBUTED_MUTEX"; then
|
||||
echo "USE_FOLLY_DISTRIBUTED_MUTEX=$USE_FOLLY_DISTRIBUTED_MUTEX" >> "$OUTPUT"
|
||||
fi
|
||||
if test -n "$PPC_LIBC_IS_GNU"; then
|
||||
echo "PPC_LIBC_IS_GNU=$PPC_LIBC_IS_GNU" >> "$OUTPUT"
|
||||
fi
|
||||
|
@ -7,6 +7,9 @@
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include <iomanip>
|
||||
#include <sstream>
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "options/options_helper.h"
|
||||
#include "port/stack_trace.h"
|
||||
@ -2117,6 +2120,54 @@ TEST_F(DBBloomFilterTest, DynamicBloomFilterOptions) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBloomFilterTest, SeekForPrevWithPartitionedFilters) {
|
||||
Options options = CurrentOptions();
|
||||
constexpr size_t kNumKeys = 10000;
|
||||
static_assert(kNumKeys <= 10000, "kNumKeys have to be <= 10000");
|
||||
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeys + 10));
|
||||
options.create_if_missing = true;
|
||||
constexpr size_t kPrefixLength = 4;
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(kPrefixLength));
|
||||
options.compression = kNoCompression;
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.filter_policy.reset(NewBloomFilterPolicy(50));
|
||||
bbto.index_shortening =
|
||||
BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
|
||||
bbto.block_size = 128;
|
||||
bbto.metadata_block_size = 128;
|
||||
bbto.partition_filters = true;
|
||||
bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
DestroyAndReopen(options);
|
||||
|
||||
const std::string value(64, '\0');
|
||||
|
||||
WriteOptions write_opts;
|
||||
write_opts.disableWAL = true;
|
||||
for (size_t i = 0; i < kNumKeys; ++i) {
|
||||
std::ostringstream oss;
|
||||
oss << std::setfill('0') << std::setw(4) << std::fixed << i;
|
||||
ASSERT_OK(db_->Put(write_opts, oss.str(), value));
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
ReadOptions read_opts;
|
||||
// Use legacy, implicit prefix seek
|
||||
read_opts.total_order_seek = false;
|
||||
read_opts.auto_prefix_mode = false;
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
|
||||
for (size_t i = 0; i < kNumKeys; ++i) {
|
||||
// Seek with a key after each one added but with same prefix. One will
|
||||
// surely cross a partition boundary.
|
||||
std::ostringstream oss;
|
||||
oss << std::setfill('0') << std::setw(4) << std::fixed << i << "a";
|
||||
it->SeekForPrev(oss.str());
|
||||
ASSERT_OK(it->status());
|
||||
ASSERT_TRUE(it->Valid());
|
||||
}
|
||||
it.reset();
|
||||
}
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -260,18 +260,16 @@ Status DBImpl::FlushMemTableToOutputFile(
|
||||
// be pessimistic and try write to a new MANIFEST.
|
||||
// TODO: distinguish between MANIFEST write and CURRENT renaming
|
||||
if (!versions_->io_status().ok()) {
|
||||
if (total_log_size_ > 0) {
|
||||
// If the WAL is empty, we use different error reason
|
||||
error_handler_.SetBGError(io_s,
|
||||
BackgroundErrorReason::kManifestWrite);
|
||||
} else {
|
||||
error_handler_.SetBGError(io_s,
|
||||
BackgroundErrorReason::kManifestWriteNoWAL);
|
||||
}
|
||||
} else if (total_log_size_ > 0 || !log_io_s.ok()) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
|
||||
// If WAL sync is successful (either WAL size is 0 or there is no IO
|
||||
// error), all the Manifest write will be map to soft error.
|
||||
// TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor is
|
||||
// needed.
|
||||
error_handler_.SetBGError(io_s,
|
||||
BackgroundErrorReason::kManifestWriteNoWAL);
|
||||
} else {
|
||||
// If the WAL is empty, we use different error reason
|
||||
// If WAL sync is successful (either WAL size is 0 or there is no IO
|
||||
// error), all the other SST file write errors will be set as
|
||||
// kFlushNoWAL.
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
|
||||
}
|
||||
} else {
|
||||
@ -687,18 +685,16 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
// be pessimistic and try write to a new MANIFEST.
|
||||
// TODO: distinguish between MANIFEST write and CURRENT renaming
|
||||
if (!versions_->io_status().ok()) {
|
||||
if (total_log_size_ > 0) {
|
||||
// If the WAL is empty, we use different error reason
|
||||
error_handler_.SetBGError(io_s,
|
||||
BackgroundErrorReason::kManifestWrite);
|
||||
} else {
|
||||
error_handler_.SetBGError(io_s,
|
||||
BackgroundErrorReason::kManifestWriteNoWAL);
|
||||
}
|
||||
} else if (total_log_size_ > 0) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
|
||||
// If WAL sync is successful (either WAL size is 0 or there is no IO
|
||||
// error), all the Manifest write will be map to soft error.
|
||||
// TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor
|
||||
// is needed.
|
||||
error_handler_.SetBGError(io_s,
|
||||
BackgroundErrorReason::kManifestWriteNoWAL);
|
||||
} else {
|
||||
// If the WAL is empty, we use different error reason
|
||||
// If WAL sync is successful (either WAL size is 0 or there is no IO
|
||||
// error), all the other SST file write errors will be set as
|
||||
// kFlushNoWAL.
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
|
||||
}
|
||||
} else {
|
||||
@ -2567,6 +2563,8 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
|
||||
|
||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
|
||||
immutable_db_options_.info_log.get());
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:1");
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:2");
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
assert(bg_flush_scheduled_);
|
||||
|
@ -937,7 +937,7 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
|
||||
return s;
|
||||
}
|
||||
|
||||
if (largest_file_number > next_file_number) {
|
||||
if (largest_file_number >= next_file_number) {
|
||||
versions_->next_file_number_.store(largest_file_number + 1);
|
||||
}
|
||||
|
||||
|
@ -283,6 +283,9 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
|
||||
const std::string manifest = DescriptorFileName(dbname_, 1);
|
||||
{
|
||||
if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
|
||||
fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
|
||||
}
|
||||
std::unique_ptr<FSWritableFile> file;
|
||||
FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
|
||||
s = NewWritableFile(fs_.get(), manifest, &file, file_options);
|
||||
@ -312,7 +315,7 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
|
||||
manifest.substr(manifest.find_last_of("/\\") + 1));
|
||||
}
|
||||
} else {
|
||||
fs_->DeleteFile(manifest, IOOptions(), nullptr);
|
||||
fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -1132,11 +1135,29 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
|
||||
immutable_db_options_.wal_recovery_mode ==
|
||||
WALRecoveryMode::kTolerateCorruptedTailRecords)) {
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->GetLogNumber() > corrupted_wal_number) {
|
||||
// One special case cause cfd->GetLogNumber() > corrupted_wal_number but
|
||||
// the CF is still consistent: If a new column family is created during
|
||||
// the flush and the WAL sync fails at the same time, the new CF points to
|
||||
// the new WAL but the old WAL is curropted. Since the new CF is empty, it
|
||||
// is still consistent. We add the check of CF sst file size to avoid the
|
||||
// false positive alert.
|
||||
|
||||
// Note that, the check of (cfd->GetLiveSstFilesSize() > 0) may leads to
|
||||
// the ignorance of a very rare inconsistency case caused in data
|
||||
// canclation. One CF is empty due to KV deletion. But those operations
|
||||
// are in the WAL. If the WAL is corrupted, the status of this CF might
|
||||
// not be consistent with others. However, the consistency check will be
|
||||
// bypassed due to empty CF.
|
||||
// TODO: a better and complete implementation is needed to ensure strict
|
||||
// consistency check in WAL recovery including hanlding the tailing
|
||||
// issues.
|
||||
if (cfd->GetLogNumber() > corrupted_wal_number &&
|
||||
cfd->GetLiveSstFilesSize() > 0) {
|
||||
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
|
||||
"Column family inconsistency: SST file contains data"
|
||||
" beyond the point of corruption.");
|
||||
return Status::Corruption("SST file is ahead of WALs");
|
||||
return Status::Corruption("SST file is ahead of WALs in CF " +
|
||||
cfd->GetName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -73,6 +73,15 @@ TEST_F(DBRangeDelTest, FlushOutputHasOnlyRangeTombstones) {
|
||||
} while (ChangeOptions(kRangeDelSkipConfigs));
|
||||
}
|
||||
|
||||
TEST_F(DBRangeDelTest, DictionaryCompressionWithOnlyRangeTombstones) {
|
||||
Options opts = CurrentOptions();
|
||||
opts.compression_opts.max_dict_bytes = 16384;
|
||||
Reopen(opts);
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
|
||||
"dr2"));
|
||||
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||
}
|
||||
|
||||
TEST_F(DBRangeDelTest, CompactionOutputHasOnlyRangeTombstone) {
|
||||
do {
|
||||
Options opts = CurrentOptions();
|
||||
|
121
db/db_test2.cc
121
db/db_test2.cc
@ -5428,6 +5428,98 @@ TEST_F(DBTest2, AutoPrefixMode1) {
|
||||
ASSERT_EQ("a1", iterator->key().ToString());
|
||||
}
|
||||
}
|
||||
|
||||
class RenameCurrentTest : public DBTestBase,
|
||||
public testing::WithParamInterface<std::string> {
|
||||
public:
|
||||
RenameCurrentTest()
|
||||
: DBTestBase("rename_current_test", /*env_do_fsync=*/true),
|
||||
sync_point_(GetParam()) {}
|
||||
|
||||
~RenameCurrentTest() override {}
|
||||
|
||||
void SetUp() override {
|
||||
env_->no_file_overwrite_.store(true, std::memory_order_release);
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
env_->no_file_overwrite_.store(false, std::memory_order_release);
|
||||
}
|
||||
|
||||
void SetupSyncPoints() {
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->SetCallBack(sync_point_, [&](void* arg) {
|
||||
Status* s = reinterpret_cast<Status*>(arg);
|
||||
assert(s);
|
||||
*s = Status::IOError("Injected IO error.");
|
||||
});
|
||||
}
|
||||
|
||||
const std::string sync_point_;
|
||||
};
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DistributedFS, RenameCurrentTest,
|
||||
::testing::Values("SetCurrentFile:BeforeRename",
|
||||
"SetCurrentFile:AfterRename"));
|
||||
|
||||
TEST_P(RenameCurrentTest, Open) {
|
||||
Destroy(last_options_);
|
||||
Options options = GetDefaultOptions();
|
||||
options.create_if_missing = true;
|
||||
SetupSyncPoints();
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
Status s = TryReopen(options);
|
||||
ASSERT_NOK(s);
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
Reopen(options);
|
||||
}
|
||||
|
||||
TEST_P(RenameCurrentTest, Flush) {
|
||||
Destroy(last_options_);
|
||||
Options options = GetDefaultOptions();
|
||||
options.max_manifest_file_size = 1;
|
||||
options.create_if_missing = true;
|
||||
Reopen(options);
|
||||
ASSERT_OK(Put("key", "value"));
|
||||
SetupSyncPoints();
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
ASSERT_NOK(Flush());
|
||||
|
||||
ASSERT_NOK(Put("foo", "value"));
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
Reopen(options);
|
||||
ASSERT_EQ("value", Get("key"));
|
||||
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||
}
|
||||
|
||||
TEST_P(RenameCurrentTest, Compaction) {
|
||||
Destroy(last_options_);
|
||||
Options options = GetDefaultOptions();
|
||||
options.max_manifest_file_size = 1;
|
||||
options.create_if_missing = true;
|
||||
Reopen(options);
|
||||
ASSERT_OK(Put("a", "a_value"));
|
||||
ASSERT_OK(Put("c", "c_value"));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
ASSERT_OK(Put("b", "b_value"));
|
||||
ASSERT_OK(Put("d", "d_value"));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
SetupSyncPoints();
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
|
||||
/*end=*/nullptr));
|
||||
|
||||
ASSERT_NOK(Put("foo", "value"));
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
Reopen(options);
|
||||
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||
ASSERT_EQ("d_value", Get("d"));
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery.
|
||||
@ -5455,6 +5547,35 @@ TEST_F(DBTest2, PointInTimeRecoveryWithIOErrorWhileReadingWal) {
|
||||
Status s = TryReopen(options);
|
||||
ASSERT_TRUE(s.IsIOError());
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, PointInTimeRecoveryWithSyncFailureInCFCreation) {
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::BackgroundCallFlush:Start:1",
|
||||
"PointInTimeRecoveryWithSyncFailureInCFCreation:1"},
|
||||
{"PointInTimeRecoveryWithSyncFailureInCFCreation:2",
|
||||
"DBImpl::BackgroundCallFlush:Start:2"}});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
CreateColumnFamilies({"test1"}, Options());
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
|
||||
// Creating a CF when a flush is going on, log is synced but the
|
||||
// closed log file is not synced and corrupted.
|
||||
port::Thread flush_thread([&]() { ASSERT_NOK(Flush()); });
|
||||
TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:1");
|
||||
CreateColumnFamilies({"test2"}, Options());
|
||||
env_->corrupt_in_sync_ = true;
|
||||
TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:2");
|
||||
flush_thread.join();
|
||||
env_->corrupt_in_sync_ = false;
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
// Reopening the DB should not corrupt anything
|
||||
Options options = CurrentOptions();
|
||||
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
||||
ReopenWithColumnFamilies({"default", "test1", "test2"}, options);
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
|
@ -44,6 +44,7 @@ SpecialEnv::SpecialEnv(Env* base, bool time_elapse_only_sleep)
|
||||
manifest_sync_error_.store(false, std::memory_order_release);
|
||||
manifest_write_error_.store(false, std::memory_order_release);
|
||||
log_write_error_.store(false, std::memory_order_release);
|
||||
no_file_overwrite_.store(false, std::memory_order_release);
|
||||
random_file_open_counter_.store(0, std::memory_order_relaxed);
|
||||
delete_count_.store(0, std::memory_order_relaxed);
|
||||
num_open_wal_file_.store(0);
|
||||
|
@ -393,6 +393,10 @@ class SpecialEnv : public EnvWrapper {
|
||||
Status Flush() override { return base_->Flush(); }
|
||||
Status Sync() override {
|
||||
++env_->sync_counter_;
|
||||
if (env_->corrupt_in_sync_) {
|
||||
Append(std::string(33000, ' '));
|
||||
return Status::IOError("Ingested Sync Failure");
|
||||
}
|
||||
if (env_->skip_fsync_) {
|
||||
return Status::OK();
|
||||
} else {
|
||||
@ -440,6 +444,11 @@ class SpecialEnv : public EnvWrapper {
|
||||
std::unique_ptr<WritableFile> base_;
|
||||
};
|
||||
|
||||
if (no_file_overwrite_.load(std::memory_order_acquire) &&
|
||||
target()->FileExists(f).ok()) {
|
||||
return Status::NotSupported("SpecialEnv::no_file_overwrite_ is true.");
|
||||
}
|
||||
|
||||
if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
|
||||
uint32_t random_number;
|
||||
{
|
||||
@ -687,6 +696,9 @@ class SpecialEnv : public EnvWrapper {
|
||||
// Slow down every log write, in micro-seconds.
|
||||
std::atomic<int> log_write_slowdown_;
|
||||
|
||||
// If true, returns Status::NotSupported for file overwrite.
|
||||
std::atomic<bool> no_file_overwrite_;
|
||||
|
||||
// Number of WAL files that are still open for write.
|
||||
std::atomic<int> num_open_wal_file_;
|
||||
|
||||
@ -709,6 +721,9 @@ class SpecialEnv : public EnvWrapper {
|
||||
// If true, all fsync to files and directories are skipped.
|
||||
bool skip_fsync_ = false;
|
||||
|
||||
// If true, ingest the corruption to file during sync.
|
||||
bool corrupt_in_sync_ = false;
|
||||
|
||||
std::atomic<uint32_t> non_writeable_rate_;
|
||||
|
||||
std::atomic<uint32_t> new_writable_count_;
|
||||
|
@ -216,7 +216,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteRetryableError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
@ -242,7 +242,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteRetryableError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
@ -256,7 +256,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteRetryableError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
@ -292,7 +292,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
@ -306,7 +306,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
@ -320,7 +320,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
@ -340,7 +340,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
@ -649,7 +649,7 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
@ -695,7 +695,7 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteFileScopeError) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
@ -1698,7 +1698,7 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
|
||||
// to soft error and trigger auto resume. During auto resume, SwitchMemtable
|
||||
// is disabled to avoid small SST tables. Write can still be applied before
|
||||
// the bg error is cleaned unless the memtable is full.
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) {
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableErrorAutoRecover1) {
|
||||
// Activate the FS before the first resume
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
@ -1768,7 +1768,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) {
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableErrorAutoRecover2) {
|
||||
// Activate the FS before the first resume
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
@ -1810,14 +1810,14 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) {
|
||||
ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_AUTORESUME_COUNT));
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ASSERT_LE(0, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ASSERT_LE(0, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
|
||||
HistogramData autoresume_retry;
|
||||
options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
|
||||
&autoresume_retry);
|
||||
ASSERT_EQ(autoresume_retry.max, 1);
|
||||
ASSERT_GE(autoresume_retry.max, 0);
|
||||
ASSERT_OK(Put(Key(2), "val2", wo));
|
||||
s = Flush();
|
||||
// Since auto resume is successful, the bg error is cleaned, flush will
|
||||
@ -1827,56 +1827,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover1) {
|
||||
// Fail the first resume and make the second resume successful
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_env_.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 2;
|
||||
options.bgerror_resume_retry_interval = 100000; // 0.1 second
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
ASSERT_OK(Put(Key(1), "val1"));
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"RecoverFromRetryableBGIOError:BeforeWait0",
|
||||
"FLushWritRetryableeErrorAutoRecover1:0"},
|
||||
{"FLushWritRetryableeErrorAutoRecover1:1",
|
||||
"RecoverFromRetryableBGIOError:BeforeWait1"},
|
||||
{"RecoverFromRetryableBGIOError:RecoverSuccess",
|
||||
"FLushWritRetryableeErrorAutoRecover1:2"}});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeFinishBuildTable",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:0");
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:1");
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:2");
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
Reopen(options);
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_OK(Put(Key(2), "val2"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover2) {
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAutoRecover1) {
|
||||
// Activate the FS before the first resume
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
@ -1901,7 +1852,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover2) {
|
||||
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
ASSERT_EQ(listener->WaitForRecovery(5000000), true);
|
||||
@ -1916,7 +1867,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover2) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover3) {
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAutoRecover2) {
|
||||
// Fail all the resume and let user to resume
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
@ -1936,18 +1887,18 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover3) {
|
||||
|
||||
ASSERT_OK(Put(Key(1), "val1"));
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"FLushWritRetryableeErrorAutoRecover3:0",
|
||||
{{"FLushWritRetryableeErrorAutoRecover2:0",
|
||||
"RecoverFromRetryableBGIOError:BeforeStart"},
|
||||
{"RecoverFromRetryableBGIOError:LoopOut",
|
||||
"FLushWritRetryableeErrorAutoRecover3:1"}});
|
||||
"FLushWritRetryableeErrorAutoRecover2:1"}});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeFinishBuildTable",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover3:0");
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover3:1");
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:0");
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:1");
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
@ -1965,173 +1916,6 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover3) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover4) {
|
||||
// Fail the first resume and does not do resume second time because
|
||||
// the IO error severity is Fatal Error and not Retryable.
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_env_.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 2;
|
||||
options.bgerror_resume_retry_interval = 10; // 0.1 second
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
IOStatus nr_msg = IOStatus::IOError("No Retryable Fatal IO Error");
|
||||
nr_msg.SetRetryable(false);
|
||||
|
||||
ASSERT_OK(Put(Key(1), "val1"));
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"RecoverFromRetryableBGIOError:BeforeStart",
|
||||
"FLushWritRetryableeErrorAutoRecover4:0"},
|
||||
{"FLushWritRetryableeErrorAutoRecover4:2",
|
||||
"RecoverFromRetryableBGIOError:RecoverFail0"}});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeFinishBuildTable",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"RecoverFromRetryableBGIOError:BeforeResume1",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, nr_msg); });
|
||||
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover4:0");
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover4:2");
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
// Even the FS is recoverd, due to the Fatal Error in bg_error_ the resume
|
||||
// and flush will all fail.
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_NOK(dbfull()->Resume());
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_OK(Put(Key(2), "val2"));
|
||||
ASSERT_NOK(Flush());
|
||||
ASSERT_EQ("NOT_FOUND", Get(Key(2)));
|
||||
|
||||
Reopen(options);
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_OK(Put(Key(2), "val2"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover5) {
|
||||
// During the resume, call DB->CLose, make sure the resume thread exist
|
||||
// before close continues. Due to the shutdown, the resume is not successful
|
||||
// and the FS does not become active, so close status is still IO error
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_env_.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 2;
|
||||
options.bgerror_resume_retry_interval = 10; // 0.1 second
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
ASSERT_OK(Put(Key(1), "val1"));
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"RecoverFromRetryableBGIOError:BeforeStart",
|
||||
"FLushWritRetryableeErrorAutoRecover5:0"}});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeFinishBuildTable",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover5:0");
|
||||
// The first resume will cause recovery_error and its severity is the
|
||||
// Fatal error
|
||||
s = dbfull()->Close();
|
||||
ASSERT_NOK(s);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
|
||||
Reopen(options);
|
||||
ASSERT_NE("val1", Get(Key(1)));
|
||||
ASSERT_OK(Put(Key(2), "val2"));
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover6) {
|
||||
// During the resume, call DB->CLose, make sure the resume thread exist
|
||||
// before close continues. Due to the shutdown, the resume is not successful
|
||||
// and the FS does not become active, so close status is still IO error
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_env_.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 2;
|
||||
options.bgerror_resume_retry_interval = 10; // 0.1 second
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
ASSERT_OK(Put(Key(1), "val1"));
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"FLushWritRetryableeErrorAutoRecover6:0",
|
||||
"RecoverFromRetryableBGIOError:BeforeStart"},
|
||||
{"RecoverFromRetryableBGIOError:BeforeWait0",
|
||||
"FLushWritRetryableeErrorAutoRecover6:1"},
|
||||
{"FLushWritRetryableeErrorAutoRecover6:2",
|
||||
"RecoverFromRetryableBGIOError:BeforeWait1"},
|
||||
{"RecoverFromRetryableBGIOError:AfterWait0",
|
||||
"FLushWritRetryableeErrorAutoRecover6:3"}});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeFinishBuildTable",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:0");
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:1");
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:2");
|
||||
TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:3");
|
||||
// The first resume will cause recovery_error and its severity is the
|
||||
// Fatal error
|
||||
s = dbfull()->Close();
|
||||
ASSERT_OK(s);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
Reopen(options);
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_OK(Put(Key(2), "val2"));
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
|
||||
// Fail the first resume and let the second resume be successful
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
@ -2168,7 +1952,7 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:0");
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
|
@ -1542,6 +1542,44 @@ TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
|
||||
ASSERT_EQ(2, NumTableFilesAtLevel(0));
|
||||
}
|
||||
|
||||
TEST_F(ExternalSSTFileBasicTest, IngestFileAfterDBPut) {
|
||||
// Repro https://github.com/facebook/rocksdb/issues/6245.
|
||||
// Flush three files to L0. Ingest one more file to trigger L0->L1 compaction
|
||||
// via trivial move. The bug happened when L1 files were incorrectly sorted
|
||||
// resulting in an old value for "k" returned by `Get()`.
|
||||
Options options = CurrentOptions();
|
||||
|
||||
ASSERT_OK(Put("k", "a"));
|
||||
Flush();
|
||||
ASSERT_OK(Put("k", "a"));
|
||||
Flush();
|
||||
ASSERT_OK(Put("k", "a"));
|
||||
Flush();
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options);
|
||||
|
||||
// Current file size should be 0 after sst_file_writer init and before open a
|
||||
// file.
|
||||
ASSERT_EQ(sst_file_writer.FileSize(), 0);
|
||||
|
||||
std::string file1 = sst_files_dir_ + "file1.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1));
|
||||
ASSERT_OK(sst_file_writer.Put("k", "b"));
|
||||
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
ASSERT_OK(s) << s.ToString();
|
||||
|
||||
// Current file size should be non-zero after success write.
|
||||
ASSERT_GT(sst_file_writer.FileSize(), 0);
|
||||
|
||||
IngestExternalFileOptions ifo;
|
||||
s = db_->IngestExternalFile({file1}, ifo);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
||||
|
||||
ASSERT_EQ(Get("k"), "b");
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
|
||||
testing::Values(std::make_tuple(true, true),
|
||||
std::make_tuple(true, false),
|
||||
|
@ -368,9 +368,32 @@ Status ExternalSstFileIngestionJob::Run() {
|
||||
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
|
||||
last_seqno, &f, &assigned_seqno);
|
||||
}
|
||||
|
||||
// Modify the smallest/largest internal key to include the sequence number
|
||||
// that we just learned. Only overwrite sequence number zero. There could
|
||||
// be a nonzero sequence number already to indicate a range tombstone's
|
||||
// exclusive endpoint.
|
||||
ParsedInternalKey smallest_parsed, largest_parsed;
|
||||
if (status.ok()) {
|
||||
status = ParseInternalKey(*f.smallest_internal_key.rep(),
|
||||
&smallest_parsed, false /* log_err_key */);
|
||||
}
|
||||
if (status.ok()) {
|
||||
status = ParseInternalKey(*f.largest_internal_key.rep(), &largest_parsed,
|
||||
false /* log_err_key */);
|
||||
}
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
if (smallest_parsed.sequence == 0) {
|
||||
UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno,
|
||||
smallest_parsed.type);
|
||||
}
|
||||
if (largest_parsed.sequence == 0) {
|
||||
UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno,
|
||||
largest_parsed.type);
|
||||
}
|
||||
|
||||
status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
|
||||
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
|
||||
&assigned_seqno);
|
||||
|
@ -4083,6 +4083,7 @@ Status VersionSet::ProcessManifestWrites(
|
||||
uint64_t new_manifest_file_size = 0;
|
||||
Status s;
|
||||
IOStatus io_s;
|
||||
IOStatus manifest_io_status;
|
||||
{
|
||||
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
|
||||
mu->Unlock();
|
||||
@ -4134,6 +4135,7 @@ Status VersionSet::ProcessManifestWrites(
|
||||
s = WriteCurrentStateToManifest(curr_state, wal_additions,
|
||||
descriptor_log_.get(), io_s);
|
||||
} else {
|
||||
manifest_io_status = io_s;
|
||||
s = io_s;
|
||||
}
|
||||
}
|
||||
@ -4171,11 +4173,13 @@ Status VersionSet::ProcessManifestWrites(
|
||||
io_s = descriptor_log_->AddRecord(record);
|
||||
if (!io_s.ok()) {
|
||||
s = io_s;
|
||||
manifest_io_status = io_s;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
io_s = SyncManifest(db_options_, descriptor_log_->file());
|
||||
manifest_io_status = io_s;
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
|
||||
}
|
||||
@ -4188,6 +4192,9 @@ Status VersionSet::ProcessManifestWrites(
|
||||
|
||||
// If we just created a new descriptor file, install it by writing a
|
||||
// new CURRENT file that points to it.
|
||||
if (s.ok()) {
|
||||
assert(manifest_io_status.ok());
|
||||
}
|
||||
if (s.ok() && new_descriptor_log) {
|
||||
io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
|
||||
db_directory);
|
||||
@ -4303,11 +4310,41 @@ Status VersionSet::ProcessManifestWrites(
|
||||
for (auto v : versions) {
|
||||
delete v;
|
||||
}
|
||||
if (manifest_io_status.ok()) {
|
||||
manifest_file_number_ = pending_manifest_file_number_;
|
||||
manifest_file_size_ = new_manifest_file_size;
|
||||
}
|
||||
// If manifest append failed for whatever reason, the file could be
|
||||
// corrupted. So we need to force the next version update to start a
|
||||
// new manifest file.
|
||||
descriptor_log_.reset();
|
||||
if (new_descriptor_log) {
|
||||
// If manifest operations failed, then we know the CURRENT file still
|
||||
// points to the original MANIFEST. Therefore, we can safely delete the
|
||||
// new MANIFEST.
|
||||
// If manifest operations succeeded, and we are here, then it is possible
|
||||
// that renaming tmp file to CURRENT failed.
|
||||
//
|
||||
// On local POSIX-compliant FS, the CURRENT must point to the original
|
||||
// MANIFEST. We can delete the new MANIFEST for simplicity, but we can also
|
||||
// keep it. Future recovery will ignore this MANIFEST. It's also ok for the
|
||||
// process not to crash and continue using the db. Any future LogAndApply()
|
||||
// call will switch to a new MANIFEST and update CURRENT, still ignoring
|
||||
// this one.
|
||||
//
|
||||
// On non-local FS, it is
|
||||
// possible that the rename operation succeeded on the server (remote)
|
||||
// side, but the client somehow returns a non-ok status to RocksDB. Note
|
||||
// that this does not violate atomicity. Should we delete the new MANIFEST
|
||||
// successfully, a subsequent recovery attempt will likely see the CURRENT
|
||||
// pointing to the new MANIFEST, thus fail. We will not be able to open the
|
||||
// DB again. Therefore, if manifest operations succeed, we should keep the
|
||||
// the new MANIFEST. If the process proceeds, any future LogAndApply() call
|
||||
// will switch to a new MANIFEST and update CURRENT. If user tries to
|
||||
// re-open the DB,
|
||||
// a) CURRENT points to the new MANIFEST, and the new MANIFEST is present.
|
||||
// b) CURRENT points to the original MANIFEST, and the original MANIFEST
|
||||
// also exists.
|
||||
if (new_descriptor_log && !manifest_io_status.ok()) {
|
||||
ROCKS_LOG_INFO(db_options_->info_log,
|
||||
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
|
||||
"\n",
|
||||
|
2
env/env_hdfs.cc
vendored
2
env/env_hdfs.cc
vendored
@ -213,6 +213,8 @@ class HdfsWritableFile: public WritableFile {
|
||||
}
|
||||
}
|
||||
|
||||
using WritableFile::Append;
|
||||
|
||||
// If the file was successfully created, then this returns true.
|
||||
// Otherwise returns false.
|
||||
bool isValid() {
|
||||
|
12
env/fs_posix.cc
vendored
12
env/fs_posix.cc
vendored
@ -620,9 +620,10 @@ class PosixFileSystem : public FileSystem {
|
||||
}
|
||||
}
|
||||
|
||||
const auto pre_read_errno = errno; // errno may be modified by readdir
|
||||
// reset errno before calling readdir()
|
||||
errno = 0;
|
||||
struct dirent* entry;
|
||||
while ((entry = readdir(d)) != nullptr && errno == pre_read_errno) {
|
||||
while ((entry = readdir(d)) != nullptr) {
|
||||
// filter out '.' and '..' directory entries
|
||||
// which appear only on some platforms
|
||||
const bool ignore =
|
||||
@ -631,19 +632,20 @@ class PosixFileSystem : public FileSystem {
|
||||
if (!ignore) {
|
||||
result->push_back(entry->d_name);
|
||||
}
|
||||
errno = 0; // reset errno if readdir() success
|
||||
}
|
||||
|
||||
// always attempt to close the dir
|
||||
const auto pre_close_errno = errno; // errno may be modified by closedir
|
||||
const int close_result = closedir(d);
|
||||
|
||||
if (pre_close_errno != pre_read_errno) {
|
||||
// error occured during readdir
|
||||
if (pre_close_errno != 0) {
|
||||
// error occurred during readdir
|
||||
return IOError("While readdir", dir, pre_close_errno);
|
||||
}
|
||||
|
||||
if (close_result != 0) {
|
||||
// error occured during closedir
|
||||
// error occurred during closedir
|
||||
return IOError("While closedir", dir, errno);
|
||||
}
|
||||
|
||||
|
@ -383,10 +383,12 @@ IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
|
||||
contents.remove_prefix(dbname.size() + 1);
|
||||
std::string tmp = TempFileName(dbname, descriptor_number);
|
||||
IOStatus s = WriteStringToFile(fs, contents.ToString() + "\n", tmp, true);
|
||||
TEST_SYNC_POINT_CALLBACK("SetCurrentFile:BeforeRename", &s);
|
||||
if (s.ok()) {
|
||||
TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
|
||||
s = fs->RenameFile(tmp, CurrentFileName(dbname), IOOptions(), nullptr);
|
||||
TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2);
|
||||
TEST_SYNC_POINT_CALLBACK("SetCurrentFile:AfterRename", &s);
|
||||
}
|
||||
if (s.ok()) {
|
||||
if (directory_to_fsync != nullptr) {
|
||||
|
@ -11,7 +11,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 6
|
||||
#define ROCKSDB_MINOR 19
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 4
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -1459,8 +1459,8 @@ public class RocksDBTest {
|
||||
assertThat(livefiles.manifestFileSize).isEqualTo(57);
|
||||
assertThat(livefiles.files.size()).isEqualTo(3);
|
||||
assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT");
|
||||
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000003");
|
||||
assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000006");
|
||||
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");
|
||||
assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -680,7 +680,8 @@ IOStatus WinFileSystem::GetChildren(const std::string& dir,
|
||||
// which appear only on some platforms
|
||||
const bool ignore =
|
||||
((data.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0) &&
|
||||
(strcmp(data.cFileName, ".") == 0 || strcmp(data.cFileName, "..") == 0);
|
||||
(RX_FNCMP(data.cFileName, ".") == 0 ||
|
||||
RX_FNCMP(data.cFileName, "..") == 0);
|
||||
if (!ignore) {
|
||||
auto x = RX_FILESTRING(data.cFileName, RX_FNLEN(data.cFileName));
|
||||
result->push_back(FN_TO_RX(x));
|
||||
|
@ -355,6 +355,7 @@ extern void SetCpuPriority(ThreadId id, CpuPriority priority);
|
||||
#define RX_FILESTRING std::wstring
|
||||
#define RX_FN(a) ROCKSDB_NAMESPACE::port::utf8_to_utf16(a)
|
||||
#define FN_TO_RX(a) ROCKSDB_NAMESPACE::port::utf16_to_utf8(a)
|
||||
#define RX_FNCMP(a, b) ::wcscmp(a, RX_FN(b).c_str())
|
||||
#define RX_FNLEN(a) ::wcslen(a)
|
||||
|
||||
#define RX_DeleteFile DeleteFileW
|
||||
@ -379,6 +380,7 @@ extern void SetCpuPriority(ThreadId id, CpuPriority priority);
|
||||
#define RX_FILESTRING std::string
|
||||
#define RX_FN(a) a
|
||||
#define FN_TO_RX(a) a
|
||||
#define RX_FNCMP(a, b) strcmp(a, b)
|
||||
#define RX_FNLEN(a) strlen(a)
|
||||
|
||||
#define RX_DeleteFile DeleteFileA
|
||||
|
30
src.mk
30
src.mk
@ -258,20 +258,6 @@ LIB_SOURCES = \
|
||||
utilities/transactions/lock/lock_manager.cc \
|
||||
utilities/transactions/lock/point/point_lock_tracker.cc \
|
||||
utilities/transactions/lock/point/point_lock_manager.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/keyrange.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/manager.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/treenode.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/txnid_set.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/wfg.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/standalone_port.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/util/dbt.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/util/memarena.cc \
|
||||
utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc \
|
||||
utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc \
|
||||
utilities/transactions/optimistic_transaction.cc \
|
||||
utilities/transactions/optimistic_transaction_db_impl.cc \
|
||||
utilities/transactions/pessimistic_transaction.cc \
|
||||
@ -303,6 +289,22 @@ LIB_SOURCES_ASM =
|
||||
LIB_SOURCES_C =
|
||||
endif
|
||||
|
||||
RANGE_TREE_SOURCES =\
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/keyrange.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/manager.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/treenode.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/txnid_set.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/locktree/wfg.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/standalone_port.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/util/dbt.cc \
|
||||
utilities/transactions/lock/range/range_tree/lib/util/memarena.cc \
|
||||
utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc \
|
||||
utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc
|
||||
|
||||
TOOL_LIB_SOURCES = \
|
||||
tools/io_tracer_parser_tool.cc \
|
||||
tools/ldb_cmd.cc \
|
||||
|
@ -1645,6 +1645,11 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
|
||||
? r->compression_opts.zstd_max_train_bytes
|
||||
: r->compression_opts.max_dict_bytes;
|
||||
const size_t kNumBlocksBuffered = r->data_block_and_keys_buffers.size();
|
||||
if (kNumBlocksBuffered == 0) {
|
||||
// The below code is neither safe nor necessary for handling zero data
|
||||
// blocks.
|
||||
return;
|
||||
}
|
||||
|
||||
// Abstract algebra teaches us that a finite cyclic group (such as the
|
||||
// additive group of integers modulo N) can be generated by a number that is
|
||||
|
@ -22,6 +22,7 @@ FullFilterBlockBuilder::FullFilterBlockBuilder(
|
||||
whole_key_filtering_(whole_key_filtering),
|
||||
last_whole_key_recorded_(false),
|
||||
last_prefix_recorded_(false),
|
||||
last_key_in_domain_(false),
|
||||
num_added_(0) {
|
||||
assert(filter_bits_builder != nullptr);
|
||||
filter_bits_builder_.reset(filter_bits_builder);
|
||||
@ -30,6 +31,15 @@ FullFilterBlockBuilder::FullFilterBlockBuilder(
|
||||
void FullFilterBlockBuilder::Add(const Slice& key_without_ts) {
|
||||
const bool add_prefix =
|
||||
prefix_extractor_ && prefix_extractor_->InDomain(key_without_ts);
|
||||
|
||||
if (!last_prefix_recorded_ && last_key_in_domain_) {
|
||||
// We can reach here when a new filter partition starts in partitioned
|
||||
// filter. The last prefix in the previous partition should be added if
|
||||
// necessary regardless of key_without_ts, to support prefix SeekForPrev.
|
||||
AddKey(last_prefix_str_);
|
||||
last_prefix_recorded_ = true;
|
||||
}
|
||||
|
||||
if (whole_key_filtering_) {
|
||||
if (!add_prefix) {
|
||||
AddKey(key_without_ts);
|
||||
@ -49,7 +59,10 @@ void FullFilterBlockBuilder::Add(const Slice& key_without_ts) {
|
||||
}
|
||||
}
|
||||
if (add_prefix) {
|
||||
last_key_in_domain_ = true;
|
||||
AddPrefix(key_without_ts);
|
||||
} else {
|
||||
last_key_in_domain_ = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,6 +74,7 @@ inline void FullFilterBlockBuilder::AddKey(const Slice& key) {
|
||||
|
||||
// Add prefix to filter if needed
|
||||
void FullFilterBlockBuilder::AddPrefix(const Slice& key) {
|
||||
assert(prefix_extractor_ && prefix_extractor_->InDomain(key));
|
||||
Slice prefix = prefix_extractor_->Transform(key);
|
||||
if (whole_key_filtering_) {
|
||||
// if both whole_key and prefix are added to bloom then we will have whole
|
||||
|
@ -61,6 +61,7 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
|
||||
virtual void Reset();
|
||||
void AddPrefix(const Slice& key);
|
||||
const SliceTransform* prefix_extractor() { return prefix_extractor_; }
|
||||
const std::string& last_prefix_str() const { return last_prefix_str_; }
|
||||
|
||||
private:
|
||||
// important: all of these might point to invalid addresses
|
||||
@ -72,10 +73,14 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
|
||||
std::string last_whole_key_str_;
|
||||
bool last_prefix_recorded_;
|
||||
std::string last_prefix_str_;
|
||||
// Whether prefix_extractor_->InDomain(last_whole_key_) is true.
|
||||
// Used in partitioned filters so that the last prefix from the previous
|
||||
// filter partition will be added to the current partition if
|
||||
// last_key_in_domain_ is true, regardless of the current key.
|
||||
bool last_key_in_domain_;
|
||||
|
||||
uint32_t num_added_;
|
||||
std::unique_ptr<const char[]> filter_data_;
|
||||
|
||||
};
|
||||
|
||||
// A FilterBlockReader is used to parse filter from SST table.
|
||||
|
@ -73,13 +73,16 @@ void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock(
|
||||
}
|
||||
filter_gc.push_back(std::unique_ptr<const char[]>(nullptr));
|
||||
|
||||
// Add the prefix of the next key before finishing the partition. This hack,
|
||||
// fixes a bug with format_verison=3 where seeking for the prefix would lead
|
||||
// us to the previous partition.
|
||||
const bool add_prefix =
|
||||
// Add the prefix of the next key before finishing the partition without
|
||||
// updating last_prefix_str_. This hack, fixes a bug with format_verison=3
|
||||
// where seeking for the prefix would lead us to the previous partition.
|
||||
const bool maybe_add_prefix =
|
||||
next_key && prefix_extractor() && prefix_extractor()->InDomain(*next_key);
|
||||
if (add_prefix) {
|
||||
FullFilterBlockBuilder::AddPrefix(*next_key);
|
||||
if (maybe_add_prefix) {
|
||||
const Slice next_key_prefix = prefix_extractor()->Transform(*next_key);
|
||||
if (next_key_prefix.compare(last_prefix_str()) != 0) {
|
||||
AddKey(next_key_prefix);
|
||||
}
|
||||
}
|
||||
|
||||
Slice filter = filter_bits_builder_->Finish(&filter_gc.back());
|
||||
|
@ -3990,8 +3990,7 @@ class TestPrefixExtractor : public ROCKSDB_NAMESPACE::SliceTransform {
|
||||
}
|
||||
|
||||
bool InDomain(const ROCKSDB_NAMESPACE::Slice& src) const override {
|
||||
assert(IsValid(src));
|
||||
return true;
|
||||
return IsValid(src);
|
||||
}
|
||||
|
||||
bool InRange(const ROCKSDB_NAMESPACE::Slice& /*dst*/) const override {
|
||||
|
@ -2618,19 +2618,19 @@ TEST_F(BackupableDBTest, GarbageCollectionBeforeBackup) {
|
||||
OpenDBAndBackupEngine(true);
|
||||
|
||||
ASSERT_OK(backup_chroot_env_->CreateDirIfMissing(backupdir_ + "/shared"));
|
||||
std::string file_five = backupdir_ + "/shared/000008.sst";
|
||||
std::string file_five = backupdir_ + "/shared/000009.sst";
|
||||
std::string file_five_contents = "I'm not really a sst file";
|
||||
// this depends on the fact that 00008.sst is the first file created by the DB
|
||||
// this depends on the fact that 00009.sst is the first file created by the DB
|
||||
ASSERT_OK(file_manager_->WriteToFile(file_five, file_five_contents));
|
||||
|
||||
FillDB(db_.get(), 0, 100);
|
||||
// backup overwrites file 000008.sst
|
||||
// backup overwrites file 000009.sst
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
|
||||
|
||||
std::string new_file_five_contents;
|
||||
ASSERT_OK(ReadFileToString(backup_chroot_env_.get(), file_five,
|
||||
&new_file_five_contents));
|
||||
// file 000008.sst was overwritten
|
||||
// file 000009.sst was overwritten
|
||||
ASSERT_TRUE(new_file_five_contents != file_five_contents);
|
||||
|
||||
CloseDBAndBackupEngine();
|
||||
|
@ -22,7 +22,7 @@
|
||||
#include <string>
|
||||
|
||||
#include "file/filename.h"
|
||||
#include "include/rocksdb/file_system.h"
|
||||
#include "rocksdb/file_system.h"
|
||||
#include "util/mutexlock.h"
|
||||
#include "util/random.h"
|
||||
#include "util/thread_local.h"
|
||||
|
Loading…
Reference in New Issue
Block a user