Compare commits
29 commits:
a21a78a3cf, 910ea96ff7, 5841911c4b, 053e59c25f, f6013e16bc, cef4f9178f,
201c5a5e61, a7b0f02c47, 0c785a442b, 1ceec266d7, 1482c8697c, ff76f053af,
402fe7d469, 0367cd4370, 515a27941a, 138a967fa6, 98d19c3056, 3f4008b1f1,
8f9cc10916, bfc1b7acf0, e0fcbf93d0, f08ee8d3de, 60de1e6c6d, 18d0c90bba,
6d6e857404, 3df88b3953, 4584a99a3c, cb33efd0a9, 15ee2ee438
@@ -287,6 +287,9 @@ script:
      mkdir build && cd build && cmake -DJNI=1 .. -DCMAKE_BUILD_TYPE=Release $OPT && make -j4 rocksdb rocksdbjni
      ;;
    status_checked)
      OPT=-DTRAVIS V=1 ASSERT_STATUS_CHECKED=1 make -j4 check_some
      ;;
  esac
notifications:
  email:

HISTORY.md (43 changes)
@@ -1,5 +1,30 @@
# Rocksdb Change Log
## Unreleased
## 6.10.4 (11/15/2020)
### Bug Fixes
* Fixed a bug of encoding and parsing BlockBasedTableOptions::read_amp_bytes_per_bit as a 64-bit integer.
* Fixed the logic of populating the native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architectures. Without this fix, the original code introduced in PR 7659, when running on a big-endian machine, can mistakenly store `read_amp_bytes_per_bit` (a uint32) in little-endian format; future accesses to `read_amp_bytes_per_bit` will then give wrong values. Little-endian architectures are not affected.
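The endianness bug above can be illustrated with a small standalone sketch (the value is hypothetical; this is not RocksDB code): a uint32 written in little-endian byte order and then reinterpreted natively on a big-endian reader comes back as a different number, which is exactly the class of error this fix addresses.

```python
import struct

# Hypothetical read_amp_bytes_per_bit value (a uint32)
value = 8192

# Stored in little-endian byte order, as the buggy path could do
le_bytes = struct.pack("<I", value)

# A big-endian reader interpreting those bytes natively gets a wrong value
misread = struct.unpack(">I", le_bytes)[0]
print(misread)  # 2097152, not 8192
```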

## 6.10.3 (6/16/2020)
### Bug Fixes
* Fix potential file descriptor leakage in PosixEnv's IsDirectory() and NewRandomAccessFile().
* Best-efforts recovery ignores the CURRENT file completely. If the CURRENT file is missing during recovery, best-efforts recovery still proceeds with the MANIFEST file(s).
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and returning early.
* Fail recovery and report it once a physical log record checksum mismatch is hit while reading the MANIFEST. RocksDB should not continue processing the MANIFEST any further.
* Fix a bug of wrong iterator results if another thread finishes an update and a DB flush between two statements.

## 6.10.2 (6/5/2020)
### Bug Fixes
* Fix a false negative from the VerifyChecksum() API when there is a checksum mismatch in an index partition block in a BlockBasedTable format table file (index_type is kTwoLevelIndexSearch).

## 6.10.1 (5/27/2020)
### Bug Fixes
* Remove "u'<string>'" in TARGETS file.
* Fix db_stress_lib target in buck.

## 6.10 (5/2/2020)
### Behavior Changes
* Disable delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode with num_levels = 1 in order to avoid a corruption bug.

### Bug Fixes
* Fix a wrong result being read from an ingested file. May happen when a key in the file happens to be a prefix of another key also in the file. The issue can further cause more data corruption. The issue exists in rocksdb >= 5.0.0, since DB::IngestExternalFile() was introduced.
* Finish the implementation of BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey. It's now ready for use. Significantly reduces read amplification in some setups, especially for iterator seeks.
@@ -9,11 +34,22 @@
* Fix a bug caused by not including the user timestamp in MultiGet LookupKey construction. This can lead to wrong query results, since the trailing bytes of a user key, if not shorter than the timestamp, will be mistaken for the user timestamp.
* Fix a bug caused by using the wrong compare function when sorting the input keys of MultiGet with timestamps.
* Upgraded the version of the bzip library (1.0.6 -> 1.0.8) used with RocksJava to address potential vulnerabilities if an attacker can manipulate compressed data saved and loaded by RocksDB (not normal). See issue #6703.
* Fix consistency-checking error swallowing in some cases when options.force_consistency_checks = true.
* Fix possible false NotFound status from batched MultiGet using index type kHashSearch.
* Fix corruption caused by enabling delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode, along with parallel compactions. The bug can result in two parallel compactions picking the same input files, resulting in the DB resurrecting older and deleted versions of some keys.
* Fix a use-after-free bug in best-efforts recovery. column_family_memtables_ needs to point to a valid ColumnFamilySet.
* Let best-efforts recovery ignore corrupted files during table loading.
* Fix a bug when making options.bottommost_compression, options.compression_opts and options.bottommost_compression_opts dynamically changeable: the modified values were not written to option files or returned back to users when queried.
* Fix a bug where index key comparisons were unaccounted in `PerfContext::user_key_comparison_count` for lookups in files written with `format_version >= 3`.
* Fix many bloom.filter statistics not being updated in batched MultiGet.

### Public API Change
* Add a ConfigOptions argument to the APIs dealing with converting options to and from strings and files. The ConfigOptions is meant to replace some of the options (such as input_strings_escaped and ignore_unknown_options) and allow more parameters to be passed in the future without changing the function signature.
* Add NewFileChecksumGenCrc32cFactory to the file checksum public API, such that the builtin Crc32c-based file checksum generator factory can be used by applications.
* Add IsDirectory to Env and FS to indicate whether a path is a directory.
* ldb now uses options.force_consistency_checks = true by default, and "--disable_consistency_checks" is added to disable it.
* Add ReadOptions::deadline to allow users to specify a deadline for MultiGet requests.

### New Features
* Added support for a pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism. This feature is experimental for now.
@@ -22,11 +58,6 @@
* Added functionality in the sst_dump tool to check the compressed file size for different compression levels and print the time spent on compressing files with each compression type. Added arguments `--compression_level_from` and `--compression_level_to` to report the sizes of all compression levels; one compression_type must be specified with them so that the tool reports compressed sizes of that compression type at different levels.
* Added statistics for redundant insertions into block cache: rocksdb.block.cache.*add.redundant. (There is currently no coordination to ensure that only one thread loads a table block when many threads are trying to access that same table block.)

### Bug Fixes
* Fix a bug when making options.bottommost_compression, options.compression_opts and options.bottommost_compression_opts dynamically changeable: the modified values were not written to option files or returned back to users when queried.
* Fix a bug where index key comparisons were unaccounted in `PerfContext::user_key_comparison_count` for lookups in files written with `format_version >= 3`.
* Fix many bloom.filter statistics not being updated in batched MultiGet.

### Performance Improvements
* Improve the performance of batched MultiGet with partitioned filters by sharing block cache lookups across applicable filter blocks.
* Reduced memory copies when fetching and uncompressing compressed blocks from sst files.

Makefile (11 changes)
@@ -626,10 +626,6 @@ TESTS = \
	timer_test \
	db_with_timestamp_compaction_test \

ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
	TESTS += folly_synchronization_distributed_mutex_test
endif

PARALLEL_TEST = \
	backupable_db_test \
	db_bloom_filter_test \
@@ -653,6 +649,11 @@ PARALLEL_TEST = \
	write_prepared_transaction_test \
	write_unprepared_transaction_test \

ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
	TESTS += folly_synchronization_distributed_mutex_test
	PARALLEL_TEST += folly_synchronization_distributed_mutex_test
endif

# options_settable_test doesn't pass with UBSAN as we use hack in the test
ifdef COMPILE_WITH_UBSAN
	TESTS := $(shell echo $(TESTS) | sed 's/\boptions_settable_test\b//g')
@@ -972,9 +973,11 @@ check: all
ifneq ($(PLATFORM), OS_AIX)
	$(PYTHON) tools/check_all_python.py
ifeq ($(filter -DROCKSDB_LITE,$(OPT)),)
ifndef ASSERT_STATUS_CHECKED # not yet working with these tests
	$(PYTHON) tools/ldb_test.py
	sh tools/rocksdb_dump_test.sh
endif
endif
endif
	$(MAKE) check-format
	$(MAKE) check-buck-targets

TARGETS (10 changes)
@@ -109,6 +109,11 @@ ROCKSDB_OS_DEPS += ([(
    ["third-party//jemalloc:headers"],
)] if sanitizer == "" else [])

ROCKSDB_LIB_DEPS = [
    ":rocksdb_lib",
    ":rocksdb_test_lib",
] if not is_opt_mode else [":rocksdb_lib"]

cpp_library(
    name = "rocksdb_lib",
    srcs = [
@@ -437,10 +442,7 @@ cpp_library(
    os_deps = ROCKSDB_OS_DEPS,
    os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
    preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
    deps = [
        ":rocksdb_lib",
        ":rocksdb_test_lib",
    ],
    deps = ROCKSDB_LIB_DEPS,
    external_deps = ROCKSDB_EXTERNAL_DEPS,
)
@@ -168,14 +168,13 @@ def generate_targets(repo_path, deps_map):
        ["test_util/testutil.cc"],
        [":rocksdb_lib"])
    # rocksdb_stress_lib
    TARGETS.add_library(
    TARGETS.add_rocksdb_library(
        "rocksdb_stress_lib",
        src_mk.get("ANALYZER_LIB_SOURCES", [])
        + src_mk.get('STRESS_LIB_SOURCES', [])
        + ["test_util/testutil.cc"],
        [":rocksdb_lib", ":rocksdb_test_lib"])
        + ["test_util/testutil.cc"])

    print("Extra dependencies:\n{0}".format(str(deps_map)))
    print("Extra dependencies:\n{0}".format(json.dumps(deps_map)))
    # test for every test we found in the Makefile
    for target_alias, deps in deps_map.items():
        for test in sorted(tests):
@@ -196,8 +195,8 @@ def generate_targets(repo_path, deps_map):
            test_target_name,
            match_src[0],
            is_parallel,
            deps['extra_deps'],
            deps['extra_compiler_flags'])
            json.dumps(deps['extra_deps']),
            json.dumps(deps['extra_compiler_flags']))

        if test in _EXPORTED_TEST_LIBS:
            test_library = "%s_lib" % test_target_name

@@ -50,6 +50,18 @@ class TARGETSBuilder(object):
            deps=pretty_list(deps)))
        self.total_lib = self.total_lib + 1

    def add_rocksdb_library(self, name, srcs, headers=None):
        headers_attr_prefix = ""
        if headers is None:
            headers_attr_prefix = "auto_"
            headers = "AutoHeaders.RECURSIVE_GLOB"
        self.targets_file.write(targets_cfg.rocksdb_library_template.format(
            name=name,
            srcs=pretty_list(srcs),
            headers_attr_prefix=headers_attr_prefix,
            headers=headers))
        self.total_lib = self.total_lib + 1
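The header handling in `add_rocksdb_library` above reduces to a small standalone rule: with no explicit headers, the generated attribute becomes `auto_headers = AutoHeaders.RECURSIVE_GLOB`; otherwise a plain `headers = ...` is emitted. The helper name below is illustrative, not part of the buckifier:

```python
def headers_attr(headers=None):
    # Mirrors add_rocksdb_library: default to Buck's recursive auto-headers
    prefix = ""
    if headers is None:
        prefix = "auto_"
        headers = "AutoHeaders.RECURSIVE_GLOB"
    return "{0}headers = {1}".format(prefix, headers)

print(headers_attr())             # auto_headers = AutoHeaders.RECURSIVE_GLOB
print(headers_attr('["db/db.h"]'))  # headers = ["db/db.h"]
```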

    def add_binary(self, name, srcs, deps=None):
        self.targets_file.write(targets_cfg.binary_template % (
            name,

@@ -114,6 +114,11 @@ ROCKSDB_OS_DEPS += ([(
    "linux",
    ["third-party//jemalloc:headers"],
)] if sanitizer == "" else [])

ROCKSDB_LIB_DEPS = [
    ":rocksdb_lib",
    ":rocksdb_test_lib",
] if not is_opt_mode else [":rocksdb_lib"]
"""

@@ -132,6 +137,21 @@ cpp_library(
)
"""

rocksdb_library_template = """
cpp_library(
    name = "{name}",
    srcs = [{srcs}],
    {headers_attr_prefix}headers = {headers},
    arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
    compiler_flags = ROCKSDB_COMPILER_FLAGS,
    os_deps = ROCKSDB_OS_DEPS,
    os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
    preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
    deps = ROCKSDB_LIB_DEPS,
    external_deps = ROCKSDB_EXTERNAL_DEPS,
)
"""

binary_template = """
cpp_binary(
    name = "%s",

@@ -56,8 +56,9 @@ Status ArenaWrappedDBIter::Refresh() {
  // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
  // correct behavior. Will be corrected automatically when we take a snapshot
  // here for the case of WritePreparedTxnDB.
  SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
  uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
  TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
  TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
  if (sv_number_ != cur_sv_number) {
    Env* env = db_iter_->env();
    db_iter_->~DBIter();
@@ -65,6 +66,7 @@ Status ArenaWrappedDBIter::Refresh() {
    new (&arena_) Arena();

    SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
    SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
    if (read_callback_) {
      read_callback_->Refresh(latest_seq);
    }
@@ -78,7 +80,7 @@ Status ArenaWrappedDBIter::Refresh() {
        latest_seq, /* allow_unprepared_value */ true);
    SetIterUnderDBIter(internal_iter);
  } else {
    db_iter_->set_sequence(latest_seq);
    db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
    db_iter_->set_valid(false);
  }
  return Status::OK();

@@ -1085,6 +1085,8 @@ void CompactionPicker::PickFilesMarkedForCompaction(
    Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
    size_t random_file_index = static_cast<size_t>(rnd.Uniform(
        static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
    TEST_SYNC_POINT_CALLBACK("CompactionPicker::PickFilesMarkedForCompaction",
                             &random_file_index);

    if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
      // found the compaction!
@@ -78,8 +78,17 @@ class CompactionPickerTest : public testing::Test {
    vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
  }

  // Create a new VersionStorageInfo object so we can add more files and then
  // merge it with the existing VersionStorageInfo
  void AddVersionStorage() {
    temp_vstorage_.reset(new VersionStorageInfo(
        &icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style,
        vstorage_.get(), false));
  }

  void DeleteVersionStorage() {
    vstorage_.reset();
    temp_vstorage_.reset();
    files_.clear();
    file_map_.clear();
    input_files_.clear();
@@ -88,18 +97,24 @@ class CompactionPickerTest : public testing::Test {
  void Add(int level, uint32_t file_number, const char* smallest,
           const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
           SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
           size_t compensated_file_size = 0) {
    assert(level < vstorage_->num_levels());
           size_t compensated_file_size = 0, bool marked_for_compact = false) {
    VersionStorageInfo* vstorage;
    if (temp_vstorage_) {
      vstorage = temp_vstorage_.get();
    } else {
      vstorage = vstorage_.get();
    }
    assert(level < vstorage->num_levels());
    FileMetaData* f = new FileMetaData(
        file_number, path_id, file_size,
        InternalKey(smallest, smallest_seq, kTypeValue),
        InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
        largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
        largest_seq, marked_for_compact, kInvalidBlobFileNumber,
        kUnknownOldestAncesterTime, kUnknownFileCreationTime,
        kUnknownFileChecksum, kUnknownFileChecksumFuncName);
    f->compensated_file_size =
        (compensated_file_size != 0) ? compensated_file_size : file_size;
    vstorage_->AddFile(level, f);
    vstorage->AddFile(level, f);
    files_.emplace_back(f);
    file_map_.insert({file_number, {f, level}});
  }
@@ -122,6 +137,12 @@ class CompactionPickerTest : public testing::Test {
  }

  void UpdateVersionStorageInfo() {
    if (temp_vstorage_) {
      VersionBuilder builder(FileOptions(), &ioptions_, nullptr,
                             vstorage_.get(), nullptr);
      builder.SaveTo(temp_vstorage_.get());
      vstorage_ = std::move(temp_vstorage_);
    }
    vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
    vstorage_->UpdateFilesByCompactionPri(ioptions_.compaction_pri);
    vstorage_->UpdateNumNonEmptyLevels();
@@ -132,6 +153,28 @@ class CompactionPickerTest : public testing::Test {
    vstorage_->ComputeFilesMarkedForCompaction();
    vstorage_->SetFinalized();
  }

  void AddFileToVersionStorage(int level, uint32_t file_number,
                               const char* smallest, const char* largest,
                               uint64_t file_size = 1, uint32_t path_id = 0,
                               SequenceNumber smallest_seq = 100,
                               SequenceNumber largest_seq = 100,
                               size_t compensated_file_size = 0,
                               bool marked_for_compact = false) {
    VersionStorageInfo* base_vstorage = vstorage_.release();
    vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
                                           kCompactionStyleUniversal,
                                           base_vstorage, false));
    Add(level, file_number, smallest, largest, file_size, path_id, smallest_seq,
        largest_seq, compensated_file_size, marked_for_compact);

    VersionBuilder builder(FileOptions(), &ioptions_, nullptr, base_vstorage,
                           nullptr);
    builder.SaveTo(vstorage_.get());
    UpdateVersionStorageInfo();
  }

 private:
  std::unique_ptr<VersionStorageInfo> temp_vstorage_;
};

TEST_F(CompactionPickerTest, Empty) {
@@ -1733,6 +1776,163 @@ TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) {
  ASSERT_EQ(0, compaction->output_level());
}

TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) {
  const uint64_t kFileSize = 100000;

  ioptions_.compaction_style = kCompactionStyleUniversal;
  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);

  // This test covers the case where a "regular" universal compaction is
  // scheduled first, followed by a delete triggered compaction. The latter
  // should fail.
  NewVersionStorage(5, kCompactionStyleUniversal);

  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
  Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
  Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300);
  Add(3, 5U, "010", "080", 8 * kFileSize, 0, 200, 251);
  Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
  Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);

  UpdateVersionStorageInfo();

  std::unique_ptr<Compaction> compaction(
      universal_compaction_picker.PickCompaction(
          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));

  ASSERT_TRUE(compaction);
  // Validate that it's a compaction to reduce sorted runs
  ASSERT_EQ(CompactionReason::kUniversalSortedRunNum,
            compaction->compaction_reason());
  ASSERT_EQ(0, compaction->output_level());
  ASSERT_EQ(0, compaction->start_level());
  ASSERT_EQ(2U, compaction->num_input_files(0));

  AddVersionStorage();
  // Simulate a flush and mark the file for compaction
  Add(0, 1U, "150", "200", kFileSize, 0, 551, 600, 0, true);
  UpdateVersionStorageInfo();

  std::unique_ptr<Compaction> compaction2(
      universal_compaction_picker.PickCompaction(
          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
  ASSERT_FALSE(compaction2);
}

TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) {
  const uint64_t kFileSize = 100000;

  ioptions_.compaction_style = kCompactionStyleUniversal;
  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);

  // This test covers the case where a delete triggered compaction is
  // scheduled first, followed by a "regular" compaction. The latter
  // should fail.
  NewVersionStorage(5, kCompactionStyleUniversal);

  // Mark file number 4 for compaction
  Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
  Add(3, 5U, "240", "290", 8 * kFileSize, 0, 201, 250);
  Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
  Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
  UpdateVersionStorageInfo();

  std::unique_ptr<Compaction> compaction(
      universal_compaction_picker.PickCompaction(
          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));

  ASSERT_TRUE(compaction);
  // Validate that it's a delete triggered compaction
  ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
            compaction->compaction_reason());
  ASSERT_EQ(3, compaction->output_level());
  ASSERT_EQ(0, compaction->start_level());
  ASSERT_EQ(1U, compaction->num_input_files(0));
  ASSERT_EQ(1U, compaction->num_input_files(1));

  AddVersionStorage();
  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
  Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
  UpdateVersionStorageInfo();

  std::unique_ptr<Compaction> compaction2(
      universal_compaction_picker.PickCompaction(
          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
  ASSERT_FALSE(compaction2);
}

TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
  // The case where universal periodic compaction can be picked
  // with some newer files being compacted.
  const uint64_t kFileSize = 100000;

  ioptions_.compaction_style = kCompactionStyleUniversal;

  bool input_level_overlap = false;
  bool output_level_overlap = false;
  // Let's mark 2 files in 2 different levels for compaction. The
  // compaction picker will randomly pick one, so use the sync point to
  // ensure a deterministic order. Loop until both cases are covered.
  size_t random_index = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionPicker::PickFilesMarkedForCompaction", [&](void* arg) {
        size_t* index = static_cast<size_t*>(arg);
        *index = random_index;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  while (!input_level_overlap || !output_level_overlap) {
    // Ensure that the L0 file gets picked first
    random_index = !input_level_overlap ? 0 : 1;
    UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
    NewVersionStorage(5, kCompactionStyleUniversal);

    Add(0, 1U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
    Add(3, 2U, "010", "020", 2 * kFileSize, 0, 201, 248);
    Add(3, 3U, "250", "270", 2 * kFileSize, 0, 202, 249);
    Add(3, 4U, "290", "310", 2 * kFileSize, 0, 203, 250);
    Add(3, 5U, "310", "320", 2 * kFileSize, 0, 204, 251, 0, true);
    Add(4, 6U, "301", "350", 8 * kFileSize, 0, 101, 150);
    Add(4, 7U, "501", "750", 8 * kFileSize, 0, 101, 150);
    UpdateVersionStorageInfo();

    std::unique_ptr<Compaction> compaction(
        universal_compaction_picker.PickCompaction(
            cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));

    ASSERT_TRUE(compaction);
    // Validate that it's a delete triggered compaction
    ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
              compaction->compaction_reason());
    ASSERT_TRUE(compaction->start_level() == 0 ||
                compaction->start_level() == 3);
    if (compaction->start_level() == 0) {
      // The L0 file was picked. The next compaction will detect an
      // overlap on its input level
      input_level_overlap = true;
      ASSERT_EQ(3, compaction->output_level());
      ASSERT_EQ(1U, compaction->num_input_files(0));
      ASSERT_EQ(3U, compaction->num_input_files(1));
    } else {
      // The level 3 file was picked. The next compaction will pick
      // the L0 file and will detect overlap when adding output
      // level inputs
      output_level_overlap = true;
      ASSERT_EQ(4, compaction->output_level());
      ASSERT_EQ(2U, compaction->num_input_files(0));
      ASSERT_EQ(1U, compaction->num_input_files(1));
    }

    vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
    // After recomputing the compaction score, only one marked file will remain
    random_index = 0;
    std::unique_ptr<Compaction> compaction2(
        universal_compaction_picker.PickCompaction(
            cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
    ASSERT_FALSE(compaction2);
    DeleteVersionStorage();
  }
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
@@ -120,8 +120,7 @@ class UniversalCompactionBuilder {
  LogBuffer* log_buffer_;

  static std::vector<SortedRun> CalculateSortedRuns(
      const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions,
      const MutableCFOptions& mutable_cf_options);
      const VersionStorageInfo& vstorage);

  // Pick a path ID to place a newly generated file, with its estimated file
  // size.
@@ -325,8 +324,7 @@ void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(

std::vector<UniversalCompactionBuilder::SortedRun>
UniversalCompactionBuilder::CalculateSortedRuns(
    const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/,
    const MutableCFOptions& mutable_cf_options) {
    const VersionStorageInfo& vstorage) {
  std::vector<UniversalCompactionBuilder::SortedRun> ret;
  for (FileMetaData* f : vstorage.LevelFiles(0)) {
    ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
@@ -336,27 +334,16 @@ UniversalCompactionBuilder::CalculateSortedRuns(
    uint64_t total_compensated_size = 0U;
    uint64_t total_size = 0U;
    bool being_compacted = false;
    bool is_first = true;
    for (FileMetaData* f : vstorage.LevelFiles(level)) {
      total_compensated_size += f->compensated_file_size;
      total_size += f->fd.GetFileSize();
      if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
          true) {
        if (f->being_compacted) {
          being_compacted = f->being_compacted;
        }
      } else {
        // Compaction always includes all files for a non-zero level, so for a
        // non-zero level, all the files should share the same being_compacted
        // value.
        // This assumption is only valid when
        // mutable_cf_options.compaction_options_universal.allow_trivial_move
        // is false
        assert(is_first || f->being_compacted == being_compacted);
      }
      if (is_first) {
      // Size amp, read amp and periodic compactions always include all files
      // for a non-zero level. However, a delete triggered compaction and
      // a trivial move might pick a subset of files in a sorted run. So
      // always check all files in a sorted run and mark the entire run as
      // being compacted if one or more files are being compacted
      if (f->being_compacted) {
        being_compacted = f->being_compacted;
        is_first = false;
      }
    }
    if (total_compensated_size > 0) {
@@ -372,8 +359,7 @@ UniversalCompactionBuilder::CalculateSortedRuns(
Compaction* UniversalCompactionBuilder::PickCompaction() {
  const int kLevel0 = 0;
  score_ = vstorage_->CompactionScore(kLevel0);
  sorted_runs_ =
      CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_);
  sorted_runs_ = CalculateSortedRuns(*vstorage_);

  if (sorted_runs_.size() == 0 ||
      (vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
@@ -855,6 +841,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
  std::vector<CompactionInputFiles> inputs;

  if (vstorage_->num_levels() == 1) {
#if defined(ENABLE_SINGLE_LEVEL_DTC)
    // This is single level universal. Since we're basically trying to reclaim
    // space by processing files marked for compaction due to high tombstone
    // density, let's do the same thing as compaction to reduce size amp which
@@ -877,6 +864,11 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
      return nullptr;
    }
    inputs.push_back(start_level_inputs);
#else
    // Disable due to a known race condition.
    // TODO: Reenable once the race condition is fixed
    return nullptr;
#endif  // ENABLE_SINGLE_LEVEL_DTC
  } else {
    int start_level;

@@ -157,42 +157,6 @@ class CorruptionTest : public testing::Test {
    ASSERT_GE(max_expected, correct);
  }

  void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
    struct stat sbuf;
    if (stat(fname.c_str(), &sbuf) != 0) {
      const char* msg = strerror(errno);
      FAIL() << fname << ": " << msg;
    }

    if (offset < 0) {
      // Relative to end of file; make it absolute
      if (-offset > sbuf.st_size) {
        offset = 0;
      } else {
        offset = static_cast<int>(sbuf.st_size + offset);
      }
    }
    if (offset > sbuf.st_size) {
      offset = static_cast<int>(sbuf.st_size);
    }
    if (offset + bytes_to_corrupt > sbuf.st_size) {
      bytes_to_corrupt = static_cast<int>(sbuf.st_size - offset);
    }

    // Do it
    std::string contents;
    Status s = ReadFileToString(Env::Default(), fname, &contents);
    ASSERT_TRUE(s.ok()) << s.ToString();
    for (int i = 0; i < bytes_to_corrupt; i++) {
      contents[i + offset] ^= 0x80;
    }
    s = WriteStringToFile(Env::Default(), contents, fname);
    ASSERT_TRUE(s.ok()) << s.ToString();
    Options options;
    EnvOptions env_options;
    ASSERT_NOK(VerifySstFileChecksum(options, env_options, fname));
  }
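The core of CorruptFile is the XOR loop: it flips the high bit of each targeted byte, which guarantees the contents change (and change back if applied twice). A minimal Python sketch of that step, with the offset already normalized as the C++ code does beforehand:

```python
def corrupt(contents: bytes, offset: int, n: int) -> bytes:
    # Flip the high bit of n bytes starting at offset, as CorruptFile does
    data = bytearray(contents)
    for i in range(n):
        data[offset + i] ^= 0x80
    return bytes(data)

sample = b"abcd"
print(corrupt(sample, 1, 2))  # b'a\xe2\xe3d'
```

Because XOR with a fixed mask is its own inverse, corrupting the same range twice restores the original bytes.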

  void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
    // Pick file to corrupt
    std::vector<std::string> filenames;
@@ -211,7 +175,7 @@ class CorruptionTest : public testing::Test {
    }
    ASSERT_TRUE(!fname.empty()) << filetype;

    CorruptFile(fname, offset, bytes_to_corrupt);
    test::CorruptFile(fname, offset, bytes_to_corrupt);
  }

  // corrupts exactly one file at level `level`. if no file found at level,
@@ -221,7 +185,7 @@ class CorruptionTest : public testing::Test {
    db_->GetLiveFilesMetaData(&metadata);
    for (const auto& m : metadata) {
      if (m.level == level) {
        CorruptFile(dbname_ + "/" + m.name, offset, bytes_to_corrupt);
        test::CorruptFile(dbname_ + "/" + m.name, offset, bytes_to_corrupt);
        return;
      }
    }
@@ -556,7 +520,7 @@ TEST_F(CorruptionTest, RangeDeletionCorrupted) {
      ImmutableCFOptions(options_), kRangeDelBlock, &range_del_handle));

  ASSERT_OK(TryReopen());
  CorruptFile(filename, static_cast<int>(range_del_handle.offset()), 1);
  test::CorruptFile(filename, static_cast<int>(range_del_handle.offset()), 1);
  ASSERT_TRUE(TryReopen().IsCorruption());
}
@ -1295,14 +1295,18 @@ TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
|
||||
TEST_F(DBBasicTest, MultiGetBatchedSortedMultiFile) {
|
||||
do {
|
||||
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
||||
SetPerfLevel(kEnableCount);
|
||||
// To expand the power of this test, generate > 1 table file and
|
||||
// mix with memtable
|
||||
ASSERT_OK(Put(1, "k1", "v1"));
|
||||
ASSERT_OK(Put(1, "k2", "v2"));
|
||||
Flush(1);
|
||||
ASSERT_OK(Put(1, "k3", "v3"));
|
||||
ASSERT_OK(Put(1, "k4", "v4"));
|
||||
Flush(1);
|
||||
ASSERT_OK(Delete(1, "k4"));
|
||||
ASSERT_OK(Put(1, "k5", "v5"));
|
||||
ASSERT_OK(Delete(1, "no_key"));
|
||||
@ -1333,7 +1337,7 @@ TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
|
||||
ASSERT_TRUE(s[5].IsNotFound());
|
||||
|
||||
SetPerfLevel(kDisable);
|
||||
} while (ChangeCompactOptions());
|
||||
} while (ChangeOptions());
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
|
||||
@ -1774,6 +1778,28 @@ TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
|
||||
Options options = CurrentOptions();
|
||||
DestroyAndReopen(options);
|
||||
ASSERT_OK(Put("foo", "value"));
|
||||
ASSERT_OK(Flush());
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
|
||||
ASSERT_NE(nullptr, arg);
|
||||
*(reinterpret_cast<Status*>(arg)) =
|
||||
Status::Corruption("Inject corruption");
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
options.best_efforts_recovery = true;
|
||||
Status s = TryReopen(options);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
namespace {
|
||||
class TableFileListener : public EventListener {
|
||||
@ -1814,11 +1840,16 @@ TEST_F(DBBasicTest, RecoverWithMissingFiles) {
|
||||
ASSERT_OK(Flush(static_cast<int>(cf)));
|
||||
}
|
||||
|
||||
// Delete files
|
||||
// Delete and corrupt files
|
||||
for (size_t i = 0; i < all_cf_names.size(); ++i) {
|
||||
std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
|
||||
ASSERT_EQ(3, files.size());
|
||||
for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
|
||||
std::string corrupted_data;
|
||||
ASSERT_OK(ReadFileToString(env_, files[files.size() - 1], &corrupted_data));
|
||||
ASSERT_OK(WriteStringToFile(
|
||||
env_, corrupted_data.substr(0, corrupted_data.size() - 2),
|
||||
files[files.size() - 1], /*should_sync=*/true));
|
||||
for (int j = static_cast<int>(files.size() - 2); j >= static_cast<int>(i);
|
||||
--j) {
|
||||
ASSERT_OK(env_->DeleteFile(files[j]));
|
||||
}
|
||||
@ -1850,6 +1881,32 @@ TEST_F(DBBasicTest, RecoverWithMissingFiles) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, BestEffortsRecoveryTryMultipleManifests) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
DestroyAndReopen(options);
|
||||
ASSERT_OK(Put("foo", "value0"));
|
||||
ASSERT_OK(Flush());
|
||||
Close();
|
||||
{
|
||||
// Hack by adding a new MANIFEST with high file number
|
||||
std::string garbage(10, '\0');
|
||||
ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/MANIFEST-001000",
|
||||
/*should_sync=*/true));
|
||||
}
|
||||
{
|
||||
// Hack by adding a corrupted SST not referenced by any MANIFEST
|
||||
std::string garbage(10, '\0');
|
||||
ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/001001.sst",
|
||||
/*should_sync=*/true));
|
||||
}
|
||||
|
||||
options.best_efforts_recovery = true;
|
||||
|
||||
Reopen(options);
|
||||
ASSERT_OK(Put("bar", "value"));
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
@ -1873,6 +1930,35 @@ TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, RecoverWithNoManifest) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
DestroyAndReopen(options);
|
||||
ASSERT_OK(Put("foo", "value"));
|
||||
ASSERT_OK(Flush());
|
||||
Close();
|
||||
{
|
||||
// Delete all MANIFEST.
|
||||
std::vector<std::string> files;
|
||||
ASSERT_OK(env_->GetChildren(dbname_, &files));
|
||||
for (const auto& file : files) {
|
||||
uint64_t number = 0;
|
||||
FileType type = kLogFile;
|
||||
if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
|
||||
ASSERT_OK(env_->DeleteFile(dbname_ + "/" + file));
|
||||
}
|
||||
}
|
||||
}
|
||||
options.best_efforts_recovery = true;
|
||||
options.create_if_missing = false;
|
||||
Status s = TryReopen(options);
|
||||
ASSERT_TRUE(s.IsInvalidArgument());
|
||||
options.create_if_missing = true;
|
||||
Reopen(options);
|
||||
// Since no MANIFEST exists, best-efforts recovery creates a new, empty db.
|
||||
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||
}

TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
@@ -1913,6 +1999,33 @@
  }
}
#endif  // !ROCKSDB_LITE

TEST_F(DBBasicTest, ManifestChecksumMismatch) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  ASSERT_OK(Put("bar", "value"));
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
        auto* crc = reinterpret_cast<uint32_t*>(arg);
        *crc = *crc + 1;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions write_opts;
  write_opts.disableWAL = true;
  Status s = db_->Put(write_opts, "foo", "value");
  ASSERT_OK(s);
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_OK(Put("foo", "value1"));
  ASSERT_OK(Flush());
  s = TryReopen(options);
  ASSERT_TRUE(s.IsCorruption());
}

class DBBasicTestMultiGet : public DBTestBase {
 public:
  DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
@@ -2511,9 +2624,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
  std::shared_ptr<DBBasicTestMultiGetDeadline::DeadlineFS> fs(
      new DBBasicTestMultiGetDeadline::DeadlineFS(env_));
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  env_->no_slowdown_ = true;
  env_->time_elapse_only_sleep_.store(true);
  Options options = CurrentOptions();
  env_->SetTimeElapseOnlySleep(&options);

  std::shared_ptr<Cache> cache = NewLRUCache(1048576);
  BlockBasedTableOptions table_options;

@@ -1063,12 +1063,12 @@ TEST_P(DBBloomFilterTestVaryPrefixAndFormatVer, PartitionedMultiGet) {
  bbto.partition_filters = true;
  bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  bbto.whole_key_filtering = !use_prefix_;
  bbto.metadata_block_size = 128;
  bbto.metadata_block_size = 290;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);
  ReadOptions ropts;

  constexpr uint32_t N = 10000;
  constexpr uint32_t N = 12000;
  // Add N/2 evens
  for (uint32_t i = 0; i < N; i += 2) {
    ASSERT_OK(Put(UKey(i), UKey(i)));

@@ -2655,7 +2655,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
        " guaranteed to be preserved, try larger iter_start_seqnum opt."));
  }
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  ColumnFamilyData* cfd = cfh->cfd();
  assert(cfd != nullptr);
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
@@ -2676,10 +2677,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
    // WritePreparedTxnDB
    auto snapshot = read_options.snapshot != nullptr
                        ? read_options.snapshot->GetSequenceNumber()
                        : versions_->LastSequence();
    result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
    result = NewIteratorImpl(read_options, cfd,
                             (read_options.snapshot != nullptr)
                                 ? read_options.snapshot->GetSequenceNumber()
                                 : kMaxSequenceNumber,
                             read_callback);
  }
  return result;
}
@@ -2692,6 +2694,24 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
    bool allow_refresh) {
  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

  TEST_SYNC_POINT("DBImpl::NewIterator:1");
  TEST_SYNC_POINT("DBImpl::NewIterator:2");

  if (snapshot == kMaxSequenceNumber) {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither data that was
    // visible to the snapshot before compaction nor the newer data inserted
    // afterwards.
    // Note that the super version might not contain all the data available
    // to this snapshot, but in that case it can see all the data in the
    // super version, which is a valid consistent state after the user
    // calls NewIterator().
    snapshot = versions_->LastSequence();
    TEST_SYNC_POINT("DBImpl::NewIterator:3");
    TEST_SYNC_POINT("DBImpl::NewIterator:4");
  }

  // Try to generate a DB iterator tree in continuous memory area to be
  // cache friendly. Here is an example of result:
  // +-------------------------------+

@@ -477,6 +477,7 @@ class DBImpl : public DB {
  Status GetImpl(const ReadOptions& options, const Slice& key,
                 GetImplOptions& get_impl_options);

  // If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
                                      ColumnFamilyData* cfd,
                                      SequenceNumber snapshot,

@@ -668,13 +668,15 @@ uint64_t PrecomputeMinLogNumberToKeep(
Status DBImpl::FinishBestEffortsRecovery() {
  mutex_.AssertHeld();
  std::vector<std::string> paths;
  paths.push_back(dbname_);
  paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
  for (const auto& db_path : immutable_db_options_.db_paths) {
    paths.push_back(db_path.path);
    paths.push_back(
        NormalizePath(db_path.path + std::string(1, kFilePathSeparator)));
  }
  for (const auto* cfd : *versions_->GetColumnFamilySet()) {
    for (const auto& cf_path : cfd->ioptions()->cf_paths) {
      paths.push_back(cf_path.path);
      paths.push_back(
          NormalizePath(cf_path.path + std::string(1, kFilePathSeparator)));
    }
  }
  // Dedup paths
@@ -693,7 +695,8 @@ Status DBImpl::FinishBestEffortsRecovery() {
      if (!ParseFileName(fname, &number, &type)) {
        continue;
      }
      const std::string normalized_fpath = NormalizePath(path + fname);
      // path ends with '/' or '\\'
      const std::string normalized_fpath = path + fname;
      largest_file_number = std::max(largest_file_number, number);
      if (type == kTableFile && number >= next_file_number &&
          files_to_delete.find(normalized_fpath) == files_to_delete.end()) {

@@ -370,7 +370,30 @@ Status DBImpl::Recover(
  }

  std::string current_fname = CurrentFileName(dbname_);
  s = env_->FileExists(current_fname);
  // Path to any MANIFEST file in the db dir. It does not matter which one.
  // Since best-efforts recovery ignores the CURRENT file, the existence of a
  // MANIFEST indicates that recovery should recover the existing db. If no
  // MANIFEST can be found, a new db will be created.
  std::string manifest_path;
  if (!immutable_db_options_.best_efforts_recovery) {
    s = env_->FileExists(current_fname);
  } else {
    s = Status::NotFound();
    std::vector<std::string> files;
    // No need to check return value
    env_->GetChildren(dbname_, &files);
    for (const std::string& file : files) {
      uint64_t number = 0;
      FileType type = kLogFile;  // initialize
      if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
        // Found MANIFEST (descriptor log), thus best-efforts recovery does
        // not have to treat the db as empty.
        s = Status::OK();
        manifest_path = dbname_ + "/" + file;
        break;
      }
    }
  }
  if (s.IsNotFound()) {
    if (immutable_db_options_.create_if_missing) {
      s = NewDB();
@@ -398,14 +421,14 @@ Status DBImpl::Recover(
      FileOptions customized_fs(file_options_);
      customized_fs.use_direct_reads |=
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
      s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
                                   nullptr);
      const std::string& fname =
          manifest_path.empty() ? current_fname : manifest_path;
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
      if (!s.ok()) {
        std::string error_str = s.ToString();
        // Check if unsupported Direct I/O is the root cause
        customized_fs.use_direct_reads = false;
        s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
                                     nullptr);
        s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
        if (s.ok()) {
          return Status::InvalidArgument(
              "Direct I/O is not supported by the specified DB.");
@@ -425,6 +448,9 @@ Status DBImpl::Recover(
    s = versions_->TryRecover(column_families, read_only, &db_id_,
                              &missing_table_file);
    if (s.ok()) {
      // TryRecover may delete previous column_family_set_.
      column_family_memtables_.reset(
          new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
      s = FinishBestEffortsRecovery();
    }
  }

@@ -45,6 +45,8 @@ class DBSecondaryTest : public DBTestBase {

  void OpenSecondary(const Options& options);

  Status TryOpenSecondary(const Options& options);

  void OpenSecondaryWithColumnFamilies(
      const std::vector<std::string>& column_families, const Options& options);

@@ -70,9 +72,13 @@ class DBSecondaryTest : public DBTestBase {
};

void DBSecondaryTest::OpenSecondary(const Options& options) {
  ASSERT_OK(TryOpenSecondary(options));
}

Status DBSecondaryTest::TryOpenSecondary(const Options& options) {
  Status s =
      DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
  ASSERT_OK(s);
  return s;
}

void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
@@ -858,6 +864,56 @@ TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
  thread.join();
  ASSERT_TRUE(called);
}

TEST_F(DBSecondaryTest, StartFromInconsistent) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "value"));
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        *(reinterpret_cast<Status*>(arg)) =
            Status::Corruption("Inject corruption");
      });
  SyncPoint::GetInstance()->EnableProcessing();
  Options options1;
  Status s = TryOpenSecondary(options1);
  ASSERT_TRUE(s.IsCorruption());
}

TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "value"));
  ASSERT_OK(Flush());

  Options options1;
  OpenSecondary(options1);

  {
    std::string value;
    ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value));
    ASSERT_EQ("value", value);
  }

  ASSERT_OK(Put("bar", "value1"));
  ASSERT_OK(Flush());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        *(reinterpret_cast<Status*>(arg)) =
            Status::Corruption("Inject corruption");
      });
  SyncPoint::GetInstance()->EnableProcessing();
  Status s = db_secondary_->TryCatchUpWithPrimary();
  ASSERT_TRUE(s.IsCorruption());
}
#endif  // !ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE

@@ -356,37 +356,25 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
      "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
        // Turn timed wait into a simulated sleep
        uint64_t* abs_time_us = static_cast<uint64_t*>(arg);
        int64_t cur_time = 0;
        env_->GetCurrentTime(&cur_time);
        if (*abs_time_us > static_cast<uint64_t>(cur_time)) {
          env_->addon_time_.fetch_add(*abs_time_us -
                                      static_cast<uint64_t>(cur_time));
        uint64_t cur_time = env_->NowMicros();
        if (*abs_time_us > cur_time) {
          env_->addon_time_.fetch_add(*abs_time_us - cur_time);
        }

        // Randomly sleep shortly
        env_->addon_time_.fetch_add(
            static_cast<uint64_t>(Random::GetTLSInstance()->Uniform(10)));

        // Set wait until time to before current to force not to sleep.
        int64_t real_cur_time = 0;
        Env::Default()->GetCurrentTime(&real_cur_time);
        *abs_time_us = static_cast<uint64_t>(real_cur_time);
        // Set wait until time to before (actual) current time to force not
        // to sleep
        *abs_time_us = Env::Default()->NowMicros();
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  env_->no_slowdown_ = true;
  env_->time_elapse_only_sleep_ = true;
  Options options = CurrentOptions();
  env_->SetTimeElapseOnlySleep(&options);
  options.disable_auto_compactions = true;
  // Need to disable stats dumping and persisting which also use
  // RepeatableThread, one of whose member variables is of type
  // InstrumentedCondVar. The callback for
  // InstrumentedCondVar::TimedWaitInternal can be triggered by stats dumping
  // and persisting threads and cause time_spent_deleting measurement to become
  // incorrect.
  options.stats_dump_period_sec = 0;
  options.stats_persist_period_sec = 0;
  options.env = env_;

  int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec

db/db_test2.cc
@@ -2688,6 +2688,94 @@ TEST_F(DBTest2, OptimizeForSmallDB) {

#endif  // ROCKSDB_LITE

TEST_F(DBTest2, IterRaceFlush1) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
|
||||
{"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ROCKSDB_NAMESPACE::port::Thread t1([&] {
|
||||
TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
|
||||
ASSERT_OK(Put("foo", "v2"));
|
||||
Flush();
|
||||
TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
|
||||
});
|
||||
|
||||
// iterator is created after the first Put(), so it should see either
|
||||
// "v1" or "v2".
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->Seek("foo");
|
||||
ASSERT_TRUE(it->Valid());
|
||||
ASSERT_EQ("foo", it->key().ToString());
|
||||
}
|
||||
|
||||
t1.join();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, IterRaceFlush2) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
|
||||
{"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ROCKSDB_NAMESPACE::port::Thread t1([&] {
|
||||
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
|
||||
ASSERT_OK(Put("foo", "v2"));
|
||||
Flush();
|
||||
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
|
||||
});
|
||||
|
||||
// iterator is created after the first Put(), so it should see either
|
||||
// "v1" or "v2".
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->Seek("foo");
|
||||
ASSERT_TRUE(it->Valid());
|
||||
ASSERT_EQ("foo", it->key().ToString());
|
||||
}
|
||||
|
||||
t1.join();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, IterRefreshRaceFlush) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"},
|
||||
{"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}});
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ROCKSDB_NAMESPACE::port::Thread t1([&] {
|
||||
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
|
||||
ASSERT_OK(Put("foo", "v2"));
|
||||
Flush();
|
||||
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
|
||||
});
|
||||
|
||||
// iterator is created after the first Put(), so it should see either
|
||||
// "v1" or "v2".
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->Refresh();
|
||||
it->Seek("foo");
|
||||
ASSERT_TRUE(it->Valid());
|
||||
ASSERT_EQ("foo", it->key().ToString());
|
||||
}
|
||||
|
||||
t1.join();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, GetRaceFlush1) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
|
||||
@ -4328,6 +4416,24 @@ TEST_F(DBTest2, SameSmallestInSameLevel) {
|
||||
ASSERT_EQ("2,3,4,5,6,7,8", Get("key"));
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, FileConsistencyCheckInOpen) {
|
||||
Put("foo", "bar");
|
||||
Flush();
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
|
||||
Status* ret_s = static_cast<Status*>(arg);
|
||||
*ret_s = Status::Corruption("fcc");
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.force_consistency_checks = true;
|
||||
ASSERT_NOK(TryReopen(options));
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, BlockBasedTablePrefixIndexSeekForPrev) {
|
||||
// create a DB with block prefix index
|
||||
BlockBasedTableOptions table_options;
|
||||
|
@ -14,10 +14,19 @@
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
namespace {
|
||||
int64_t MaybeCurrentTime(Env* env) {
|
||||
int64_t time = 1337346000; // arbitrary fallback default
|
||||
(void)env->GetCurrentTime(&time);
|
||||
return time;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// Special Env used to delay background operations
|
||||
|
||||
SpecialEnv::SpecialEnv(Env* base)
|
||||
: EnvWrapper(base),
|
||||
maybe_starting_time_(MaybeCurrentTime(base)),
|
||||
rnd_(301),
|
||||
sleep_counter_(this),
|
||||
addon_time_(0),
|
||||
|
@ -502,10 +502,14 @@ class SpecialEnv : public EnvWrapper {
|
||||
|
||||
virtual Status GetCurrentTime(int64_t* unix_time) override {
|
||||
Status s;
|
||||
if (!time_elapse_only_sleep_) {
|
||||
if (time_elapse_only_sleep_) {
|
||||
*unix_time = maybe_starting_time_;
|
||||
} else {
|
||||
s = target()->GetCurrentTime(unix_time);
|
||||
}
|
||||
if (s.ok()) {
|
||||
// FIXME: addon_time_ sometimes used to mean seconds (here) and
|
||||
// sometimes microseconds
|
||||
*unix_time += addon_time_.load();
|
||||
}
|
||||
return s;
|
||||
@ -531,6 +535,20 @@ class SpecialEnv : public EnvWrapper {
|
||||
return target()->DeleteFile(fname);
|
||||
}
|
||||
|
||||
void SetTimeElapseOnlySleep(Options* options) {
|
||||
time_elapse_only_sleep_ = true;
|
||||
no_slowdown_ = true;
|
||||
// Need to disable stats dumping and persisting which also use
|
||||
// RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
|
||||
// With time_elapse_only_sleep_, this can hang on some platforms.
|
||||
// TODO: why? investigate/fix
|
||||
options->stats_dump_period_sec = 0;
|
||||
options->stats_persist_period_sec = 0;
|
||||
}
|
||||
|
||||
// Something to return when mocking current time
|
||||
const int64_t maybe_starting_time_;
|
||||
|
||||
Random rnd_;
|
||||
port::Mutex rnd_mutex_; // Lock to pretect rnd_
|
||||
|
||||
|
@ -1953,6 +1953,7 @@ TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
|
||||
ASSERT_GT(NumTableFilesAtLevel(6), 0);
|
||||
}
|
||||
|
||||
#if defined(ENABLE_SINGLE_LEVEL_DTC)
|
||||
TEST_F(DBTestUniversalCompaction2, SingleLevel) {
|
||||
const int kNumKeys = 3000;
|
||||
const int kWindowSize = 100;
|
||||
@ -1991,6 +1992,7 @@ TEST_F(DBTestUniversalCompaction2, SingleLevel) {
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(0));
|
||||
}
|
||||
#endif // ENABLE_SINGLE_LEVEL_DTC
|
||||
|
||||
TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
|
||||
const int kWindowSize = 100;
|
||||
|
@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
|
||||
// Compute the crc of the record type and the payload.
|
||||
crc = crc32c::Extend(crc, ptr, n);
|
||||
crc = crc32c::Mask(crc); // Adjust for storage
|
||||
TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
|
||||
&crc);
|
||||
EncodeFixed32(buf, crc);
|
||||
|
||||
// Write the header and the payload
|
||||
|
@ -316,7 +316,10 @@ class VersionBuilder::Rep {
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
Status ret_s;
|
||||
TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistencyBeforeReturn",
|
||||
&ret_s);
|
||||
return ret_s;
|
||||
}
|
||||
|
||||
Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
|
||||
@ -463,7 +466,10 @@ class VersionBuilder::Rep {
|
||||
const auto number = del_file.second;
|
||||
if (level < num_levels_) {
|
||||
levels_[level].deleted_files.insert(number);
|
||||
CheckConsistencyForDeletes(edit, number, level);
|
||||
s = CheckConsistencyForDeletes(edit, number, level);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
auto exising = levels_[level].added_files.find(number);
|
||||
if (exising != levels_[level].added_files.end()) {
|
||||
|
@ -839,6 +839,36 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbage) {
|
||||
UnrefFilesInVersion(&new_vstorage);
|
||||
}
|
||||
|
||||
TEST_F(VersionBuilderTest, CheckConsistencyForFileDeletedTwice) {
|
||||
Add(0, 1U, "150", "200", 100U);
|
||||
UpdateVersionStorageInfo();
|
||||
|
||||
VersionEdit version_edit;
|
||||
version_edit.DeleteFile(0, 1U);
|
||||
|
||||
EnvOptions env_options;
|
||||
constexpr TableCache* table_cache = nullptr;
|
||||
constexpr VersionSet* version_set = nullptr;
|
||||
|
||||
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
|
||||
&vstorage_, version_set);
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr,
|
||||
true /* force_consistency_checks */);
|
||||
ASSERT_OK(version_builder.Apply(&version_edit));
|
||||
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
|
||||
|
||||
VersionBuilder version_builder2(env_options, &ioptions_, table_cache,
|
||||
&new_vstorage, version_set);
|
||||
VersionStorageInfo new_vstorage2(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr,
|
||||
true /* force_consistency_checks */);
|
||||
ASSERT_NOK(version_builder2.Apply(&version_edit));
|
||||
|
||||
UnrefFilesInVersion(&new_vstorage);
|
||||
UnrefFilesInVersion(&new_vstorage2);
|
||||
}
|
||||
|
||||
TEST_F(VersionBuilderTest, EstimatedActiveKeys) {
|
||||
const uint32_t kTotalSamples = 20;
|
||||
const uint32_t kNumLevels = 5;
|
||||
|
@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler(
|
||||
assert(version_set_ != nullptr);
|
||||
}
|
||||
|
||||
Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
|
||||
void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
|
||||
std::string* db_id) {
|
||||
Slice record;
|
||||
std::string scratch;
|
||||
assert(log_read_status);
|
||||
assert(log_read_status->ok());
|
||||
|
||||
size_t recovered_edits = 0;
|
||||
Status s = Initialize();
|
||||
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
|
||||
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
|
||||
log_read_status->ok()) {
|
||||
VersionEdit edit;
|
||||
s = edit.DecodeFrom(record);
|
||||
if (!s.ok()) {
|
||||
@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!log_read_status->ok()) {
|
||||
s = *log_read_status;
|
||||
}
|
||||
|
||||
CheckIterationResult(reader, &s);
|
||||
|
||||
if (!s.ok()) {
|
||||
status_ = s;
|
||||
}
|
||||
return s;
|
||||
}

Status VersionEditHandler::Initialize() {
@@ -377,6 +384,7 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
ColumnFamilyData* cfd,
bool force_create_version) {
assert(cfd->initialized());
Status s;
if (force_create_version) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
@@ -384,13 +392,18 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
auto* v = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(v->storage_info());
// Install new version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
s = builder->SaveTo(v->storage_info());
if (s.ok()) {
// Install new version
v->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
} else {
delete v;
}
}
return Status::OK();
return s;
}
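Now that `SaveTo()` returns a `Status`, the freshly allocated `Version` must be freed on the failure path before returning, hence the new `delete v`. A hedged sketch of the same ownership idea expressed with `unique_ptr` (hypothetical `Thing`/`BuildStep` names, not RocksDB code):

```cpp
#include <cassert>
#include <memory>

// Hypothetical object standing in for Version (an assumption for illustration).
struct Thing {
  int x = 0;
};

// Stand-in for a fallible build step like VersionBuilder::SaveTo().
bool BuildStep(Thing* t, bool succeed) {
  t->x = 42;
  return succeed;
}

// Release ownership to the caller only once every fallible step succeeded;
// on failure the unique_ptr frees the object, like the diff's `delete v`.
Thing* CreateIfOk(bool succeed) {
  auto v = std::make_unique<Thing>();
  if (!BuildStep(v.get(), succeed)) {
    return nullptr;  // v is destroyed here, no leak
  }
  return v.release();  // hand off ownership, like AppendVersion()
}
```

The design choice is the same either way: an object must not be installed into shared state (`AppendVersion`) until every step that can fail has been checked.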

Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
@@ -398,18 +411,18 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
bool is_initial_load) {
assert(cfd != nullptr);
assert(!cfd->IsDropped());
Status s;
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
assert(builder_iter->second != nullptr);
VersionBuilder* builder = builder_iter->second->version_builder();
assert(builder);
s = builder->LoadTableHandlers(
Status s = builder->LoadTableHandlers(
cfd->internal_stats(),
version_set_->db_options_->max_file_opening_threads,
prefetch_index_and_filter_in_cache, is_initial_load,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
if (s.IsPathNotFound() && no_error_if_table_files_missing_) {
if ((s.IsPathNotFound() || s.IsCorruption()) &&
no_error_if_table_files_missing_) {
s = Status::OK();
}
if (!s.ok() && !version_set_->db_options_->paranoid_checks) {
@@ -540,9 +553,11 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
const std::string fpath =
MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num);
s = version_set_->VerifyFileMetadata(fpath, meta);
if (s.IsPathNotFound() || s.IsNotFound()) {
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_files.insert(file_num);
s = Status::OK();
} else if (!s.ok()) {
break;
}
}
bool missing_info = !version_edit_params_.has_log_number_ ||
@@ -550,24 +565,29 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
!version_edit_params_.has_last_sequence_;

// Create version before apply edit
if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) ||
(missing_files.empty() && force_create_version))) {
if (s.ok() && !missing_info &&
((!missing_files.empty() && !prev_has_missing_files) ||
(missing_files.empty() && force_create_version))) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
auto* version = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
} else {
versions_.emplace(cfd->GetID(), version);
}
} else {
versions_.emplace(cfd->GetID(), version);
delete version;
}
}
return s;
@@ -40,7 +40,8 @@ class VersionEditHandler {

virtual ~VersionEditHandler() {}

Status Iterate(log::Reader& reader, std::string* db_id);
void Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id);

const Status& status() const { return status_; }
@@ -10,6 +10,7 @@
#include "db/version_set.h"

#include <stdio.h>

#include <algorithm>
#include <array>
#include <cinttypes>
@@ -19,6 +20,7 @@
#include <string>
#include <unordered_map>
#include <vector>

#include "compaction/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
@@ -50,6 +52,7 @@
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
@@ -2087,6 +2090,9 @@ void VersionStorageInfo::GenerateLevelFilesBrief() {
void Version::PrepareApply(
const MutableCFOptions& mutable_cf_options,
bool update_stats) {
TEST_SYNC_POINT_CALLBACK(
"Version::PrepareApply:forced_check",
reinterpret_cast<void*>(&storage_info_.force_consistency_checks_));
UpdateAccumulatedStats(update_stats);
storage_info_.UpdateNumNonEmptyLevels();
storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
@@ -2388,6 +2394,11 @@ void VersionStorageInfo::ComputeCompactionScore(
// compaction score for the whole DB. Adding other levels as if
// they are L0 files.
for (int i = 1; i < num_levels(); i++) {
// It's possible that a subset of the files in a level may be in a
// compaction, due to delete triggered compaction or trivial move.
// In that case, the below check may not catch a level being
// compacted as it only checks the first file. The worst that can
// happen is a scheduled compaction thread will find nothing to do.
if (!files_[i].empty() && !files_[i][0]->being_compacted) {
num_sorted_runs++;
}
@@ -4375,24 +4386,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
if (dbname.back() != '/') {
manifest_path->push_back('/');
}
*manifest_path += fname;
manifest_path->append(fname);
return Status::OK();
}

Status VersionSet::ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders,
VersionEditParams* version_edit_params, std::string* db_id) {
assert(reader != nullptr);
Status* log_read_status, VersionEditParams* version_edit_params,
std::string* db_id) {
assert(read_buffer != nullptr);
assert(log_read_status != nullptr);
Status s;
Slice record;
std::string scratch;
size_t recovered_edits = 0;
while (reader->ReadRecord(&record, &scratch) && s.ok()) {
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
@@ -4436,6 +4449,9 @@ Status VersionSet::ReadAndRecover(
}
}
}
if (!log_read_status->ok()) {
s = *log_read_status;
}
if (!s.ok()) {
// Clear the buffer if we fail to decode/apply an edit.
read_buffer->Clear();
@@ -4482,8 +4498,7 @@ Status VersionSet::Recover(
db_options_->log_readahead_size));
}

std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
VersionBuilderMap builders;

// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
@@ -4505,12 +4520,13 @@ Status VersionSet::Recover(
VersionEditParams version_edit_params;
{
VersionSet::LogReporter reporter;
reporter.status = &s;
Status log_read_status;
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
AtomicGroupReadBuffer read_buffer;
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders,
s = ReadAndRecover(reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders, &log_read_status,
&version_edit_params, db_id);
current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0);
@@ -4594,7 +4610,11 @@ Status VersionSet::Recover(
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
s = builder->SaveTo(v->storage_info());
if (!s.ok()) {
delete v;
return s;
}

// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
@@ -4771,21 +4791,20 @@ Status VersionSet::TryRecoverFromOneManifest(
db_options_->log_readahead_size));
}

assert(s.ok());
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
/*checksum=*/true, /*log_num=*/0);
{
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));

s = handler_pit.Iterate(reader, db_id);
handler_pit.Iterate(reader, &s, db_id);

assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
}
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();

return s;
return handler_pit.status();
}

Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
@@ -5145,7 +5164,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
s = builder->SaveTo(v->storage_info());
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);

printf("--------------- Column family \"%s\" (ID %" PRIu32
@@ -5866,8 +5885,7 @@ Status ReactiveVersionSet::Recover(
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
VersionBuilderMap builders;
std::unordered_map<int, std::string> column_families_not_found;
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
@@ -5875,7 +5893,7 @@ Status ReactiveVersionSet::Recover(

manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter());
static_cast<LogReporter*>(manifest_reporter->get())->status =
static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
log::Reader* reader = manifest_reader->get();
@@ -5884,10 +5902,9 @@ Status ReactiveVersionSet::Recover(
VersionEdit version_edit;
while (s.ok() && retry < 1) {
assert(reader != nullptr);
Slice record;
std::string scratch;
s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
column_families_not_found, builders, &version_edit);
s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
column_families_not_found, builders,
manifest_reader_status->get(), &version_edit);
if (s.ok()) {
bool enough = version_edit.has_next_file_number_ &&
version_edit.has_log_number_ &&
@@ -5966,13 +5983,23 @@ Status ReactiveVersionSet::Recover(
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
s = builder->SaveTo(v->storage_info());

// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
if (s.ok()) {
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
} else {
ROCKS_LOG_ERROR(db_options_->info_log,
"[%s]: inconsistent version: %s\n",
cfd->GetName().c_str(), s.ToString().c_str());
delete v;
break;
}
}
}
if (s.ok()) {
next_file_number_.store(version_edit.next_file_number_ + 1);
last_allocated_sequence_ = version_edit.last_sequence_;
last_published_sequence_ = version_edit.last_sequence_;
@@ -6049,6 +6076,8 @@ Status ReactiveVersionSet::ReadAndApply(
s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
if (s.ok()) {
applied_edits++;
} else {
break;
}
}
}
@@ -6058,13 +6087,14 @@ Status ReactiveVersionSet::ReadAndApply(
}
// It's possible that:
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
// Or the version(s) rebuilt from tailing the MANIFEST is inconsistent.
// 2) we have finished reading the current MANIFEST.
// 3) we have encountered an IOError reading the current MANIFEST.
// We need to look for the next MANIFEST and start from there. If we cannot
// find the next MANIFEST, we should exit the loop.
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
reader = manifest_reader->get();
if (s.ok()) {
if (tmp_s.ok()) {
if (reader->file()->file_name() == old_manifest_path) {
// Still processing the same MANIFEST, thus no need to continue this
// loop since no record is available if we have reached here.
@@ -6094,6 +6124,7 @@ Status ReactiveVersionSet::ReadAndApply(
number_of_edits_to_skip_ += 2;
}
}
s = tmp_s;
}
}
}
@@ -6186,12 +6217,16 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
auto version = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
}
} else {
delete version;
}
} else if (s.IsPathNotFound()) {
s = Status::OK();
@@ -6199,23 +6234,25 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
// Some other error has occurred during LoadTableHandlers.
}

if (version_edit->HasNextFile()) {
next_file_number_.store(version_edit->next_file_number_ + 1);
if (s.ok()) {
if (version_edit->HasNextFile()) {
next_file_number_.store(version_edit->next_file_number_ + 1);
}
if (version_edit->has_last_sequence_) {
last_allocated_sequence_ = version_edit->last_sequence_;
last_published_sequence_ = version_edit->last_sequence_;
last_sequence_ = version_edit->last_sequence_;
}
if (version_edit->has_prev_log_number_) {
prev_log_number_ = version_edit->prev_log_number_;
MarkFileNumberUsed(version_edit->prev_log_number_);
}
if (version_edit->has_log_number_) {
MarkFileNumberUsed(version_edit->log_number_);
}
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
}
if (version_edit->has_last_sequence_) {
last_allocated_sequence_ = version_edit->last_sequence_;
last_published_sequence_ = version_edit->last_sequence_;
last_sequence_ = version_edit->last_sequence_;
}
if (version_edit->has_prev_log_number_) {
prev_log_number_ = version_edit->prev_log_number_;
MarkFileNumberUsed(version_edit->prev_log_number_);
}
if (version_edit->has_log_number_) {
MarkFileNumberUsed(version_edit->log_number_);
}
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
return s;
}
@@ -1102,6 +1102,10 @@ class VersionSet {
void SetIOStatusOK() { io_status_ = IOStatus::OK(); }

protected:
using VersionBuilderMap =
std::unordered_map<uint32_t,
std::unique_ptr<BaseReferencedVersionBuilder>>;

struct ManifestWriter;

friend class Version;
@@ -1113,7 +1117,9 @@ class VersionSet {
struct LogReporter : public log::Reader::Reporter {
Status* status;
virtual void Corruption(size_t /*bytes*/, const Status& s) override {
if (this->status->ok()) *this->status = s;
if (status->ok()) {
*status = s;
}
}
};

@@ -1144,13 +1150,14 @@ class VersionSet {
const VersionEdit* edit);

Status ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>&
name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
VersionEditParams* version_edit, std::string* db_id = nullptr);
Status* log_read_status, VersionEditParams* version_edit,
std::string* db_id = nullptr);

// REQUIRES db mutex
Status ApplyOneVersionEditToBuilder(
@@ -1279,8 +1286,7 @@ class ReactiveVersionSet : public VersionSet {
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);

private:
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
active_version_builders_;
VersionBuilderMap active_version_builders_;
AtomicGroupReadBuffer read_buffer_;
// Number of version edits to skip by ReadAndApply at the beginning of a new
// MANIFEST created by primary.
@@ -1370,7 +1370,7 @@ TEST_F(VersionSetAtomicGroupTest,
// Write the corrupted edits.
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
EXPECT_NOK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
@@ -1420,7 +1420,7 @@ TEST_F(VersionSetAtomicGroupTest,
&manifest_reader_status));
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
EXPECT_NOK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[1].DebugString(),
env/fs_posix.cc (vendored, 10 changed lines)
@@ -241,6 +241,8 @@ class PosixFileSystem : public FileSystem {
s = IOError("while mmap file for read", fname, errno);
close(fd);
}
} else {
close(fd);
}
} else {
if (options.use_direct_reads && !options.use_mmap_reads) {
@@ -889,14 +891,16 @@ class PosixFileSystem : public FileSystem {
if (fd < 0) {
return IOError("While open for IsDirectory()", path, errno);
}
IOStatus io_s;
struct stat sbuf;
if (fstat(fd, &sbuf) < 0) {
return IOError("While doing stat for IsDirectory()", path, errno);
io_s = IOError("While doing stat for IsDirectory()", path, errno);
}
if (nullptr != is_dir) {
close(fd);
if (io_s.ok() && nullptr != is_dir) {
*is_dir = S_ISDIR(sbuf.st_mode);
}
return IOStatus::OK();
return io_s;
}

FileOptions OptimizeForLogWrite(const FileOptions& file_options,
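The IsDirectory() change fixes a file-descriptor leak: the old code could return from the `fstat()` error path without ever calling `close(fd)`. A hedged standalone sketch of the corrected shape (hypothetical `IsDirectoryNoLeak` helper with error reporting reduced to a bool, not the RocksDB API):

```cpp
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

// Close the descriptor on every path before returning, so neither the
// fstat() failure branch nor the success branch leaks an fd.
bool IsDirectoryNoLeak(const char* path, bool* is_dir) {
  int fd = open(path, O_RDONLY);
  if (fd < 0) {
    return false;  // nothing to close yet
  }
  struct stat sbuf;
  bool ok = (fstat(fd, &sbuf) == 0);
  close(fd);  // single close point, reached on success and failure alike
  if (ok && is_dir != nullptr) {
    *is_dir = S_ISDIR(sbuf.st_mode);
  }
  return ok;
}
```

The same single-exit-for-cleanup discipline is what the diff applies to `NewRandomAccessFile()` in the first hunk, adding the missing `close(fd)` on the non-mmap branch.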

@@ -62,13 +62,13 @@ class RandomAccessFileReader {
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
Env* _env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_(std::move(raf)),
file_name_(std::move(_file_name)),
env_(env),
env_(_env),
stats_(stats),
hist_type_(hist_type),
file_read_hist_(file_read_hist),
@@ -16,7 +16,17 @@

#pragma once

#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
#include <stdio.h>
#include <stdlib.h>
#endif

#include <string>

#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
#include "port/stack_trace.h"
#endif

#include "rocksdb/slice.h"

namespace ROCKSDB_NAMESPACE {
@@ -25,7 +35,16 @@ class Status {
public:
// Create a success status.
Status() : code_(kOk), subcode_(kNone), sev_(kNoError), state_(nullptr) {}
~Status() { delete[] state_; }
~Status() {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (!checked_) {
fprintf(stderr, "Failed to check Status\n");
port::PrintStack();
abort();
}
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
delete[] state_;
}

// Copy the specified status.
Status(const Status& s);
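With `ROCKSDB_ASSERT_STATUS_CHECKED` defined, the destructor above aborts the process if the status was never inspected, which is what the `ASSERT_STATUS_CHECKED=1 make check_some` Travis job exercises. A minimal sketch of that check-on-destruct idea (hypothetical `CheckedStatus` class, not the real `rocksdb::Status`):

```cpp
#include <cstdio>
#include <cstdlib>

// Destroying a status that nobody looked at aborts, so unchecked error
// paths fail loudly in tests instead of being silently dropped.
class CheckedStatus {
 public:
  explicit CheckedStatus(bool ok) : ok_(ok) {}
  ~CheckedStatus() {
    if (!checked_) {
      fprintf(stderr, "Failed to check status\n");
      abort();  // mirrors the diff's ROCKSDB_ASSERT_STATUS_CHECKED behavior
    }
  }
  bool ok() const {
    checked_ = true;  // any inspection marks the status as checked
    return ok_;
  }

 private:
  bool ok_;
  mutable bool checked_ = false;
};
```

Marking `checked_` as `mutable` lets the const accessor record the inspection; in RocksDB the real flag is likewise only compiled in under the assertion macro so release builds pay nothing.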
@@ -362,8 +381,6 @@ inline Status::Status(const Status& s, Severity sev)
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
}
inline Status& Status::operator=(const Status& s) {
// The following condition catches both aliasing (when this == &s),
// and the common case where both s and *this are ok.
if (this != &s) {
code_ = s.code_;
subcode_ = s.subcode_;
@@ -25,6 +25,7 @@
#include "rocksdb/cache.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"

namespace ROCKSDB_NAMESPACE {
@@ -40,11 +41,8 @@ class TableBuilder;
class TableFactory;
class TableReader;
class WritableFileWriter;
struct ColumnFamilyOptions;
struct ConfigOptions;
struct DBOptions;
struct EnvOptions;
struct Options;

enum ChecksumType : char {
kNoChecksum = 0x0,
@@ -59,6 +59,7 @@ class LDBCommand {
static const std::string ARG_FILE_SIZE;
static const std::string ARG_CREATE_IF_MISSING;
static const std::string ARG_NO_VALUE;
static const std::string ARG_DISABLE_CONSISTENCY_CHECKS;

struct ParsedParams {
std::string cmd;
@@ -163,6 +164,9 @@ class LDBCommand {
// If true, try to construct options from DB's option files.
bool try_load_options_;

// The value passed to options.force_consistency_checks.
bool force_consistency_checks_;

bool create_if_missing_;

/**
@@ -5,8 +5,8 @@
#pragma once

#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 9
#define ROCKSDB_PATCH 0
#define ROCKSDB_MINOR 10
#define ROCKSDB_PATCH 4

// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these
@@ -647,7 +647,12 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4;"
"format_version=5;whole_key_filtering=1;"
"filter_policy=bloomfilter:4.567:false;",
"filter_policy=bloomfilter:4.567:false;"
// A bug caused read_amp_bytes_per_bit to be a large integer in OPTIONS
// file generated by 6.10 to 6.14. Though bug is fixed in these releases,
// we need to handle the case of loading OPTIONS file generated before the
// fix.
"read_amp_bytes_per_bit=17179869185;",
&new_opt));
ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
@@ -668,6 +673,9 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
dynamic_cast<const BloomFilterPolicy&>(*new_opt.filter_policy);
EXPECT_EQ(bfp.GetMillibitsPerKey(), 4567);
EXPECT_EQ(bfp.GetWholeBitsPerKey(), 5);
// Verify that only the lower 32bits are stored in
// new_opt.read_amp_bytes_per_bit.
EXPECT_EQ(1U, new_opt.read_amp_bytes_per_bit);

// unknown option
ASSERT_NOK(GetBlockBasedTableOptionsFromString(
@@ -1279,7 +1287,7 @@ TEST_F(OptionsTest, ConvertOptionsTest) {
// This test suite tests the old APIs into the Configure options methods.
// Once those APIs are officially deprecated, this test suite can be deleted.
class OptionsOldApiTest : public testing::Test {};


TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
std::unordered_map<std::string, std::string> cf_options_map = {
{"write_buffer_size", "1"},
@@ -1744,7 +1752,7 @@ TEST_F(OptionsOldApiTest, GetColumnFamilyOptionsFromStringTest) {
ASSERT_TRUE(new_cf_opt.memtable_factory != nullptr);
ASSERT_EQ(std::string(new_cf_opt.memtable_factory->Name()), "SkipListFactory");
}


TEST_F(OptionsOldApiTest, GetBlockBasedTableOptionsFromString) {
BlockBasedTableOptions table_opt;
BlockBasedTableOptions new_opt;
@@ -1919,7 +1927,7 @@ TEST_F(OptionsOldApiTest, GetBlockBasedTableOptionsFromString) {
->GetHighPriPoolRatio(),
0.5);
}


TEST_F(OptionsOldApiTest, GetPlainTableOptionsFromString) {
PlainTableOptions table_opt;
PlainTableOptions new_opt;
@@ -1950,7 +1958,7 @@ TEST_F(OptionsOldApiTest, GetPlainTableOptionsFromString) {
"encoding_type=kPrefixXX",
&new_opt));
}


TEST_F(OptionsOldApiTest, GetOptionsFromStringTest) {
Options base_options, new_options;
base_options.write_buffer_size = 20;
@@ -2504,7 +2512,7 @@ TEST_F(OptionsParserTest, Readahead) {
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size));
assert(file_size > 0);


RocksDBOptionsParser parser;

env_->num_seq_file_read_ = 0;
@@ -300,8 +300,24 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionTypeFlags::kNone, 0}},
{"read_amp_bytes_per_bit",
{offsetof(struct BlockBasedTableOptions, read_amp_bytes_per_bit),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, 0}},
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, 0,
[](const ConfigOptions& /*opts*/, const std::string& /*name*/,
const std::string& value, char* addr) {
// A workaround to fix a bug in 6.10, 6.11, 6.12, 6.13
// and 6.14. The bug will write out 8 bytes to OPTIONS file from the
// starting address of BlockBasedTableOptions.read_amp_bytes_per_bit
// which is actually a uint32. Consequently, the value of
// read_amp_bytes_per_bit written in the OPTIONS file is wrong.
// From 6.15, RocksDB will try to parse the read_amp_bytes_per_bit
// from OPTIONS file as a uint32. To be able to load OPTIONS file
// generated by affected releases before the fix, we need to
// manually parse read_amp_bytes_per_bit with this special hack.
uint64_t read_amp_bytes_per_bit = ParseUint64(value);
*(reinterpret_cast<uint32_t*>(addr)) =
static_cast<uint32_t>(read_amp_bytes_per_bit);
return Status::OK();
}}},
{"enable_index_compression",
{offsetof(struct BlockBasedTableOptions, enable_index_compression),
OptionType::kBoolean, OptionVerificationType::kNormal,
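The parse hook above reads the possibly 8-byte value written by affected releases and keeps only the low 32 bits, matching the field's true `uint32_t` width. A standalone sketch of that truncation (hypothetical helper name; `std::stoull` stands in for RocksDB's `ParseUint64`):

```cpp
#include <cstdint>
#include <string>

// OPTIONS files written by 6.10-6.14 can contain an 8-byte encoding of the
// 4-byte read_amp_bytes_per_bit field; parse as uint64 and truncate to the
// low 32 bits to recover the intended setting.
uint32_t ParseReadAmpBytesPerBit(const std::string& value) {
  uint64_t wide = std::stoull(value);
  return static_cast<uint32_t>(wide);  // e.g. 2^34 + 1 truncates to 1
}
```

This is exactly why the options test feeds `read_amp_bytes_per_bit=17179869185` (which is 2^34 + 1) and expects the parsed field to equal `1U`.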
@@ -2461,13 +2461,16 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
ExtractUserKey(v.first_internal_key)) < 0)) {
// The requested key falls between highest key in previous block and
// lowest key in current block.
*(miter->s) = iiter->status();
if (!iiter->status().IsNotFound()) {
*(miter->s) = iiter->status();
}
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
continue;
}

if (!uncompression_dict_status.ok()) {
assert(!uncompression_dict_status.IsNotFound());
*(miter->s) = uncompression_dict_status;
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
@@ -2680,7 +2683,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
rep_->level);
}
if (s.ok()) {
if (s.ok() && !iiter->status().IsNotFound()) {
s = iiter->status();
}
*(miter->s) = s;
@@ -2815,6 +2818,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
break;
}
}
if (s.ok()) {
// In the case of two level indexes, we would have exited the above loop
// by checking index_iter->Valid(), but Valid() might have returned false
// due to an IO error. So check the index_iter status
s = index_iter->status();
}
return s;
}

@@ -252,6 +252,7 @@ class BlockBasedTable : public TableReader {

private:
friend class MockedBlockBasedTable;
friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
static std::atomic<uint64_t> next_cache_key_id_;
BlockCacheTracer* const block_cache_tracer_;
341 table/block_based/block_based_table_reader_test.cc (new file)
@@ -0,0 +1,341 @@
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "table/block_based/block_based_table_reader.h"

#include "rocksdb/file_system.h"
#include "table/block_based/partitioned_index_iterator.h"

#include "db/table_properties_collector.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/format.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"

namespace ROCKSDB_NAMESPACE {
class BlockBasedTableReaderTest
    : public testing::Test,
      public testing::WithParamInterface<std::tuple<
          CompressionType, bool, BlockBasedTableOptions::IndexType, bool>> {
 protected:
  CompressionType compression_type_;
  bool use_direct_reads_;

  void SetUp() override {
    BlockBasedTableOptions::IndexType index_type;
    bool no_block_cache;
    std::tie(compression_type_, use_direct_reads_, index_type, no_block_cache) =
        GetParam();

    test::SetupSyncPointsToMockDirectIO();
    test_dir_ = test::PerThreadDBPath("block_based_table_reader_test");
    env_ = Env::Default();
    fs_ = FileSystem::Default();
    ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));

    BlockBasedTableOptions opts;
    opts.index_type = index_type;
    opts.no_block_cache = no_block_cache;
    table_factory_.reset(
        static_cast<BlockBasedTableFactory*>(NewBlockBasedTableFactory(opts)));
  }

  void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); }

  // Creates a table with the specified key-value pairs (kv).
  void CreateTable(const std::string& table_name,
                   const CompressionType& compression_type,
                   const std::map<std::string, std::string>& kv) {
    std::unique_ptr<WritableFileWriter> writer;
    NewFileWriter(table_name, &writer);

    // Create table builder.
    Options options;
    ImmutableCFOptions ioptions(options);
    InternalKeyComparator comparator(options.comparator);
    ColumnFamilyOptions cf_options;
    MutableCFOptions moptions(cf_options);
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
    std::unique_ptr<TableBuilder> table_builder(table_factory_->NewTableBuilder(
        TableBuilderOptions(ioptions, moptions, comparator, &factories,
                            compression_type, 0 /* sample_for_compression */,
                            CompressionOptions(), false /* skip_filters */,
                            kDefaultColumnFamilyName, -1 /* level */),
        0 /* column_family_id */, writer.get()));

    // Build table.
    for (auto it = kv.begin(); it != kv.end(); it++) {
      std::string k = ToInternalKey(it->first);
      std::string v = it->second;
      table_builder->Add(k, v);
    }
    ASSERT_OK(table_builder->Finish());
  }

  void NewBlockBasedTableReader(const FileOptions& foptions,
                                const ImmutableCFOptions& ioptions,
                                const InternalKeyComparator& comparator,
                                const std::string& table_name,
                                std::unique_ptr<BlockBasedTable>* table) {
    std::unique_ptr<RandomAccessFileReader> file;
    NewFileReader(table_name, foptions, &file);

    uint64_t file_size = 0;
    ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));

    std::unique_ptr<TableReader> table_reader;
    ASSERT_OK(BlockBasedTable::Open(ioptions, EnvOptions(),
                                    table_factory_->table_options(), comparator,
                                    std::move(file), file_size, &table_reader));

    table->reset(reinterpret_cast<BlockBasedTable*>(table_reader.release()));
  }

  std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }

  const std::shared_ptr<FileSystem>& fs() const { return fs_; }

 private:
  std::string test_dir_;
  Env* env_;
  std::shared_ptr<FileSystem> fs_;
  std::unique_ptr<BlockBasedTableFactory> table_factory_;

  void WriteToFile(const std::string& content, const std::string& filename) {
    std::unique_ptr<FSWritableFile> f;
    ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr));
    ASSERT_OK(f->Append(content, IOOptions(), nullptr));
    ASSERT_OK(f->Close(IOOptions(), nullptr));
  }

  void NewFileWriter(const std::string& filename,
                     std::unique_ptr<WritableFileWriter>* writer) {
    std::string path = Path(filename);
    EnvOptions env_options;
    FileOptions foptions;
    std::unique_ptr<FSWritableFile> file;
    ASSERT_OK(fs_->NewWritableFile(path, foptions, &file, nullptr));
    writer->reset(new WritableFileWriter(std::move(file), path, env_options));
  }

  void NewFileReader(const std::string& filename, const FileOptions& opt,
                     std::unique_ptr<RandomAccessFileReader>* reader) {
    std::string path = Path(filename);
    std::unique_ptr<FSRandomAccessFile> f;
    ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
    reader->reset(new RandomAccessFileReader(std::move(f), path, env_));
  }

  std::string ToInternalKey(const std::string& key) {
    InternalKey internal_key(key, 0, ValueType::kTypeValue);
    return internal_key.Encode().ToString();
  }
};
// Tests MultiGet in both direct IO and non-direct IO mode.
|
||||
// The keys should be in cache after MultiGet.
|
||||
TEST_P(BlockBasedTableReaderTest, MultiGet) {
|
||||
// Prepare key-value pairs to occupy multiple blocks.
|
||||
// Each value is 256B, every 16 pairs constitute 1 block.
|
||||
// Adjacent blocks contain values with different compression complexity:
|
||||
// human readable strings are easier to compress than random strings.
|
||||
std::map<std::string, std::string> kv;
|
||||
{
|
||||
Random rnd(101);
|
||||
uint32_t key = 0;
|
||||
for (int block = 0; block < 100; block++) {
|
||||
for (int i = 0; i < 16; i++) {
|
||||
char k[9] = {0};
|
||||
// Internal key is constructed directly from this key,
|
||||
// and internal key size is required to be >= 8 bytes,
|
||||
// so use %08u as the format string.
|
||||
sprintf(k, "%08u", key);
|
||||
std::string v;
|
||||
if (block % 2) {
|
||||
v = test::RandomHumanReadableString(&rnd, 256);
|
||||
} else {
|
||||
test::RandomString(&rnd, 256, &v);
|
||||
}
|
||||
kv[std::string(k)] = v;
|
||||
key++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare keys, values, and statuses for MultiGet.
|
||||
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
|
||||
autovector<PinnableSlice, MultiGetContext::MAX_BATCH_SIZE> values;
|
||||
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
|
||||
{
|
||||
const int step =
|
||||
static_cast<int>(kv.size()) / MultiGetContext::MAX_BATCH_SIZE;
|
||||
auto it = kv.begin();
|
||||
for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) {
|
||||
keys.emplace_back(it->first);
|
||||
values.emplace_back();
|
||||
statuses.emplace_back();
|
||||
std::advance(it, step);
|
||||
}
|
||||
}
|
||||
|
||||
std::string table_name =
|
||||
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
|
||||
CreateTable(table_name, compression_type_, kv);
|
||||
|
||||
std::unique_ptr<BlockBasedTable> table;
|
||||
Options options;
|
||||
ImmutableCFOptions ioptions(options);
|
||||
FileOptions foptions;
|
||||
foptions.use_direct_reads = use_direct_reads_;
|
||||
InternalKeyComparator comparator(options.comparator);
|
||||
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
|
||||
|
||||
// Ensure that keys are not in cache before MultiGet.
|
||||
for (auto& key : keys) {
|
||||
ASSERT_FALSE(table->TEST_KeyInCache(ReadOptions(), key));
|
||||
}
|
||||
|
||||
// Prepare MultiGetContext.
|
||||
autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_context;
|
||||
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
||||
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
|
||||
for (size_t i = 0; i < keys.size(); ++i) {
|
||||
get_context.emplace_back(
|
||||
BytewiseComparator(), nullptr, nullptr, nullptr, GetContext::kNotFound,
|
||||
keys[i], &values[i], nullptr, nullptr, nullptr, true /* do_merge */,
|
||||
nullptr, nullptr, nullptr, nullptr, nullptr, nullptr);
|
||||
key_context.emplace_back(nullptr, keys[i], &values[i], nullptr,
|
||||
&statuses.back());
|
||||
key_context.back().get_context = &get_context.back();
|
||||
}
|
||||
for (auto& key_ctx : key_context) {
|
||||
sorted_keys.emplace_back(&key_ctx);
|
||||
}
|
||||
MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, ReadOptions());
|
||||
|
||||
// Execute MultiGet.
|
||||
MultiGetContext::Range range = ctx.GetMultiGetRange();
|
||||
table->MultiGet(ReadOptions(), &range, nullptr);
|
||||
|
||||
for (const Status& status : statuses) {
|
||||
ASSERT_OK(status);
|
||||
}
|
||||
// Check that keys are in cache after MultiGet.
|
||||
for (size_t i = 0; i < keys.size(); i++) {
|
||||
ASSERT_TRUE(table->TEST_KeyInCache(ReadOptions(), keys[i]));
|
||||
ASSERT_EQ(values[i].ToString(), kv[keys[i].ToString()]);
|
||||
}
|
||||
}
|
||||
|
||||
class BlockBasedTableReaderTestVerifyChecksum
    : public BlockBasedTableReaderTest {
 public:
  BlockBasedTableReaderTestVerifyChecksum() : BlockBasedTableReaderTest() {}
};

TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) {
  // Prepare key-value pairs to occupy multiple blocks.
  // Each value is 256B, every 16 pairs constitute 1 block.
  // Adjacent blocks contain values with different compression complexity:
  // human readable strings are easier to compress than random strings.
  Random rnd(101);
  std::map<std::string, std::string> kv;
  {
    uint32_t key = 0;
    for (int block = 0; block < 800; block++) {
      for (int i = 0; i < 16; i++) {
        char k[9] = {0};
        // Internal key is constructed directly from this key,
        // and internal key size is required to be >= 8 bytes,
        // so use %08u as the format string.
        sprintf(k, "%08u", key);
        std::string v;
        test::RandomString(&rnd, 256, &v);
        kv[std::string(k)] = v;
        key++;
      }
    }
  }

  std::string table_name =
      "BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
  CreateTable(table_name, compression_type_, kv);

  std::unique_ptr<BlockBasedTable> table;
  Options options;
  ImmutableCFOptions ioptions(options);
  FileOptions foptions;
  foptions.use_direct_reads = use_direct_reads_;
  InternalKeyComparator comparator(options.comparator);
  NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);

  // Use the top level iterator to find the offset/size of the first
  // 2nd level index block and corrupt the block
  IndexBlockIter iiter_on_stack;
  BlockCacheLookupContext context{TableReaderCaller::kUserVerifyChecksum};
  InternalIteratorBase<IndexValue>* iiter = table->NewIndexIterator(
      ReadOptions(), /*disable_prefix_seek=*/false, &iiter_on_stack,
      /*get_context=*/nullptr, &context);
  std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
  if (iiter != &iiter_on_stack) {
    iiter_unique_ptr = std::unique_ptr<InternalIteratorBase<IndexValue>>(iiter);
  }
  ASSERT_OK(iiter->status());
  iiter->SeekToFirst();
  BlockHandle handle = static_cast<ParititionedIndexIterator*>(iiter)
                           ->index_iter_->value()
                           .handle;
  table.reset();

  // Corrupt the block pointed to by handle
  test::CorruptFile(Path(table_name), static_cast<int>(handle.offset()), 128);

  NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
  Status s = table->VerifyChecksum(ReadOptions(),
                                   TableReaderCaller::kUserVerifyChecksum);
  ASSERT_EQ(s.code(), Status::kCorruption);
}

// Param 1: compression type
// Param 2: whether to use direct reads
// Param 3: Block Based Table Index type
// Param 4: BBTO no_block_cache option
#ifdef ROCKSDB_LITE
// Skip direct I/O tests in lite mode since direct I/O is unsupported.
INSTANTIATE_TEST_CASE_P(
    MultiGet, BlockBasedTableReaderTest,
    ::testing::Combine(
        ::testing::ValuesIn(GetSupportedCompressions()),
        ::testing::Values(false),
        ::testing::Values(BlockBasedTableOptions::IndexType::kBinarySearch),
        ::testing::Values(false)));
#else   // ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P(
    MultiGet, BlockBasedTableReaderTest,
    ::testing::Combine(
        ::testing::ValuesIn(GetSupportedCompressions()), ::testing::Bool(),
        ::testing::Values(BlockBasedTableOptions::IndexType::kBinarySearch),
        ::testing::Values(false)));
#endif  // ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P(
    VerifyChecksum, BlockBasedTableReaderTestVerifyChecksum,
    ::testing::Combine(
        ::testing::ValuesIn(GetSupportedCompressions()),
        ::testing::Values(false),
        ::testing::Values(
            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch),
        ::testing::Values(true)));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
@@ -122,6 +122,7 @@ class ParititionedIndexIterator : public InternalIteratorBase<IndexValue> {
   }

  private:
+  friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
   const BlockBasedTable* table_;
   const ReadOptions read_options_;
 #ifndef NDEBUG
@@ -10,6 +10,7 @@
 #include "test_util/testutil.h"

 #include <fcntl.h>
+#include <sys/stat.h>
 #include <array>
 #include <cctype>
 #include <fstream>
@@ -540,5 +541,46 @@ void SetupSyncPointsToMockDirectIO() {
 #endif
 }

+void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
+  struct stat sbuf;
+  if (stat(fname.c_str(), &sbuf) != 0) {
+    // strerror is not thread-safe so should not be used in the "passing" path
+    // of unit tests (sometimes parallelized) but is OK here where test fails
+    const char* msg = strerror(errno);
+    fprintf(stderr, "%s:%s\n", fname.c_str(), msg);
+    assert(false);
+  }
+
+  if (offset < 0) {
+    // Relative to end of file; make it absolute
+    if (-offset > sbuf.st_size) {
+      offset = 0;
+    } else {
+      offset = static_cast<int>(sbuf.st_size + offset);
+    }
+  }
+  if (offset > sbuf.st_size) {
+    offset = static_cast<int>(sbuf.st_size);
+  }
+  if (offset + bytes_to_corrupt > sbuf.st_size) {
+    bytes_to_corrupt = static_cast<int>(sbuf.st_size - offset);
+  }
+
+  // Do it
+  std::string contents;
+  Status s = ReadFileToString(Env::Default(), fname, &contents);
+  assert(s.ok());
+  for (int i = 0; i < bytes_to_corrupt; i++) {
+    contents[i + offset] ^= 0x80;
+  }
+  s = WriteStringToFile(Env::Default(), contents, fname);
+  assert(s.ok());
+  Options options;
+  EnvOptions env_options;
+#ifndef ROCKSDB_LITE
+  assert(!VerifySstFileChecksum(options, env_options, fname).ok());
+#endif
+}
+
 }  // namespace test
 }  // namespace ROCKSDB_NAMESPACE
@@ -809,5 +809,7 @@ void ResetTmpDirForDirectIO();
 // to the file system.
 void SetupSyncPointsToMockDirectIO();

+void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt);
+
 }  // namespace test
 }  // namespace ROCKSDB_NAMESPACE
third-party/folly/folly/lang/Align.h (vendored, 9 lines added)
@@ -22,6 +22,15 @@
 #include <folly/Portability.h>
 #include <folly/ConstexprMath.h>

+// Work around bug https://gcc.gnu.org/bugzilla/show_bug.cgi?id=56019
+#ifdef __GNUC__
+#if __GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 9)
+namespace std {
+using ::max_align_t;
+}
+#endif
+#endif
+
 namespace folly {

 // has_extended_alignment
@@ -64,6 +64,8 @@ const std::string LDBCommand::ARG_TTL_START = "start_time";
 const std::string LDBCommand::ARG_TTL_END = "end_time";
 const std::string LDBCommand::ARG_TIMESTAMP = "timestamp";
 const std::string LDBCommand::ARG_TRY_LOAD_OPTIONS = "try_load_options";
+const std::string LDBCommand::ARG_DISABLE_CONSISTENCY_CHECKS =
+    "disable_consistency_checks";
 const std::string LDBCommand::ARG_IGNORE_UNKNOWN_OPTIONS =
     "ignore_unknown_options";
 const std::string LDBCommand::ARG_FROM = "from";
@@ -362,6 +364,8 @@ LDBCommand::LDBCommand(const std::map<std::string, std::string>& options,
   is_db_ttl_ = IsFlagPresent(flags, ARG_TTL);
   timestamp_ = IsFlagPresent(flags, ARG_TIMESTAMP);
   try_load_options_ = IsFlagPresent(flags, ARG_TRY_LOAD_OPTIONS);
+  force_consistency_checks_ =
+      !IsFlagPresent(flags, ARG_DISABLE_CONSISTENCY_CHECKS);
   config_options_.ignore_unknown_options =
       IsFlagPresent(flags, ARG_IGNORE_UNKNOWN_OPTIONS);
 }
@@ -527,6 +531,7 @@ std::vector<std::string> LDBCommand::BuildCmdLineOptions(
       ARG_FILE_SIZE,
       ARG_FIX_PREFIX_LEN,
       ARG_TRY_LOAD_OPTIONS,
+      ARG_DISABLE_CONSISTENCY_CHECKS,
       ARG_IGNORE_UNKNOWN_OPTIONS,
       ARG_CF_NAME};
   ret.insert(ret.end(), options.begin(), options.end());
@@ -622,6 +627,7 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
     }
   }

+  cf_opts->force_consistency_checks = force_consistency_checks_;
   if (use_table_options) {
     cf_opts->table_factory.reset(NewBlockBasedTableFactory(table_options));
   }
@@ -2839,7 +2845,7 @@ CheckConsistencyCommand::CheckConsistencyCommand(
     const std::vector<std::string>& /*params*/,
     const std::map<std::string, std::string>& options,
     const std::vector<std::string>& flags)
-    : LDBCommand(options, flags, false, BuildCmdLineOptions({})) {}
+    : LDBCommand(options, flags, true, BuildCmdLineOptions({})) {}

 void CheckConsistencyCommand::Help(std::string& ret) {
   ret.append("  ");
@@ -2848,19 +2854,13 @@ void CheckConsistencyCommand::Help(std::string& ret) {
 }

 void CheckConsistencyCommand::DoCommand() {
-  Options opt = PrepareOptionsForOpenDB();
-  opt.paranoid_checks = true;
-  if (!exec_state_.IsNotStarted()) {
-    return;
-  }
-  DB* db;
-  Status st = DB::OpenForReadOnly(opt, db_path_, &db, false);
-  delete db;
-  if (st.ok()) {
+  options_.paranoid_checks = true;
+  options_.num_levels = 64;
+  OpenDB();
+  if (exec_state_.IsSucceed() || exec_state_.IsNotStarted()) {
     fprintf(stdout, "OK\n");
-  } else {
-    exec_state_ = LDBCommandExecuteResult::Failed(st.ToString());
   }
+  CloseDB();
 }

 // ----------------------------------------------------------------------------
@@ -551,6 +551,98 @@ TEST_F(LdbCmdTest, ListFileTombstone) {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}

TEST_F(LdbCmdTest, DisableConsistencyChecks) {
  Env* base_env = TryLoadCustomOrDefaultEnv();
  std::unique_ptr<Env> env(NewMemEnv(base_env));
  Options opts;
  opts.env = env.get();
  opts.create_if_missing = true;

  std::string dbname = test::TmpDir();

  {
    DB* db = nullptr;
    ASSERT_OK(DB::Open(opts, dbname, &db));

    WriteOptions wopts;
    FlushOptions fopts;
    fopts.wait = true;

    ASSERT_OK(db->Put(wopts, "foo1", "1"));
    ASSERT_OK(db->Put(wopts, "bar1", "2"));
    ASSERT_OK(db->Flush(fopts));

    ASSERT_OK(db->Put(wopts, "foo2", "3"));
    ASSERT_OK(db->Put(wopts, "bar2", "4"));
    ASSERT_OK(db->Flush(fopts));

    delete db;
  }

  {
    char arg1[] = "./ldb";
    char arg2[1024];
    snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
    char arg3[] = "checkconsistency";
    char* argv[] = {arg1, arg2, arg3};

    SyncPoint::GetInstance()->SetCallBack(
        "Version::PrepareApply:forced_check", [&](void* arg) {
          bool* forced = reinterpret_cast<bool*>(arg);
          ASSERT_TRUE(*forced);
        });
    SyncPoint::GetInstance()->EnableProcessing();

    ASSERT_EQ(
        0, LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));

    SyncPoint::GetInstance()->ClearAllCallBacks();
    SyncPoint::GetInstance()->DisableProcessing();
  }
  {
    char arg1[] = "./ldb";
    char arg2[1024];
    snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
    char arg3[] = "scan";
    char* argv[] = {arg1, arg2, arg3};

    SyncPoint::GetInstance()->SetCallBack(
        "Version::PrepareApply:forced_check", [&](void* arg) {
          bool* forced = reinterpret_cast<bool*>(arg);
          ASSERT_TRUE(*forced);
        });
    SyncPoint::GetInstance()->EnableProcessing();

    ASSERT_EQ(
        0, LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));

    SyncPoint::GetInstance()->ClearAllCallBacks();
    SyncPoint::GetInstance()->DisableProcessing();
  }
  {
    char arg1[] = "./ldb";
    char arg2[1024];
    snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
    char arg3[] = "scan";
    char arg4[] = "--disable_consistency_checks";
    char* argv[] = {arg1, arg2, arg3, arg4};

    SyncPoint::GetInstance()->SetCallBack(
        "ColumnFamilyData::ColumnFamilyData", [&](void* arg) {
          ColumnFamilyOptions* cfo =
              reinterpret_cast<ColumnFamilyOptions*>(arg);
          ASSERT_FALSE(cfo->force_consistency_checks);
        });
    SyncPoint::GetInstance()->EnableProcessing();

    ASSERT_EQ(
        0, LDBCommandRunner::RunCommand(4, argv, opts, LDBOptions(), nullptr));

    SyncPoint::GetInstance()->ClearAllCallBacks();
    SyncPoint::GetInstance()->DisableProcessing();
  }
}
}  // namespace ROCKSDB_NAMESPACE

#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
@@ -46,6 +46,8 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
       "  : DB supports ttl and value is internally timestamp-suffixed\n");
   ret.append("  --" + LDBCommand::ARG_TRY_LOAD_OPTIONS +
              "  : Try to load option file from DB.\n");
+  ret.append("  --" + LDBCommand::ARG_DISABLE_CONSISTENCY_CHECKS +
+             "  : Set options.force_consistency_checks = false.\n");
   ret.append("  --" + LDBCommand::ARG_IGNORE_UNKNOWN_OPTIONS +
              "  : Ignore unknown options when loading option file.\n");
   ret.append("  --" + LDBCommand::ARG_BLOOM_BITS + "=<int,e.g.:14>\n");