Compare commits

...

29 Commits

Author SHA1 Message Date
Yanqin Jin
a21a78a3cf Fix the logic of setting read_amp_bytes_per_bit from OPTIONS file (#7680)
Summary:
Instead of using `EncodeFixed32` which always serialize a integer to
little endian, we should use the local machine's endianness when
populating a native data structure during options parsing.
Without this fix, `read_amp_bytes_per_bit` may be populated incorrectly
on big-endian machines.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7680

Test Plan: make check

Reviewed By: pdillinger

Differential Revision: D24999166

Pulled By: riversand963

fbshipit-source-id: dc603cff6e17f8fa32479ce6df93b93082e6b0c4
2020-11-17 10:07:53 -08:00
Yanqin Jin
910ea96ff7 Bump version and update HISTORY 2020-11-15 14:44:07 -08:00
Yanqin Jin
5841911c4b Hack to load OPTIONS file for read_amp_bytes_per_bit (#7659)
Summary:
A temporary hack to work around a bug in 6.10, 6.11, 6.12, 6.13 and
6.14. The bug will write out 8 bytes to OPTIONS file from the starting
address of BlockBasedTableOptions.read_amp_bytes_per_bit which is
actually a uint32. Consequently, the value of read_amp_bytes_per_bit
written in the OPTIONS file is wrong. From 6.15, RocksDB will
try to parse the read_amp_bytes_per_bit from OPTIONS file as a uint32.
To be able to load OPTIONS file generated by affected releases before
the fix, we need to manually parse read_amp_bytes_per_bit with this hack.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7659

Test Plan:
Generate a db with current 6.14.fb (head at b6db05dbb5). Maybe use db_stress.

Checkout this PR, run
```
 ~/rocksdb/ldb --db=. --try_load_options --ignore_unknown_options idump --count_only
```
Expect success, and should not see
```
Failed: Invalid argument: Error parsing read_amp_bytes_per_bit:17179869184
```

Also
make check

Reviewed By: anand1976

Differential Revision: D24954752

Pulled By: riversand963

fbshipit-source-id: c7b802fc3e52acd050a4fc1cd475016122234394
2020-11-13 23:01:00 -08:00
Huisheng Liu
053e59c25f fix read_amp_bytes_per_bit field size (#7651)
Summary:
The field in BlockBasedTableOptions is 4 bytes:
  // Default: 0 (disabled)
  uint32_t read_amp_bytes_per_bit = 0;

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7651

Reviewed By: ltamasi

Differential Revision: D24844994

Pulled By: riversand963

fbshipit-source-id: e2695e55532256ef8996dd6939cad06987a80293
2020-11-11 12:09:30 -08:00
Yanqin Jin
f6013e16bc Update version 2020-06-19 15:10:41 -07:00
sdong
cef4f9178f Fix a bug that causes iterator to return wrong result in a rare data race (#6973)
Summary:
The bug fixed in https://github.com/facebook/rocksdb/pull/1816/ is now applicable to iterator too. This was not an issue but https://github.com/facebook/rocksdb/pull/2886 caused the regression. If a put and DB flush happens just between iterator to get latest sequence number and getting super version, empty result for the key or an older value can be returned, which is wrong.
Fix it in the same way as the fix in https://github.com/facebook/rocksdb/issues/1816, that is to get the sequence number after referencing the super version.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6973

Test Plan: Will run stress tests for a while to make sure there is no general regression.

Reviewed By: ajkr

Differential Revision: D22029348

fbshipit-source-id: 94390f93630906796d6e2fec321f44a920953fd1
2020-06-19 14:32:22 -07:00
Yanqin Jin
201c5a5e61 Fail recovery when MANIFEST record checksum mismatch (#6996)
Summary:
https://github.com/facebook/rocksdb/issues/5411 refactored `VersionSet::Recover` but introduced a bug, explained as follows.
Before, once a checksum mismatch happens, `reporter` will set `s` to be non-ok. Therefore, Recover will stop processing the MANIFEST any further.
```
// Correct
// Inside Recover
LogReporter reporter;
reporter.status = &s;
log::Reader reader(..., reporter);
while (reader.ReadRecord() && s.ok()) {
...
}
```
The bug is that, the local variable `s` in `ReadAndRecover` won't be updated by `reporter` while reading the MANIFEST. It is possible that the reader sees a checksum mismatch in a record, but `ReadRecord` retries internally read and finds the next valid record. The mismatched record will be ignored and no error is reported.
```
// Incorrect
// Inside Recover
LogReporter reporter;
reporter.status = &s;
log::Reader reader(..., reporter);
s = ReadAndRecover(reader, ...);

// Inside ReadAndRecover
  Status s;  // Shadows the s in Recover.
  while (reader.ReadRecord() && s.ok()) {
   ...
  }
```
`LogReporter` can use a separate `log_read_status` to track the errors while reading the MANIFEST. RocksDB can process more MANIFEST entries only if `log_read_status.ok()`.
Test plan (devserver):
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6996

Reviewed By: ajkr

Differential Revision: D22105746

Pulled By: riversand963

fbshipit-source-id: b22f717a423457a41ca152a242abbb64cf91fc38
2020-06-19 14:32:19 -07:00
Yanqin Jin
a7b0f02c47 Close file to avoid file-descriptor leakage (#6936)
Summary:
When operation on an open file descriptor fails, we should close the file descriptor.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6936

Test Plan: make check

Reviewed By: pdillinger

Differential Revision: D21885458

Pulled By: riversand963

fbshipit-source-id: ba077a76b256a8537f21e22e4ec198f45390bf50
2020-06-19 13:57:53 -07:00
Yanqin Jin
0c785a442b Fix a bug of overwriting return code (#6989)
Summary:
In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6989

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D22071418

Pulled By: riversand963

fbshipit-source-id: 5a4ea5dfb1a41f41c7a3fdaf62b163007b42f04b
2020-06-16 14:46:39 -07:00
Yanqin Jin
1ceec266d7 Let best-efforts recovery ignore CURRENT file (#6970)
Summary:
Best-efforts recovery does not check the content of CURRENT file to determine which MANIFEST to recover from. However, it still checks the presence of CURRENT file to determine whether to create a new DB during `open()`. Therefore, we can tweak the logic in `open()` a little bit so that best-efforts recovery does not rely on CURRENT file at all.

Test plan (dev server):
make check
./db_basic_test --gtest_filter=DBBasicTest.RecoverWithNoCurrentFile
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6970

Reviewed By: anand1976

Differential Revision: D22013990

Pulled By: riversand963

fbshipit-source-id: db552a1868c60ed70e1f7cd252a3a076eb8ea58f
2020-06-16 14:45:57 -07:00
Peter Dillinger
1482c8697c Misc things for ASSERT_STATUS_CHECKED, also gcc 4.8.5 (#6871)
Summary:
* Print stack trace on status checked failure
* Make folly_synchronization_distributed_mutex_test a parallel test
* Disable ldb_test.py and rocksdb_dump_test.sh with
  ASSERT_STATUS_CHECKED (broken)
* Fix shadow warning in random_access_file_reader.h reported by gcc
  4.8.5 (ROCKSDB_NO_FBCODE), also https://github.com/facebook/rocksdb/issues/6866
* Work around compiler bug on max_align_t for gcc < 4.9
* Remove an apparently wrong comment in status.h
* Use check_some in Travis config (for proper diagnostic output)
* Fix ignored Status in loop in options_helper.cc
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6871

Test Plan: manual, CI

Reviewed By: ajkr

Differential Revision: D21706619

Pulled By: pdillinger

fbshipit-source-id: daf6364173d6689904eb394461a69a11f5bee2cb
2020-06-05 12:49:16 -07:00
anand76
ff76f053af Update version 2020-06-05 11:44:01 -07:00
anand76
402fe7d469 Check iterator status BlockBasedTableReader::VerifyChecksumInBlocks() (#6909)
Summary:
The ```for``` loop in ```VerifyChecksumInBlocks``` only checks ```index_iter->Valid()``` which could be ```false``` either due to reaching the end of the index or, in case of partitioned index, it could be due to a checksum mismatch error when reading a 2nd level index block. Instead of throwing away the index iterator status, we need to return any errors back to the caller.

Tests:
Add a test in block_based_table_reader_test.cc.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6909

Reviewed By: pdillinger

Differential Revision: D21833922

Pulled By: anand1976

fbshipit-source-id: bc778ebf1121dbbdd768689de5183f07a9f0beae
2020-06-05 11:33:59 -07:00
Peter Dillinger
0367cd4370 Fix test PartitionedMultiGet in db_bloom_filter_test
Partial back-port of #6905
2020-06-03 20:11:16 -07:00
Yanqin Jin
515a27941a Update version and HISTORY 2020-05-27 16:38:10 -07:00
Yanqin Jin
138a967fa6 Fix buck target db_stress_lib in opt mode (#6847)
Summary:
In buck build with opt mode, target should not include rocksdb_test_lib.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6847

Test Plan: Watch for internal cont build.

Reviewed By: ajkr

Differential Revision: D21586803

Pulled By: riversand963

fbshipit-source-id: 76d253c18d16fac6cab86a8c3f6b471ad5b6efb3
2020-05-18 10:45:56 -07:00
Yanqin Jin
98d19c3056 Do not print u'string' in TARGETS file (#6841)
Summary:
Before this PR, extra deps passed in from cmd line to buckifier will be parsed
and used to populate a dict. Using this dict and printing to TARGETS file will
lead to printing u'', disallowed by build tools. This PR removes the u''.

Test Plan (local dev server):
```
python buckifier/buckify_rocksdb.py  '{"fake": {"extra_deps": [":test_dep", "//fake/module:mock1"], "extra_compiler_flags": ["-Os", "-DROCKSDB_LITE"]}}'
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6841

Reviewed By: siying

Differential Revision: D21538155

Pulled By: riversand963

fbshipit-source-id: 09403668a4aa1a15bad7dac229c2bc8ce8ee1349
2020-05-13 16:15:06 -07:00
anand76
3f4008b1f1 Update HISTORY.md with a missing API change 2020-05-13 10:11:44 -07:00
sdong
8f9cc10916 Improve ldb consistency checks (#6802)
Summary:
When using ldb, users cannot turn on force consistency check in most commands, while they cannot use checksonsistnecy with --try_load_options. The change fixes both by:
1. checkconsistency now calls OpenDB() so that it gets all the options loading and sanitized options logic
2. use options.check_consistency_checks = true by default, and add a --disable_consistency_checks to turn it off.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6802

Test Plan: Add a new unit test. Some manual tests with corrupted DBs.

Reviewed By: pdillinger

Differential Revision: D21388051

fbshipit-source-id: 8d122732d391b426e3982a1c3232a8e3763ffad0
2020-05-11 12:38:50 -07:00
Yanqin Jin
bfc1b7acf0 Fix a few bugs in best-efforts recovery (#6824)
Summary:
1. Update column_family_memtables_ to point to latest column_family_set in
   version_set after recovery.
2. Normalize file paths passed by application so that directories end with '/'
   or '\\'.
3. In addition to missing files, corrupted files are also ignored in
   best-efforts recovery.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6824

Test Plan: COMPILE_WITH_ASAN=1 make check

Reviewed By: anand1976

Differential Revision: D21463905

Pulled By: riversand963

fbshipit-source-id: c48db8843cc93c8c1c7139c474b64e6f775307d2
2020-05-08 14:14:20 -07:00
Andrew Kryczka
e0fcbf93d0 Fixup formatting of 15ee2ee "fix swallowed error for file deletion consistency check (#6809)" 2020-05-08 12:50:31 -07:00
anand76
f08ee8d3de Include options.h in table.h (#6823)
Summary:
https://github.com/facebook/rocksdb/issues/6389 replaced the #include of options.h in table.h with forward declarations, which is causing some build failures in RocksDB users in 6.10. Remove the forward declarations and #include options.h as recommended by the style guide - https://google.github.io/styleguide/cppguide.html#Forward_Declarations
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6823

Reviewed By: riversand963

Differential Revision: D21464078

Pulled By: anand1976

fbshipit-source-id: 6033ee2544d279690f57bb0db91bc83816cee11d
2020-05-08 11:48:48 -07:00
anand76
60de1e6c6d Update release version to 6.10 (#6797)
Summary:
Update HISTORY.md and version.h to 6.10.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6797

Reviewed By: zhichao-cao

Differential Revision: D21371390

Pulled By: anand1976

fbshipit-source-id: 6017bca24fc5d12076d1ddaec7783c9b85712d42
2020-05-08 11:48:17 -07:00
Peter Dillinger
18d0c90bba Fix failure to write output in SpecialEnv::GetCurrentTime (#6803)
Summary:
This very old test code bug was causing a new valgrind failure
in MultiGetDeadlineExceeded

Also fix hang in MultiGetDeadlineExceeded by unifying with some logic from another test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6803

Test Plan: run that unit test under valgrind, make check

Reviewed By: siying

Differential Revision: D21388470

Pulled By: pdillinger

fbshipit-source-id: 0ce99d6d5eb8cd3195b17406892c8c5cff5fa5dd
2020-05-08 10:11:04 -07:00
Yanqin Jin
6d6e857404 Do not swallow error returned from SaveTo() (#6801)
Summary:
With consistency check enabled, VersionBuilder::SaveTo() may return error once
corruption is detected while building versions. We should handle these errors.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6801

Test Plan: make check

Reviewed By: siying

Differential Revision: D21385045

Pulled By: riversand963

fbshipit-source-id: 98f6424e2a4699b62befa21e9fe00e70a771118e
2020-05-08 10:02:02 -07:00
anand76
3df88b3953 Fix race due to delete triggered compaction in Universal compaction mode (#6799)
Summary:
Delete triggered compaction in universal compaction mode was causing a corruption when scheduled in parallel with other compactions.
1. When num_levels = 1, a file marked for compaction may be picked along with all older files in L0, without checking if any of them are already being compaction. This can cause unpredictable results like resurrection of older versions of keys or deleted keys.
2. When num_levels > 1, a delete triggered compaction would not get scheduled if it overlaps with a running regular compaction. However, the reverse is not true. This is due to the fact that in ```UniversalCompactionBuilder::CalculateSortedRuns```, it assumes that entire sorted runs are picked for compaction and only checks the first file in a sorted run to determine conflicts. This is violated by a delete triggered compaction as it works on a subset of a sorted run.

Fix the bug for num_levels > 1, and disable the feature for now when num_levels = 1. After disabling this feature, files would still get marked for compaction, but no compaction would get scheduled.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6799

Reviewed By: pdillinger

Differential Revision: D21431286

Pulled By: anand1976

fbshipit-source-id: ae9f0bdb1d6ae2f10284847db731c23f43af164a
2020-05-08 09:02:10 -07:00
Peter Dillinger
4584a99a3c Fix false NotFound from batched MultiGet with kHashSearch (#6821)
Summary:
The error is assigning KeyContext::s to NotFound status in a
table reader for a "not found in this table" case, which skips searching
in later tables, like only a delete should. (The hash search index iterator
is the only one that can return status NotFound even if Valid() == false.)

This was detected by intermittent failure in
MultiThreadedDBTest.MultiThreaded/5, a kHashSearch configuration.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6821

Test Plan: modified existing unit test to reproduce problem

Reviewed By: anand1976

Differential Revision: D21450469

Pulled By: pdillinger

fbshipit-source-id: 7478003684d637dbd491cdac81468041a791be2c
2020-05-08 08:58:06 -07:00
sdong
cb33efd0a9 Avoid Swallowing Some File Consistency Checking Bugs (#6793)
Summary:
We are swallowing some file consistency checking failures. This is not expected. We are fixing two cases: DB reopen and manifest dump.
More places are not fixed and need follow-up.

Error from CheckConsistencyForDeletes() is also swallowed, which is not fixed in this PR.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6793

Test Plan: Add a unit test to cover the reopen case.

Reviewed By: riversand963

Differential Revision: D21366525

fbshipit-source-id: eb438a322237814e8d5125f916a3c6de97f39ded
2020-05-08 08:51:41 -07:00
Andrew Kryczka
15ee2ee438 fix swallowed error for file deletion consistency check (#6809)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/6809

Reviewed By: pdillinger

Differential Revision: D21411971

Pulled By: ajkr

fbshipit-source-id: 900b6b0370b76e9a3e5e03f968e2ac1bbaab73b8
2020-05-07 14:35:10 -07:00
50 changed files with 1475 additions and 256 deletions

View File

@ -287,6 +287,9 @@ script:
mkdir build && cd build && cmake -DJNI=1 .. -DCMAKE_BUILD_TYPE=Release $OPT && make -j4 rocksdb rocksdbjni
;;
status_checked)
OPT=-DTRAVIS V=1 ASSERT_STATUS_CHECKED=1 make -j4 check_some
;;
esac
notifications:
email:

View File

@ -1,5 +1,30 @@
# Rocksdb Change Log
## Unreleased
## 6.10.4 (11/15/2020)
### Bug Fixes
* Fixed a bug of encoding and parsing BlockBasedTableOptions::read_amp_bytes_per_bit as a 64-bit integer.
* Fixed the logic of populating native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architecture. Without this fix, original code introduced in PR7659, when running on big-endian machine, can mistakenly store read_amp_bytes_per_bit (an uint32) in little endian format. Future access to `read_amp_bytes_per_bit` will give wrong values. Little endian architecture is not affected.
## 6.10.3 (6/16/2020)
### Bug fix
* Fix potential file descriptor leakage in PosixEnv's IsDirectory() and NewRandomAccessFile().
* Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s).
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.
* Fix a bug of wrong iterator result if another thread finishes an update and a DB flush between two statement.
## 6.10.2 (6/5/2020)
### Bug fix
* Fix false negative from the VerifyChecksum() API when there is a checksum mismatch in an index partition block in a BlockBasedTable format table file (index_type is kTwoLevelIndexSearch).
## 6.10.1 (5/27/2020)
### Bug fix
* Remove "u'<string>'" in TARGETS file.
* Fix db_stress_lib target in buck.
## 6.10 (5/2/2020)
### Behavior Changes
* Disable delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode and num_levels = 1 in order to avoid a corruption bug.
### Bug Fixes
* Fix wrong result being read from ingested file. May happen when a key in the file happen to be prefix of another key also in the file. The issue can further cause more data corruption. The issue exists with rocksdb >= 5.0.0 since DB::IngestExternalFile() was introduced.
* Finish implementation of BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey. It's now ready for use. Significantly reduces read amplification in some setups, especially for iterator seeks.
@ -9,11 +34,22 @@
* Fix a bug caused by not including user timestamp in MultiGet LookupKey construction. This can lead to wrong query result since the trailing bytes of a user key, if not shorter than timestamp, will be mistaken for user timestamp.
* Fix a bug caused by using wrong compare function when sorting the input keys of MultiGet with timestamps.
* Upgraded version of bzip library (1.0.6 -> 1.0.8) used with RocksJava to address potential vulnerabilities if an attacker can manipulate compressed data saved and loaded by RocksDB (not normal). See issue #6703.
* Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true.
* Fix possible false NotFound status from batched MultiGet using index type kHashSearch.
* Fix corruption caused by enabling delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode, along with parallel compactions. The bug can result in two parallel compactions picking the same input files, resulting in the DB resurrecting older and deleted versions of some keys.
* Fix a use-after-free bug in best-efforts recovery. column_family_memtables_ needs to point to valid ColumnFamilySet.
* Let best-efforts recovery ignore corrupted files during table loading.
* Fix a bug when making options.bottommost_compression, options.compression_opts and options.bottommost_compression_opts dynamically changeable: the modified values are not written to option files or returned back to users when being queried.
* Fix a bug where index key comparisons were unaccounted in `PerfContext::user_key_comparison_count` for lookups in files written with `format_version >= 3`.
* Fix many bloom.filter statistics not being updated in batch MultiGet.
### Public API Change
* Add a ConfigOptions argument to the APIs dealing with converting options to and from strings and files. The ConfigOptions is meant to replace some of the options (such as input_strings_escaped and ignore_unknown_options) and allow for more parameters to be passed in the future without changing the function signature.
* Add NewFileChecksumGenCrc32cFactory to the file checksum public API, such that the builtin Crc32c based file checksum generator factory can be used by applications.
* Add IsDirectory to Env and FS to indicate if a path is a directory.
* ldb now uses options.force_consistency_checks = true by default and "--disable_consistency_checks" is added to disable it.
* Add ReadOptions::deadline to allow users to specify a deadline for MultiGet requests
### New Features
* Added support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism. This feature is experimental for now.
@ -22,11 +58,6 @@
* Added functionality in sst_dump tool to check the compressed file size for different compression levels and print the time spent on compressing files with each compression type. Added arguments `--compression_level_from` and `--compression_level_to` to report size of all compression levels and one compression_type must be specified with it so that it will report compressed sizes of one compression type with different levels.
* Added statistics for redundant insertions into block cache: rocksdb.block.cache.*add.redundant. (There is currently no coordination to ensure that only one thread loads a table block when many threads are trying to access that same table block.)
### Bug Fixes
* Fix a bug when making options.bottommost_compression, options.compression_opts and options.bottommost_compression_opts dynamically changeable: the modified values are not written to option files or returned back to users when being queried.
* Fix a bug where index key comparisons were unaccounted in `PerfContext::user_key_comparison_count` for lookups in files written with `format_version >= 3`.
* Fix many bloom.filter statistics not being updated in batch MultiGet.
### Performance Improvements
* Improve performance of batch MultiGet with partitioned filters, by sharing block cache lookups to applicable filter blocks.
* Reduced memory copies when fetching and uncompressing compressed blocks from sst files.

View File

@ -626,10 +626,6 @@ TESTS = \
timer_test \
db_with_timestamp_compaction_test \
ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
TESTS += folly_synchronization_distributed_mutex_test
endif
PARALLEL_TEST = \
backupable_db_test \
db_bloom_filter_test \
@ -653,6 +649,11 @@ PARALLEL_TEST = \
write_prepared_transaction_test \
write_unprepared_transaction_test \
ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
TESTS += folly_synchronization_distributed_mutex_test
PARALLEL_TEST += folly_synchronization_distributed_mutex_test
endif
# options_settable_test doesn't pass with UBSAN as we use hack in the test
ifdef COMPILE_WITH_UBSAN
TESTS := $(shell echo $(TESTS) | sed 's/\boptions_settable_test\b//g')
@ -972,9 +973,11 @@ check: all
ifneq ($(PLATFORM), OS_AIX)
$(PYTHON) tools/check_all_python.py
ifeq ($(filter -DROCKSDB_LITE,$(OPT)),)
ifndef ASSERT_STATUS_CHECKED # not yet working with these tests
$(PYTHON) tools/ldb_test.py
sh tools/rocksdb_dump_test.sh
endif
endif
endif
$(MAKE) check-format
$(MAKE) check-buck-targets

10
TARGETS
View File

@ -109,6 +109,11 @@ ROCKSDB_OS_DEPS += ([(
["third-party//jemalloc:headers"],
)] if sanitizer == "" else [])
ROCKSDB_LIB_DEPS = [
":rocksdb_lib",
":rocksdb_test_lib",
] if not is_opt_mode else [":rocksdb_lib"]
cpp_library(
name = "rocksdb_lib",
srcs = [
@ -437,10 +442,7 @@ cpp_library(
os_deps = ROCKSDB_OS_DEPS,
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
deps = [
":rocksdb_lib",
":rocksdb_test_lib",
],
deps = ROCKSDB_LIB_DEPS,
external_deps = ROCKSDB_EXTERNAL_DEPS,
)

View File

@ -168,14 +168,13 @@ def generate_targets(repo_path, deps_map):
["test_util/testutil.cc"],
[":rocksdb_lib"])
# rocksdb_stress_lib
TARGETS.add_library(
TARGETS.add_rocksdb_library(
"rocksdb_stress_lib",
src_mk.get("ANALYZER_LIB_SOURCES", [])
+ src_mk.get('STRESS_LIB_SOURCES', [])
+ ["test_util/testutil.cc"],
[":rocksdb_lib", ":rocksdb_test_lib"])
+ ["test_util/testutil.cc"])
print("Extra dependencies:\n{0}".format(str(deps_map)))
print("Extra dependencies:\n{0}".format(json.dumps(deps_map)))
# test for every test we found in the Makefile
for target_alias, deps in deps_map.items():
for test in sorted(tests):
@ -196,8 +195,8 @@ def generate_targets(repo_path, deps_map):
test_target_name,
match_src[0],
is_parallel,
deps['extra_deps'],
deps['extra_compiler_flags'])
json.dumps(deps['extra_deps']),
json.dumps(deps['extra_compiler_flags']))
if test in _EXPORTED_TEST_LIBS:
test_library = "%s_lib" % test_target_name

View File

@ -50,6 +50,18 @@ class TARGETSBuilder(object):
deps=pretty_list(deps)))
self.total_lib = self.total_lib + 1
def add_rocksdb_library(self, name, srcs, headers=None):
headers_attr_prefix = ""
if headers is None:
headers_attr_prefix = "auto_"
headers = "AutoHeaders.RECURSIVE_GLOB"
self.targets_file.write(targets_cfg.rocksdb_library_template.format(
name=name,
srcs=pretty_list(srcs),
headers_attr_prefix=headers_attr_prefix,
headers=headers))
self.total_lib = self.total_lib + 1
def add_binary(self, name, srcs, deps=None):
self.targets_file.write(targets_cfg.binary_template % (
name,

View File

@ -114,6 +114,11 @@ ROCKSDB_OS_DEPS += ([(
"linux",
["third-party//jemalloc:headers"],
)] if sanitizer == "" else [])
ROCKSDB_LIB_DEPS = [
":rocksdb_lib",
":rocksdb_test_lib",
] if not is_opt_mode else [":rocksdb_lib"]
"""
@ -132,6 +137,21 @@ cpp_library(
)
"""
rocksdb_library_template = """
cpp_library(
name = "{name}",
srcs = [{srcs}],
{headers_attr_prefix}headers = {headers},
arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
compiler_flags = ROCKSDB_COMPILER_FLAGS,
os_deps = ROCKSDB_OS_DEPS,
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
deps = ROCKSDB_LIB_DEPS,
external_deps = ROCKSDB_EXTERNAL_DEPS,
)
"""
binary_template = """
cpp_binary(
name = "%s",

View File

@ -56,8 +56,9 @@ Status ArenaWrappedDBIter::Refresh() {
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
if (sv_number_ != cur_sv_number) {
Env* env = db_iter_->env();
db_iter_->~DBIter();
@ -65,6 +66,7 @@ Status ArenaWrappedDBIter::Refresh() {
new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
if (read_callback_) {
read_callback_->Refresh(latest_seq);
}
@ -78,7 +80,7 @@ Status ArenaWrappedDBIter::Refresh() {
latest_seq, /* allow_unprepared_value */ true);
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(latest_seq);
db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
db_iter_->set_valid(false);
}
return Status::OK();

View File

@ -1085,6 +1085,8 @@ void CompactionPicker::PickFilesMarkedForCompaction(
Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
size_t random_file_index = static_cast<size_t>(rnd.Uniform(
static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
TEST_SYNC_POINT_CALLBACK("CompactionPicker::PickFilesMarkedForCompaction",
&random_file_index);
if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
// found the compaction!

View File

@ -78,8 +78,17 @@ class CompactionPickerTest : public testing::Test {
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
}
// Create a new VersionStorageInfo object so we can add mode files and then
// merge it with the existing VersionStorageInfo
void AddVersionStorage() {
temp_vstorage_.reset(new VersionStorageInfo(
&icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style,
vstorage_.get(), false));
}
void DeleteVersionStorage() {
vstorage_.reset();
temp_vstorage_.reset();
files_.clear();
file_map_.clear();
input_files_.clear();
@ -88,18 +97,24 @@ class CompactionPickerTest : public testing::Test {
void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
size_t compensated_file_size = 0) {
assert(level < vstorage_->num_levels());
size_t compensated_file_size = 0, bool marked_for_compact = false) {
VersionStorageInfo* vstorage;
if (temp_vstorage_) {
vstorage = temp_vstorage_.get();
} else {
vstorage = vstorage_.get();
}
assert(level < vstorage->num_levels());
FileMetaData* f = new FileMetaData(
file_number, path_id, file_size,
InternalKey(smallest, smallest_seq, kTypeValue),
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
largest_seq, marked_for_compact, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
vstorage_->AddFile(level, f);
vstorage->AddFile(level, f);
files_.emplace_back(f);
file_map_.insert({file_number, {f, level}});
}
@ -122,6 +137,12 @@ class CompactionPickerTest : public testing::Test {
}
void UpdateVersionStorageInfo() {
if (temp_vstorage_) {
VersionBuilder builder(FileOptions(), &ioptions_, nullptr,
vstorage_.get(), nullptr);
builder.SaveTo(temp_vstorage_.get());
vstorage_ = std::move(temp_vstorage_);
}
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
vstorage_->UpdateFilesByCompactionPri(ioptions_.compaction_pri);
vstorage_->UpdateNumNonEmptyLevels();
@ -132,6 +153,28 @@ class CompactionPickerTest : public testing::Test {
vstorage_->ComputeFilesMarkedForCompaction();
vstorage_->SetFinalized();
}
void AddFileToVersionStorage(int level, uint32_t file_number,
const char* smallest, const char* largest,
uint64_t file_size = 1, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100,
SequenceNumber largest_seq = 100,
size_t compensated_file_size = 0,
bool marked_for_compact = false) {
VersionStorageInfo* base_vstorage = vstorage_.release();
vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleUniversal,
base_vstorage, false));
Add(level, file_number, smallest, largest, file_size, path_id, smallest_seq,
largest_seq, compensated_file_size, marked_for_compact);
VersionBuilder builder(FileOptions(), &ioptions_, nullptr, base_vstorage,
nullptr);
builder.SaveTo(vstorage_.get());
UpdateVersionStorageInfo();
}
private:
std::unique_ptr<VersionStorageInfo> temp_vstorage_;
};
TEST_F(CompactionPickerTest, Empty) {
@ -1733,6 +1776,163 @@ TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) {
ASSERT_EQ(0, compaction->output_level());
}
TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// This test covers the case where a "regular" universal compaction is
// scheduled first, followed by a delete triggered compaction. The latter
// should fail
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300);
Add(3, 5U, "010", "080", 8 * kFileSize, 0, 200, 251);
Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a compaction to reduce sorted runs
ASSERT_EQ(CompactionReason::kUniversalSortedRunNum,
compaction->compaction_reason());
ASSERT_EQ(0, compaction->output_level());
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(2U, compaction->num_input_files(0));
AddVersionStorage();
// Simulate a flush and mark the file for compaction
Add(0, 1U, "150", "200", kFileSize, 0, 551, 600, 0, true);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_FALSE(compaction2);
}
TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// This test covers the case where a delete triggered compaction is
// scheduled first, followed by a "regular" compaction. The latter
// should fail
NewVersionStorage(5, kCompactionStyleUniversal);
// Mark file number 4 for compaction
Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
Add(3, 5U, "240", "290", 8 * kFileSize, 0, 201, 250);
Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
compaction->compaction_reason());
ASSERT_EQ(3, compaction->output_level());
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->num_input_files(1));
AddVersionStorage();
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_FALSE(compaction2);
}
TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
// The case where universal periodic compaction can be picked
// with some newer files being compacted.
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
bool input_level_overlap = false;
bool output_level_overlap = false;
// Let's mark 2 files in 2 different levels for compaction. The
// compaction picker will randomly pick one, so use the sync point to
// ensure a deterministic order. Loop until both cases are covered
size_t random_index = 0;
SyncPoint::GetInstance()->SetCallBack(
"CompactionPicker::PickFilesMarkedForCompaction", [&](void* arg) {
size_t* index = static_cast<size_t*>(arg);
*index = random_index;
});
SyncPoint::GetInstance()->EnableProcessing();
while (!input_level_overlap || !output_level_overlap) {
// Ensure that the L0 file gets picked first
random_index = !input_level_overlap ? 0 : 1;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
Add(3, 2U, "010", "020", 2 * kFileSize, 0, 201, 248);
Add(3, 3U, "250", "270", 2 * kFileSize, 0, 202, 249);
Add(3, 4U, "290", "310", 2 * kFileSize, 0, 203, 250);
Add(3, 5U, "310", "320", 2 * kFileSize, 0, 204, 251, 0, true);
Add(4, 6U, "301", "350", 8 * kFileSize, 0, 101, 150);
Add(4, 7U, "501", "750", 8 * kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
compaction->compaction_reason());
ASSERT_TRUE(compaction->start_level() == 0 ||
compaction->start_level() == 3);
if (compaction->start_level() == 0) {
// The L0 file was picked. The next compaction will detect an
// overlap on its input level
input_level_overlap = true;
ASSERT_EQ(3, compaction->output_level());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(3U, compaction->num_input_files(1));
} else {
// The level 3 file was picked. The next compaction will pick
// the L0 file and will detect overlap when adding output
// level inputs
output_level_overlap = true;
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->num_input_files(1));
}
vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
// After recomputing the compaction score, only one marked file will remain
random_index = 0;
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_FALSE(compaction2);
DeleteVersionStorage();
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -120,8 +120,7 @@ class UniversalCompactionBuilder {
LogBuffer* log_buffer_;
static std::vector<SortedRun> CalculateSortedRuns(
const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
const VersionStorageInfo& vstorage);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
@ -325,8 +324,7 @@ void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
std::vector<UniversalCompactionBuilder::SortedRun>
UniversalCompactionBuilder::CalculateSortedRuns(
const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/,
const MutableCFOptions& mutable_cf_options) {
const VersionStorageInfo& vstorage) {
std::vector<UniversalCompactionBuilder::SortedRun> ret;
for (FileMetaData* f : vstorage.LevelFiles(0)) {
ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
@ -336,27 +334,16 @@ UniversalCompactionBuilder::CalculateSortedRuns(
uint64_t total_compensated_size = 0U;
uint64_t total_size = 0U;
bool being_compacted = false;
bool is_first = true;
for (FileMetaData* f : vstorage.LevelFiles(level)) {
total_compensated_size += f->compensated_file_size;
total_size += f->fd.GetFileSize();
if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
true) {
if (f->being_compacted) {
being_compacted = f->being_compacted;
}
} else {
// Compaction always includes all files for a non-zero level, so for a
// non-zero level, all the files should share the same being_compacted
// value.
// This assumption is only valid when
// mutable_cf_options.compaction_options_universal.allow_trivial_move
// is false
assert(is_first || f->being_compacted == being_compacted);
}
if (is_first) {
// Size amp, read amp and periodic compactions always include all files
// for a non-zero level. However, a delete triggered compaction and
// a trivial move might pick a subset of files in a sorted run. So
// always check all files in a sorted run and mark the entire run as
// being compacted if one or more files are being compacted
if (f->being_compacted) {
being_compacted = f->being_compacted;
is_first = false;
}
}
if (total_compensated_size > 0) {
@ -372,8 +359,7 @@ UniversalCompactionBuilder::CalculateSortedRuns(
Compaction* UniversalCompactionBuilder::PickCompaction() {
const int kLevel0 = 0;
score_ = vstorage_->CompactionScore(kLevel0);
sorted_runs_ =
CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_);
sorted_runs_ = CalculateSortedRuns(*vstorage_);
if (sorted_runs_.size() == 0 ||
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
@ -855,6 +841,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
std::vector<CompactionInputFiles> inputs;
if (vstorage_->num_levels() == 1) {
#if defined(ENABLE_SINGLE_LEVEL_DTC)
// This is single level universal. Since we're basically trying to reclaim
// space by processing files marked for compaction due to high tombstone
// density, let's do the same thing as compaction to reduce size amp which
@ -877,6 +864,11 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
return nullptr;
}
inputs.push_back(start_level_inputs);
#else
// Disable due to a known race condition.
// TODO: Reenable once the race condition is fixed
return nullptr;
#endif // ENABLE_SINGLE_LEVEL_DTC
} else {
int start_level;

View File

@ -157,42 +157,6 @@ class CorruptionTest : public testing::Test {
ASSERT_GE(max_expected, correct);
}
void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
const char* msg = strerror(errno);
FAIL() << fname << ": " << msg;
}
if (offset < 0) {
// Relative to end of file; make it absolute
if (-offset > sbuf.st_size) {
offset = 0;
} else {
offset = static_cast<int>(sbuf.st_size + offset);
}
}
if (offset > sbuf.st_size) {
offset = static_cast<int>(sbuf.st_size);
}
if (offset + bytes_to_corrupt > sbuf.st_size) {
bytes_to_corrupt = static_cast<int>(sbuf.st_size - offset);
}
// Do it
std::string contents;
Status s = ReadFileToString(Env::Default(), fname, &contents);
ASSERT_TRUE(s.ok()) << s.ToString();
for (int i = 0; i < bytes_to_corrupt; i++) {
contents[i + offset] ^= 0x80;
}
s = WriteStringToFile(Env::Default(), contents, fname);
ASSERT_TRUE(s.ok()) << s.ToString();
Options options;
EnvOptions env_options;
ASSERT_NOK(VerifySstFileChecksum(options, env_options, fname));
}
void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
// Pick file to corrupt
std::vector<std::string> filenames;
@ -211,7 +175,7 @@ class CorruptionTest : public testing::Test {
}
ASSERT_TRUE(!fname.empty()) << filetype;
CorruptFile(fname, offset, bytes_to_corrupt);
test::CorruptFile(fname, offset, bytes_to_corrupt);
}
// corrupts exactly one file at level `level`. if no file found at level,
@ -221,7 +185,7 @@ class CorruptionTest : public testing::Test {
db_->GetLiveFilesMetaData(&metadata);
for (const auto& m : metadata) {
if (m.level == level) {
CorruptFile(dbname_ + "/" + m.name, offset, bytes_to_corrupt);
test::CorruptFile(dbname_ + "/" + m.name, offset, bytes_to_corrupt);
return;
}
}
@ -556,7 +520,7 @@ TEST_F(CorruptionTest, RangeDeletionCorrupted) {
ImmutableCFOptions(options_), kRangeDelBlock, &range_del_handle));
ASSERT_OK(TryReopen());
CorruptFile(filename, static_cast<int>(range_del_handle.offset()), 1);
test::CorruptFile(filename, static_cast<int>(range_del_handle.offset()), 1);
ASSERT_TRUE(TryReopen().IsCorruption());
}

View File

@ -1295,14 +1295,18 @@ TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
} while (ChangeCompactOptions());
}
TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
TEST_F(DBBasicTest, MultiGetBatchedSortedMultiFile) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
SetPerfLevel(kEnableCount);
// To expand the power of this test, generate > 1 table file and
// mix with memtable
ASSERT_OK(Put(1, "k1", "v1"));
ASSERT_OK(Put(1, "k2", "v2"));
Flush(1);
ASSERT_OK(Put(1, "k3", "v3"));
ASSERT_OK(Put(1, "k4", "v4"));
Flush(1);
ASSERT_OK(Delete(1, "k4"));
ASSERT_OK(Put(1, "k5", "v5"));
ASSERT_OK(Delete(1, "no_key"));
@ -1333,7 +1337,7 @@ TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
ASSERT_TRUE(s[5].IsNotFound());
SetPerfLevel(kDisable);
} while (ChangeCompactOptions());
} while (ChangeOptions());
}
TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
@ -1774,6 +1778,28 @@ TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
}
}
TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();
options.best_efforts_recovery = true;
Status s = TryReopen(options);
ASSERT_TRUE(s.IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
#ifndef ROCKSDB_LITE
namespace {
class TableFileListener : public EventListener {
@ -1814,11 +1840,16 @@ TEST_F(DBBasicTest, RecoverWithMissingFiles) {
ASSERT_OK(Flush(static_cast<int>(cf)));
}
// Delete files
// Delete and corrupt files
for (size_t i = 0; i < all_cf_names.size(); ++i) {
std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
ASSERT_EQ(3, files.size());
for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
std::string corrupted_data;
ASSERT_OK(ReadFileToString(env_, files[files.size() - 1], &corrupted_data));
ASSERT_OK(WriteStringToFile(
env_, corrupted_data.substr(0, corrupted_data.size() - 2),
files[files.size() - 1], /*should_sync=*/true));
for (int j = static_cast<int>(files.size() - 2); j >= static_cast<int>(i);
--j) {
ASSERT_OK(env_->DeleteFile(files[j]));
}
@ -1850,6 +1881,32 @@ TEST_F(DBBasicTest, RecoverWithMissingFiles) {
}
}
TEST_F(DBBasicTest, BestEffortsRecoveryTryMultipleManifests) {
Options options = CurrentOptions();
options.env = env_;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value0"));
ASSERT_OK(Flush());
Close();
{
// Hack by adding a new MANIFEST with high file number
std::string garbage(10, '\0');
ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/MANIFEST-001000",
/*should_sync=*/true));
}
{
// Hack by adding a corrupted SST not referenced by any MANIFEST
std::string garbage(10, '\0');
ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/001001.sst",
/*should_sync=*/true));
}
options.best_efforts_recovery = true;
Reopen(options);
ASSERT_OK(Put("bar", "value"));
}
TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
Options options = CurrentOptions();
options.env = env_;
@ -1873,6 +1930,35 @@ TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
}
}
TEST_F(DBBasicTest, RecoverWithNoManifest) {
Options options = CurrentOptions();
options.env = env_;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
Close();
{
// Delete all MANIFEST.
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(dbname_, &files));
for (const auto& file : files) {
uint64_t number = 0;
FileType type = kLogFile;
if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
ASSERT_OK(env_->DeleteFile(dbname_ + "/" + file));
}
}
}
options.best_efforts_recovery = true;
options.create_if_missing = false;
Status s = TryReopen(options);
ASSERT_TRUE(s.IsInvalidArgument());
options.create_if_missing = true;
Reopen(options);
// Since no MANIFEST exists, best-efforts recovery creates a new, empty db.
ASSERT_EQ("NOT_FOUND", Get("foo"));
}
TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
Options options = CurrentOptions();
DestroyAndReopen(options);
@ -1913,6 +1999,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
}
#endif // !ROCKSDB_LITE
TEST_F(DBBasicTest, ManifestChecksumMismatch) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("bar", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
auto* crc = reinterpret_cast<uint32_t*>(arg);
*crc = *crc + 1;
});
SyncPoint::GetInstance()->EnableProcessing();
WriteOptions write_opts;
write_opts.disableWAL = true;
Status s = db_->Put(write_opts, "foo", "value");
ASSERT_OK(s);
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_OK(Put("foo", "value1"));
ASSERT_OK(Flush());
s = TryReopen(options);
ASSERT_TRUE(s.IsCorruption());
}
class DBBasicTestMultiGet : public DBTestBase {
public:
DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
@ -2511,9 +2624,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
std::shared_ptr<DBBasicTestMultiGetDeadline::DeadlineFS> fs(
new DBBasicTestMultiGetDeadline::DeadlineFS(env_));
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
env_->no_slowdown_ = true;
env_->time_elapse_only_sleep_.store(true);
Options options = CurrentOptions();
env_->SetTimeElapseOnlySleep(&options);
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
BlockBasedTableOptions table_options;

View File

@ -1063,12 +1063,12 @@ TEST_P(DBBloomFilterTestVaryPrefixAndFormatVer, PartitionedMultiGet) {
bbto.partition_filters = true;
bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
bbto.whole_key_filtering = !use_prefix_;
bbto.metadata_block_size = 128;
bbto.metadata_block_size = 290;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
ReadOptions ropts;
constexpr uint32_t N = 10000;
constexpr uint32_t N = 12000;
// Add N/2 evens
for (uint32_t i = 0; i < N; i += 2) {
ASSERT_OK(Put(UKey(i), UKey(i)));

View File

@ -2655,7 +2655,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
" guaranteed to be preserved, try larger iter_start_seqnum opt."));
}
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
ColumnFamilyData* cfd = cfh->cfd();
assert(cfd != nullptr);
ReadCallback* read_callback = nullptr; // No read callback provided.
if (read_options.tailing) {
#ifdef ROCKSDB_LITE
@ -2676,10 +2677,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
result = NewIteratorImpl(read_options, cfd,
(read_options.snapshot != nullptr)
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber,
read_callback);
}
return result;
}
@ -2692,6 +2694,24 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");
if (snapshot == kMaxSequenceNumber) {
// Note that the snapshot is assigned AFTER referencing the super
// version because otherwise a flush happening in between may compact away
// data for the snapshot, so the reader would see neither data that was be
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
// Note that the super version might not contain all the data available
// to this snapshot, but in that case it can see all the data in the
// super version, which is a valid consistent state after the user
// calls NewIterator().
snapshot = versions_->LastSequence();
TEST_SYNC_POINT("DBImpl::NewIterator:3");
TEST_SYNC_POINT("DBImpl::NewIterator:4");
}
// Try to generate a DB iterator tree in continuous memory area to be
// cache friendly. Here is an example of result:
// +-------------------------------+

View File

@ -477,6 +477,7 @@ class DBImpl : public DB {
Status GetImpl(const ReadOptions& options, const Slice& key,
GetImplOptions& get_impl_options);
// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,

View File

@ -668,13 +668,15 @@ uint64_t PrecomputeMinLogNumberToKeep(
Status DBImpl::FinishBestEffortsRecovery() {
mutex_.AssertHeld();
std::vector<std::string> paths;
paths.push_back(dbname_);
paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
for (const auto& db_path : immutable_db_options_.db_paths) {
paths.push_back(db_path.path);
paths.push_back(
NormalizePath(db_path.path + std::string(1, kFilePathSeparator)));
}
for (const auto* cfd : *versions_->GetColumnFamilySet()) {
for (const auto& cf_path : cfd->ioptions()->cf_paths) {
paths.push_back(cf_path.path);
paths.push_back(
NormalizePath(cf_path.path + std::string(1, kFilePathSeparator)));
}
}
// Dedup paths
@ -693,7 +695,8 @@ Status DBImpl::FinishBestEffortsRecovery() {
if (!ParseFileName(fname, &number, &type)) {
continue;
}
const std::string normalized_fpath = NormalizePath(path + fname);
// path ends with '/' or '\\'
const std::string normalized_fpath = path + fname;
largest_file_number = std::max(largest_file_number, number);
if (type == kTableFile && number >= next_file_number &&
files_to_delete.find(normalized_fpath) == files_to_delete.end()) {

View File

@ -370,7 +370,30 @@ Status DBImpl::Recover(
}
std::string current_fname = CurrentFileName(dbname_);
s = env_->FileExists(current_fname);
// Path to any MANIFEST file in the db dir. It does not matter which one.
// Since best-efforts recovery ignores CURRENT file, existence of a
// MANIFEST indicates the recovery to recover existing db. If no MANIFEST
// can be found, a new db will be created.
std::string manifest_path;
if (!immutable_db_options_.best_efforts_recovery) {
s = env_->FileExists(current_fname);
} else {
s = Status::NotFound();
std::vector<std::string> files;
// No need to check return value
env_->GetChildren(dbname_, &files);
for (const std::string& file : files) {
uint64_t number = 0;
FileType type = kLogFile; // initialize
if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
// Found MANIFEST (descriptor log), thus best-efforts recovery does
// not have to treat the db as empty.
s = Status::OK();
manifest_path = dbname_ + "/" + file;
break;
}
}
}
if (s.IsNotFound()) {
if (immutable_db_options_.create_if_missing) {
s = NewDB();
@ -398,14 +421,14 @@ Status DBImpl::Recover(
FileOptions customized_fs(file_options_);
customized_fs.use_direct_reads |=
immutable_db_options_.use_direct_io_for_flush_and_compaction;
s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
nullptr);
const std::string& fname =
manifest_path.empty() ? current_fname : manifest_path;
s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
if (!s.ok()) {
std::string error_str = s.ToString();
// Check if unsupported Direct I/O is the root cause
customized_fs.use_direct_reads = false;
s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
nullptr);
s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
if (s.ok()) {
return Status::InvalidArgument(
"Direct I/O is not supported by the specified DB.");
@ -425,6 +448,9 @@ Status DBImpl::Recover(
s = versions_->TryRecover(column_families, read_only, &db_id_,
&missing_table_file);
if (s.ok()) {
// TryRecover may delete previous column_family_set_.
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
s = FinishBestEffortsRecovery();
}
}

View File

@ -45,6 +45,8 @@ class DBSecondaryTest : public DBTestBase {
void OpenSecondary(const Options& options);
Status TryOpenSecondary(const Options& options);
void OpenSecondaryWithColumnFamilies(
const std::vector<std::string>& column_families, const Options& options);
@ -70,9 +72,13 @@ class DBSecondaryTest : public DBTestBase {
};
void DBSecondaryTest::OpenSecondary(const Options& options) {
ASSERT_OK(TryOpenSecondary(options));
}
Status DBSecondaryTest::TryOpenSecondary(const Options& options) {
Status s =
DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
ASSERT_OK(s);
return s;
}
void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
@ -858,6 +864,56 @@ TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
thread.join();
ASSERT_TRUE(called);
}
TEST_F(DBSecondaryTest, StartFromInconsistent) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();
Options options1;
Status s = TryOpenSecondary(options1);
ASSERT_TRUE(s.IsCorruption());
}
TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
Options options1;
OpenSecondary(options1);
{
std::string value;
ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("value", value);
}
ASSERT_OK(Put("bar", "value1"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();
Status s = db_secondary_->TryCatchUpWithPrimary();
ASSERT_TRUE(s.IsCorruption());
}
#endif //! ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

View File

@ -356,37 +356,25 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
// Turn timed wait into a simulated sleep
uint64_t* abs_time_us = static_cast<uint64_t*>(arg);
int64_t cur_time = 0;
env_->GetCurrentTime(&cur_time);
if (*abs_time_us > static_cast<uint64_t>(cur_time)) {
env_->addon_time_.fetch_add(*abs_time_us -
static_cast<uint64_t>(cur_time));
uint64_t cur_time = env_->NowMicros();
if (*abs_time_us > cur_time) {
env_->addon_time_.fetch_add(*abs_time_us - cur_time);
}
// Randomly sleep shortly
env_->addon_time_.fetch_add(
static_cast<uint64_t>(Random::GetTLSInstance()->Uniform(10)));
// Set wait until time to before current to force not to sleep.
int64_t real_cur_time = 0;
Env::Default()->GetCurrentTime(&real_cur_time);
*abs_time_us = static_cast<uint64_t>(real_cur_time);
// Set wait until time to before (actual) current time to force not
// to sleep
*abs_time_us = Env::Default()->NowMicros();
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
env_->no_slowdown_ = true;
env_->time_elapse_only_sleep_ = true;
Options options = CurrentOptions();
env_->SetTimeElapseOnlySleep(&options);
options.disable_auto_compactions = true;
// Need to disable stats dumping and persisting which also use
// RepeatableThread, one of whose member variables is of type
// InstrumentedCondVar. The callback for
// InstrumentedCondVar::TimedWaitInternal can be triggered by stats dumping
// and persisting threads and cause time_spent_deleting measurement to become
// incorrect.
options.stats_dump_period_sec = 0;
options.stats_persist_period_sec = 0;
options.env = env_;
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec

View File

@ -2688,6 +2688,94 @@ TEST_F(DBTest2, OptimizeForSmallDB) {
#endif // ROCKSDB_LITE
TEST_F(DBTest2, IterRaceFlush1) {
ASSERT_OK(Put("foo", "v1"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
{"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
});
// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}
t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, IterRaceFlush2) {
ASSERT_OK(Put("foo", "v1"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
{"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
});
// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}
t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, IterRefreshRaceFlush) {
ASSERT_OK(Put("foo", "v1"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"},
{"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
});
// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Refresh();
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}
t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, GetRaceFlush1) {
ASSERT_OK(Put("foo", "v1"));
@ -4328,6 +4416,24 @@ TEST_F(DBTest2, SameSmallestInSameLevel) {
ASSERT_EQ("2,3,4,5,6,7,8", Get("key"));
}
TEST_F(DBTest2, FileConsistencyCheckInOpen) {
Put("foo", "bar");
Flush();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
Status* ret_s = static_cast<Status*>(arg);
*ret_s = Status::Corruption("fcc");
});
SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.force_consistency_checks = true;
ASSERT_NOK(TryReopen(options));
SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, BlockBasedTablePrefixIndexSeekForPrev) {
// create a DB with block prefix index
BlockBasedTableOptions table_options;

View File

@ -14,10 +14,19 @@
namespace ROCKSDB_NAMESPACE {
namespace {
int64_t MaybeCurrentTime(Env* env) {
int64_t time = 1337346000; // arbitrary fallback default
(void)env->GetCurrentTime(&time);
return time;
}
} // namespace
// Special Env used to delay background operations
SpecialEnv::SpecialEnv(Env* base)
: EnvWrapper(base),
maybe_starting_time_(MaybeCurrentTime(base)),
rnd_(301),
sleep_counter_(this),
addon_time_(0),

View File

@ -502,10 +502,14 @@ class SpecialEnv : public EnvWrapper {
virtual Status GetCurrentTime(int64_t* unix_time) override {
Status s;
if (!time_elapse_only_sleep_) {
if (time_elapse_only_sleep_) {
*unix_time = maybe_starting_time_;
} else {
s = target()->GetCurrentTime(unix_time);
}
if (s.ok()) {
// FIXME: addon_time_ sometimes used to mean seconds (here) and
// sometimes microseconds
*unix_time += addon_time_.load();
}
return s;
@ -531,6 +535,20 @@ class SpecialEnv : public EnvWrapper {
return target()->DeleteFile(fname);
}
void SetTimeElapseOnlySleep(Options* options) {
time_elapse_only_sleep_ = true;
no_slowdown_ = true;
// Need to disable stats dumping and persisting which also use
// RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
// With time_elapse_only_sleep_, this can hang on some platforms.
// TODO: why? investigate/fix
options->stats_dump_period_sec = 0;
options->stats_persist_period_sec = 0;
}
// Something to return when mocking current time
const int64_t maybe_starting_time_;
Random rnd_;
port::Mutex rnd_mutex_; // Lock to pretect rnd_

View File

@ -1953,6 +1953,7 @@ TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
#if defined(ENABLE_SINGLE_LEVEL_DTC)
TEST_F(DBTestUniversalCompaction2, SingleLevel) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
@ -1991,6 +1992,7 @@ TEST_F(DBTestUniversalCompaction2, SingleLevel) {
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
#endif // ENABLE_SINGLE_LEVEL_DTC
TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
const int kWindowSize = 100;

View File

@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
// Compute the crc of the record type and the payload.
crc = crc32c::Extend(crc, ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage
TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
&crc);
EncodeFixed32(buf, crc);
// Write the header and the payload

View File

@ -316,7 +316,10 @@ class VersionBuilder::Rep {
}
}
return Status::OK();
Status ret_s;
TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistencyBeforeReturn",
&ret_s);
return ret_s;
}
Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
@ -463,7 +466,10 @@ class VersionBuilder::Rep {
const auto number = del_file.second;
if (level < num_levels_) {
levels_[level].deleted_files.insert(number);
CheckConsistencyForDeletes(edit, number, level);
s = CheckConsistencyForDeletes(edit, number, level);
if (!s.ok()) {
return s;
}
auto exising = levels_[level].added_files.find(number);
if (exising != levels_[level].added_files.end()) {

View File

@ -839,6 +839,36 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbage) {
UnrefFilesInVersion(&new_vstorage);
}
TEST_F(VersionBuilderTest, CheckConsistencyForFileDeletedTwice) {
Add(0, 1U, "150", "200", 100U);
UpdateVersionStorageInfo();
VersionEdit version_edit;
version_edit.DeleteFile(0, 1U);
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
constexpr VersionSet* version_set = nullptr;
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
&vstorage_, version_set);
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr,
true /* force_consistency_checks */);
ASSERT_OK(version_builder.Apply(&version_edit));
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
VersionBuilder version_builder2(env_options, &ioptions_, table_cache,
&new_vstorage, version_set);
VersionStorageInfo new_vstorage2(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr,
true /* force_consistency_checks */);
ASSERT_NOK(version_builder2.Apply(&version_edit));
UnrefFilesInVersion(&new_vstorage);
UnrefFilesInVersion(&new_vstorage2);
}
TEST_F(VersionBuilderTest, EstimatedActiveKeys) {
const uint32_t kTotalSamples = 20;
const uint32_t kNumLevels = 5;

View File

@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler(
assert(version_set_ != nullptr);
}
Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id) {
Slice record;
std::string scratch;
assert(log_read_status);
assert(log_read_status->ok());
size_t recovered_edits = 0;
Status s = Initialize();
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
}
}
}
if (!log_read_status->ok()) {
s = *log_read_status;
}
CheckIterationResult(reader, &s);
if (!s.ok()) {
status_ = s;
}
return s;
}
Status VersionEditHandler::Initialize() {
@ -377,6 +384,7 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
ColumnFamilyData* cfd,
bool force_create_version) {
assert(cfd->initialized());
Status s;
if (force_create_version) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
@ -384,13 +392,18 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
auto* v = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(v->storage_info());
// Install new version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
s = builder->SaveTo(v->storage_info());
if (s.ok()) {
// Install new version
v->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
} else {
delete v;
}
}
return Status::OK();
return s;
}
Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
@ -398,18 +411,18 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
bool is_initial_load) {
assert(cfd != nullptr);
assert(!cfd->IsDropped());
Status s;
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
assert(builder_iter->second != nullptr);
VersionBuilder* builder = builder_iter->second->version_builder();
assert(builder);
s = builder->LoadTableHandlers(
Status s = builder->LoadTableHandlers(
cfd->internal_stats(),
version_set_->db_options_->max_file_opening_threads,
prefetch_index_and_filter_in_cache, is_initial_load,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
if (s.IsPathNotFound() && no_error_if_table_files_missing_) {
if ((s.IsPathNotFound() || s.IsCorruption()) &&
no_error_if_table_files_missing_) {
s = Status::OK();
}
if (!s.ok() && !version_set_->db_options_->paranoid_checks) {
@ -540,9 +553,11 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
const std::string fpath =
MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num);
s = version_set_->VerifyFileMetadata(fpath, meta);
if (s.IsPathNotFound() || s.IsNotFound()) {
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_files.insert(file_num);
s = Status::OK();
} else if (!s.ok()) {
break;
}
}
bool missing_info = !version_edit_params_.has_log_number_ ||
@ -550,24 +565,29 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
!version_edit_params_.has_last_sequence_;
// Create version before apply edit
if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) ||
(missing_files.empty() && force_create_version))) {
if (s.ok() && !missing_info &&
((!missing_files.empty() && !prev_has_missing_files) ||
(missing_files.empty() && force_create_version))) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
auto* version = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
} else {
versions_.emplace(cfd->GetID(), version);
}
} else {
versions_.emplace(cfd->GetID(), version);
delete version;
}
}
return s;

View File

@ -40,7 +40,8 @@ class VersionEditHandler {
virtual ~VersionEditHandler() {}
Status Iterate(log::Reader& reader, std::string* db_id);
void Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id);
const Status& status() const { return status_; }

View File

@ -10,6 +10,7 @@
#include "db/version_set.h"
#include <stdio.h>
#include <algorithm>
#include <array>
#include <cinttypes>
@ -19,6 +20,7 @@
#include <string>
#include <unordered_map>
#include <vector>
#include "compaction/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
@ -50,6 +52,7 @@
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
@ -2087,6 +2090,9 @@ void VersionStorageInfo::GenerateLevelFilesBrief() {
void Version::PrepareApply(
const MutableCFOptions& mutable_cf_options,
bool update_stats) {
TEST_SYNC_POINT_CALLBACK(
"Version::PrepareApply:forced_check",
reinterpret_cast<void*>(&storage_info_.force_consistency_checks_));
UpdateAccumulatedStats(update_stats);
storage_info_.UpdateNumNonEmptyLevels();
storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
@ -2388,6 +2394,11 @@ void VersionStorageInfo::ComputeCompactionScore(
// compaction score for the whole DB. Adding other levels as if
// they are L0 files.
for (int i = 1; i < num_levels(); i++) {
// Its possible that a subset of the files in a level may be in a
// compaction, due to delete triggered compaction or trivial move.
// In that case, the below check may not catch a level being
// compacted as it only checks the first file. The worst that can
// happen is a scheduled compaction thread will find nothing to do.
if (!files_[i].empty() && !files_[i][0]->being_compacted) {
num_sorted_runs++;
}
@ -4375,24 +4386,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
if (dbname.back() != '/') {
manifest_path->push_back('/');
}
*manifest_path += fname;
manifest_path->append(fname);
return Status::OK();
}
Status VersionSet::ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders,
VersionEditParams* version_edit_params, std::string* db_id) {
assert(reader != nullptr);
Status* log_read_status, VersionEditParams* version_edit_params,
std::string* db_id) {
assert(read_buffer != nullptr);
assert(log_read_status != nullptr);
Status s;
Slice record;
std::string scratch;
size_t recovered_edits = 0;
while (reader->ReadRecord(&record, &scratch) && s.ok()) {
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
@ -4436,6 +4449,9 @@ Status VersionSet::ReadAndRecover(
}
}
}
if (!log_read_status->ok()) {
s = *log_read_status;
}
if (!s.ok()) {
// Clear the buffer if we fail to decode/apply an edit.
read_buffer->Clear();
@ -4482,8 +4498,7 @@ Status VersionSet::Recover(
db_options_->log_readahead_size));
}
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
VersionBuilderMap builders;
// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
@ -4505,12 +4520,13 @@ Status VersionSet::Recover(
VersionEditParams version_edit_params;
{
VersionSet::LogReporter reporter;
reporter.status = &s;
Status log_read_status;
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
AtomicGroupReadBuffer read_buffer;
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders,
s = ReadAndRecover(reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders, &log_read_status,
&version_edit_params, db_id);
current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0);
@ -4594,7 +4610,11 @@ Status VersionSet::Recover(
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
s = builder->SaveTo(v->storage_info());
if (!s.ok()) {
delete v;
return s;
}
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
@ -4771,21 +4791,20 @@ Status VersionSet::TryRecoverFromOneManifest(
db_options_->log_readahead_size));
}
assert(s.ok());
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
/*checksum=*/true, /*log_num=*/0);
{
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));
s = handler_pit.Iterate(reader, db_id);
handler_pit.Iterate(reader, &s, db_id);
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
}
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
return s;
return handler_pit.status();
}
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
@ -5145,7 +5164,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
s = builder->SaveTo(v->storage_info());
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
printf("--------------- Column family \"%s\" (ID %" PRIu32
@ -5866,8 +5885,7 @@ Status ReactiveVersionSet::Recover(
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
VersionBuilderMap builders;
std::unordered_map<int, std::string> column_families_not_found;
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
@ -5875,7 +5893,7 @@ Status ReactiveVersionSet::Recover(
manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter());
static_cast<LogReporter*>(manifest_reporter->get())->status =
static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
log::Reader* reader = manifest_reader->get();
@ -5884,10 +5902,9 @@ Status ReactiveVersionSet::Recover(
VersionEdit version_edit;
while (s.ok() && retry < 1) {
assert(reader != nullptr);
Slice record;
std::string scratch;
s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
column_families_not_found, builders, &version_edit);
s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
column_families_not_found, builders,
manifest_reader_status->get(), &version_edit);
if (s.ok()) {
bool enough = version_edit.has_next_file_number_ &&
version_edit.has_log_number_ &&
@ -5966,13 +5983,23 @@ Status ReactiveVersionSet::Recover(
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
s = builder->SaveTo(v->storage_info());
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
if (s.ok()) {
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
} else {
ROCKS_LOG_ERROR(db_options_->info_log,
"[%s]: inconsistent version: %s\n",
cfd->GetName().c_str(), s.ToString().c_str());
delete v;
break;
}
}
}
if (s.ok()) {
next_file_number_.store(version_edit.next_file_number_ + 1);
last_allocated_sequence_ = version_edit.last_sequence_;
last_published_sequence_ = version_edit.last_sequence_;
@ -6049,6 +6076,8 @@ Status ReactiveVersionSet::ReadAndApply(
s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
if (s.ok()) {
applied_edits++;
} else {
break;
}
}
}
@ -6058,13 +6087,14 @@ Status ReactiveVersionSet::ReadAndApply(
}
// It's possible that:
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
// Or the version(s) rebuilt from tailing the MANIFEST is inconsistent.
// 2) we have finished reading the current MANIFEST.
// 3) we have encountered an IOError reading the current MANIFEST.
// We need to look for the next MANIFEST and start from there. If we cannot
// find the next MANIFEST, we should exit the loop.
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
reader = manifest_reader->get();
if (s.ok()) {
if (tmp_s.ok()) {
if (reader->file()->file_name() == old_manifest_path) {
// Still processing the same MANIFEST, thus no need to continue this
// loop since no record is available if we have reached here.
@ -6094,6 +6124,7 @@ Status ReactiveVersionSet::ReadAndApply(
number_of_edits_to_skip_ += 2;
}
}
s = tmp_s;
}
}
}
@ -6186,12 +6217,16 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
auto version = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
}
} else {
delete version;
}
} else if (s.IsPathNotFound()) {
s = Status::OK();
@ -6199,23 +6234,25 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
// Some other error has occurred during LoadTableHandlers.
}
if (version_edit->HasNextFile()) {
next_file_number_.store(version_edit->next_file_number_ + 1);
if (s.ok()) {
if (version_edit->HasNextFile()) {
next_file_number_.store(version_edit->next_file_number_ + 1);
}
if (version_edit->has_last_sequence_) {
last_allocated_sequence_ = version_edit->last_sequence_;
last_published_sequence_ = version_edit->last_sequence_;
last_sequence_ = version_edit->last_sequence_;
}
if (version_edit->has_prev_log_number_) {
prev_log_number_ = version_edit->prev_log_number_;
MarkFileNumberUsed(version_edit->prev_log_number_);
}
if (version_edit->has_log_number_) {
MarkFileNumberUsed(version_edit->log_number_);
}
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
}
if (version_edit->has_last_sequence_) {
last_allocated_sequence_ = version_edit->last_sequence_;
last_published_sequence_ = version_edit->last_sequence_;
last_sequence_ = version_edit->last_sequence_;
}
if (version_edit->has_prev_log_number_) {
prev_log_number_ = version_edit->prev_log_number_;
MarkFileNumberUsed(version_edit->prev_log_number_);
}
if (version_edit->has_log_number_) {
MarkFileNumberUsed(version_edit->log_number_);
}
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
return s;
}

View File

@ -1102,6 +1102,10 @@ class VersionSet {
void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
protected:
using VersionBuilderMap =
std::unordered_map<uint32_t,
std::unique_ptr<BaseReferencedVersionBuilder>>;
struct ManifestWriter;
friend class Version;
@ -1113,7 +1117,9 @@ class VersionSet {
struct LogReporter : public log::Reader::Reporter {
Status* status;
virtual void Corruption(size_t /*bytes*/, const Status& s) override {
if (this->status->ok()) *this->status = s;
if (status->ok()) {
*status = s;
}
}
};
@ -1144,13 +1150,14 @@ class VersionSet {
const VersionEdit* edit);
Status ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>&
name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
VersionEditParams* version_edit, std::string* db_id = nullptr);
Status* log_read_status, VersionEditParams* version_edit,
std::string* db_id = nullptr);
// REQUIRES db mutex
Status ApplyOneVersionEditToBuilder(
@ -1279,8 +1286,7 @@ class ReactiveVersionSet : public VersionSet {
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);
private:
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
active_version_builders_;
VersionBuilderMap active_version_builders_;
AtomicGroupReadBuffer read_buffer_;
// Number of version edits to skip by ReadAndApply at the beginning of a new
// MANIFEST created by primary.

View File

@ -1370,7 +1370,7 @@ TEST_F(VersionSetAtomicGroupTest,
// Write the corrupted edits.
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
EXPECT_NOK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
@ -1420,7 +1420,7 @@ TEST_F(VersionSetAtomicGroupTest,
&manifest_reader_status));
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
EXPECT_NOK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[1].DebugString(),

10
env/fs_posix.cc vendored
View File

@ -241,6 +241,8 @@ class PosixFileSystem : public FileSystem {
s = IOError("while mmap file for read", fname, errno);
close(fd);
}
} else {
close(fd);
}
} else {
if (options.use_direct_reads && !options.use_mmap_reads) {
@ -889,14 +891,16 @@ class PosixFileSystem : public FileSystem {
if (fd < 0) {
return IOError("While open for IsDirectory()", path, errno);
}
IOStatus io_s;
struct stat sbuf;
if (fstat(fd, &sbuf) < 0) {
return IOError("While doing stat for IsDirectory()", path, errno);
io_s = IOError("While doing stat for IsDirectory()", path, errno);
}
if (nullptr != is_dir) {
close(fd);
if (io_s.ok() && nullptr != is_dir) {
*is_dir = S_ISDIR(sbuf.st_mode);
}
return IOStatus::OK();
return io_s;
}
FileOptions OptimizeForLogWrite(const FileOptions& file_options,

View File

@ -62,13 +62,13 @@ class RandomAccessFileReader {
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
Env* _env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_(std::move(raf)),
file_name_(std::move(_file_name)),
env_(env),
env_(_env),
stats_(stats),
hist_type_(hist_type),
file_read_hist_(file_read_hist),

View File

@ -16,7 +16,17 @@
#pragma once
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
#include <stdio.h>
#include <stdlib.h>
#endif
#include <string>
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
#include "port/stack_trace.h"
#endif
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
@ -25,7 +35,16 @@ class Status {
public:
// Create a success status.
Status() : code_(kOk), subcode_(kNone), sev_(kNoError), state_(nullptr) {}
~Status() { delete[] state_; }
~Status() {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (!checked_) {
fprintf(stderr, "Failed to check Status\n");
port::PrintStack();
abort();
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
delete[] state_;
}
// Copy the specified status.
Status(const Status& s);
@ -362,8 +381,6 @@ inline Status::Status(const Status& s, Severity sev)
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
}
inline Status& Status::operator=(const Status& s) {
// The following condition catches both aliasing (when this == &s),
// and the common case where both s and *this are ok.
if (this != &s) {
code_ = s.code_;
subcode_ = s.subcode_;

View File

@ -25,6 +25,7 @@
#include "rocksdb/cache.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
@ -40,11 +41,8 @@ class TableBuilder;
class TableFactory;
class TableReader;
class WritableFileWriter;
struct ColumnFamilyOptions;
struct ConfigOptions;
struct DBOptions;
struct EnvOptions;
struct Options;
enum ChecksumType : char {
kNoChecksum = 0x0,

View File

@ -59,6 +59,7 @@ class LDBCommand {
static const std::string ARG_FILE_SIZE;
static const std::string ARG_CREATE_IF_MISSING;
static const std::string ARG_NO_VALUE;
static const std::string ARG_DISABLE_CONSISTENCY_CHECKS;
struct ParsedParams {
std::string cmd;
@ -163,6 +164,9 @@ class LDBCommand {
// If true, try to construct options from DB's option files.
bool try_load_options_;
// The value passed to options.force_consistency_checks.
bool force_consistency_checks_;
bool create_if_missing_;
/**

View File

@ -5,8 +5,8 @@
#pragma once
#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 9
#define ROCKSDB_PATCH 0
#define ROCKSDB_MINOR 10
#define ROCKSDB_PATCH 4
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -647,7 +647,12 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4;"
"format_version=5;whole_key_filtering=1;"
"filter_policy=bloomfilter:4.567:false;",
"filter_policy=bloomfilter:4.567:false;"
// A bug caused read_amp_bytes_per_bit to be a large integer in OPTIONS
// file generated by 6.10 to 6.14. Though bug is fixed in these releases,
// we need to handle the case of loading OPTIONS file generated before the
// fix.
"read_amp_bytes_per_bit=17179869185;",
&new_opt));
ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
@ -668,6 +673,9 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
dynamic_cast<const BloomFilterPolicy&>(*new_opt.filter_policy);
EXPECT_EQ(bfp.GetMillibitsPerKey(), 4567);
EXPECT_EQ(bfp.GetWholeBitsPerKey(), 5);
// Verify that only the lower 32bits are stored in
// new_opt.read_amp_bytes_per_bit.
EXPECT_EQ(1U, new_opt.read_amp_bytes_per_bit);
// unknown option
ASSERT_NOK(GetBlockBasedTableOptionsFromString(
@ -1279,7 +1287,7 @@ TEST_F(OptionsTest, ConvertOptionsTest) {
// This test suite tests the old APIs into the Configure options methods.
// Once those APIs are officially deprecated, this test suite can be deleted.
class OptionsOldApiTest : public testing::Test {};
TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
std::unordered_map<std::string, std::string> cf_options_map = {
{"write_buffer_size", "1"},
@ -1744,7 +1752,7 @@ TEST_F(OptionsOldApiTest, GetColumnFamilyOptionsFromStringTest) {
ASSERT_TRUE(new_cf_opt.memtable_factory != nullptr);
ASSERT_EQ(std::string(new_cf_opt.memtable_factory->Name()), "SkipListFactory");
}
TEST_F(OptionsOldApiTest, GetBlockBasedTableOptionsFromString) {
BlockBasedTableOptions table_opt;
BlockBasedTableOptions new_opt;
@ -1919,7 +1927,7 @@ TEST_F(OptionsOldApiTest, GetBlockBasedTableOptionsFromString) {
->GetHighPriPoolRatio(),
0.5);
}
TEST_F(OptionsOldApiTest, GetPlainTableOptionsFromString) {
PlainTableOptions table_opt;
PlainTableOptions new_opt;
@ -1950,7 +1958,7 @@ TEST_F(OptionsOldApiTest, GetPlainTableOptionsFromString) {
"encoding_type=kPrefixXX",
&new_opt));
}
TEST_F(OptionsOldApiTest, GetOptionsFromStringTest) {
Options base_options, new_options;
base_options.write_buffer_size = 20;
@ -2504,7 +2512,7 @@ TEST_F(OptionsParserTest, Readahead) {
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size));
assert(file_size > 0);
RocksDBOptionsParser parser;
env_->num_seq_file_read_ = 0;

View File

@ -300,8 +300,24 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionTypeFlags::kNone, 0}},
{"read_amp_bytes_per_bit",
{offsetof(struct BlockBasedTableOptions, read_amp_bytes_per_bit),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, 0}},
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, 0,
[](const ConfigOptions& /*opts*/, const std::string& /*name*/,
const std::string& value, char* addr) {
// A workaround to fix a bug in 6.10, 6.11, 6.12, 6.13
// and 6.14. The bug will write out 8 bytes to OPTIONS file from the
// starting address of BlockBasedTableOptions.read_amp_bytes_per_bit
// which is actually a uint32. Consequently, the value of
// read_amp_bytes_per_bit written in the OPTIONS file is wrong.
// From 6.15, RocksDB will try to parse the read_amp_bytes_per_bit
// from OPTIONS file as a uint32. To be able to load OPTIONS file
// generated by affected releases before the fix, we need to
// manually parse read_amp_bytes_per_bit with this special hack.
uint64_t read_amp_bytes_per_bit = ParseUint64(value);
*(reinterpret_cast<uint32_t*>(addr)) =
static_cast<uint32_t>(read_amp_bytes_per_bit);
return Status::OK();
}}},
{"enable_index_compression",
{offsetof(struct BlockBasedTableOptions, enable_index_compression),
OptionType::kBoolean, OptionVerificationType::kNormal,

View File

@ -2461,13 +2461,16 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
ExtractUserKey(v.first_internal_key)) < 0)) {
// The requested key falls between highest key in previous block and
// lowest key in current block.
*(miter->s) = iiter->status();
if (!iiter->status().IsNotFound()) {
*(miter->s) = iiter->status();
}
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
continue;
}
if (!uncompression_dict_status.ok()) {
assert(!uncompression_dict_status.IsNotFound());
*(miter->s) = uncompression_dict_status;
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
@ -2680,7 +2683,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
rep_->level);
}
if (s.ok()) {
if (s.ok() && !iiter->status().IsNotFound()) {
s = iiter->status();
}
*(miter->s) = s;
@ -2815,6 +2818,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
break;
}
}
if (s.ok()) {
// In the case of two level indexes, we would have exited the above loop
// by checking index_iter->Valid(), but Valid() might have returned false
// due to an IO error. So check the index_iter status
s = index_iter->status();
}
return s;
}

View File

@ -252,6 +252,7 @@ class BlockBasedTable : public TableReader {
private:
friend class MockedBlockBasedTable;
friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
static std::atomic<uint64_t> next_cache_key_id_;
BlockCacheTracer* const block_cache_tracer_;

View File

@ -0,0 +1,341 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "table/block_based/block_based_table_reader.h"
#include "rocksdb/file_system.h"
#include "table/block_based/partitioned_index_iterator.h"
#include "db/table_properties_collector.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/format.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
namespace ROCKSDB_NAMESPACE {
class BlockBasedTableReaderTest
: public testing::Test,
public testing::WithParamInterface<std::tuple<
CompressionType, bool, BlockBasedTableOptions::IndexType, bool>> {
protected:
CompressionType compression_type_;
bool use_direct_reads_;
void SetUp() override {
BlockBasedTableOptions::IndexType index_type;
bool no_block_cache;
std::tie(compression_type_, use_direct_reads_, index_type, no_block_cache) =
GetParam();
test::SetupSyncPointsToMockDirectIO();
test_dir_ = test::PerThreadDBPath("block_based_table_reader_test");
env_ = Env::Default();
fs_ = FileSystem::Default();
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
BlockBasedTableOptions opts;
opts.index_type = index_type;
opts.no_block_cache = no_block_cache;
table_factory_.reset(
static_cast<BlockBasedTableFactory*>(NewBlockBasedTableFactory(opts)));
}
void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); }
// Creates a table with the specificied key value pairs (kv).
void CreateTable(const std::string& table_name,
const CompressionType& compression_type,
const std::map<std::string, std::string>& kv) {
std::unique_ptr<WritableFileWriter> writer;
NewFileWriter(table_name, &writer);
// Create table builder.
Options options;
ImmutableCFOptions ioptions(options);
InternalKeyComparator comparator(options.comparator);
ColumnFamilyOptions cf_options;
MutableCFOptions moptions(cf_options);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
std::unique_ptr<TableBuilder> table_builder(table_factory_->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, comparator, &factories,
compression_type, 0 /* sample_for_compression */,
CompressionOptions(), false /* skip_filters */,
kDefaultColumnFamilyName, -1 /* level */),
0 /* column_family_id */, writer.get()));
// Build table.
for (auto it = kv.begin(); it != kv.end(); it++) {
std::string k = ToInternalKey(it->first);
std::string v = it->second;
table_builder->Add(k, v);
}
ASSERT_OK(table_builder->Finish());
}
void NewBlockBasedTableReader(const FileOptions& foptions,
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& comparator,
const std::string& table_name,
std::unique_ptr<BlockBasedTable>* table) {
std::unique_ptr<RandomAccessFileReader> file;
NewFileReader(table_name, foptions, &file);
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));
std::unique_ptr<TableReader> table_reader;
ASSERT_OK(BlockBasedTable::Open(ioptions, EnvOptions(),
table_factory_->table_options(), comparator,
std::move(file), file_size, &table_reader));
table->reset(reinterpret_cast<BlockBasedTable*>(table_reader.release()));
}
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
const std::shared_ptr<FileSystem>& fs() const { return fs_; }
private:
std::string test_dir_;
Env* env_;
std::shared_ptr<FileSystem> fs_;
std::unique_ptr<BlockBasedTableFactory> table_factory_;
void WriteToFile(const std::string& content, const std::string& filename) {
std::unique_ptr<FSWritableFile> f;
ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr));
ASSERT_OK(f->Append(content, IOOptions(), nullptr));
ASSERT_OK(f->Close(IOOptions(), nullptr));
}
void NewFileWriter(const std::string& filename,
std::unique_ptr<WritableFileWriter>* writer) {
std::string path = Path(filename);
EnvOptions env_options;
FileOptions foptions;
std::unique_ptr<FSWritableFile> file;
ASSERT_OK(fs_->NewWritableFile(path, foptions, &file, nullptr));
writer->reset(new WritableFileWriter(std::move(file), path, env_options));
}
void NewFileReader(const std::string& filename, const FileOptions& opt,
std::unique_ptr<RandomAccessFileReader>* reader) {
std::string path = Path(filename);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
reader->reset(new RandomAccessFileReader(std::move(f), path, env_));
}
std::string ToInternalKey(const std::string& key) {
InternalKey internal_key(key, 0, ValueType::kTypeValue);
return internal_key.Encode().ToString();
}
};
// Tests MultiGet in both direct IO and non-direct IO mode.
// The keys should be in cache after MultiGet.
TEST_P(BlockBasedTableReaderTest, MultiGet) {
// Prepare key-value pairs to occupy multiple blocks.
// Each value is 256B, every 16 pairs constitute 1 block.
// Adjacent blocks contain values with different compression complexity:
// human readable strings are easier to compress than random strings.
std::map<std::string, std::string> kv;
{
Random rnd(101);
uint32_t key = 0;
for (int block = 0; block < 100; block++) {
for (int i = 0; i < 16; i++) {
char k[9] = {0};
// Internal key is constructed directly from this key,
// and internal key size is required to be >= 8 bytes,
// so use %08u as the format string.
sprintf(k, "%08u", key);
std::string v;
if (block % 2) {
v = test::RandomHumanReadableString(&rnd, 256);
} else {
test::RandomString(&rnd, 256, &v);
}
kv[std::string(k)] = v;
key++;
}
}
}
// Prepare keys, values, and statuses for MultiGet.
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
autovector<PinnableSlice, MultiGetContext::MAX_BATCH_SIZE> values;
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
{
const int step =
static_cast<int>(kv.size()) / MultiGetContext::MAX_BATCH_SIZE;
auto it = kv.begin();
for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) {
keys.emplace_back(it->first);
values.emplace_back();
statuses.emplace_back();
std::advance(it, step);
}
}
std::string table_name =
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
CreateTable(table_name, compression_type_, kv);
std::unique_ptr<BlockBasedTable> table;
Options options;
ImmutableCFOptions ioptions(options);
FileOptions foptions;
foptions.use_direct_reads = use_direct_reads_;
InternalKeyComparator comparator(options.comparator);
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
// Ensure that keys are not in cache before MultiGet.
for (auto& key : keys) {
ASSERT_FALSE(table->TEST_KeyInCache(ReadOptions(), key));
}
// Prepare MultiGetContext.
autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_context;
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
for (size_t i = 0; i < keys.size(); ++i) {
get_context.emplace_back(
BytewiseComparator(), nullptr, nullptr, nullptr, GetContext::kNotFound,
keys[i], &values[i], nullptr, nullptr, nullptr, true /* do_merge */,
nullptr, nullptr, nullptr, nullptr, nullptr, nullptr);
key_context.emplace_back(nullptr, keys[i], &values[i], nullptr,
&statuses.back());
key_context.back().get_context = &get_context.back();
}
for (auto& key_ctx : key_context) {
sorted_keys.emplace_back(&key_ctx);
}
MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, ReadOptions());
// Execute MultiGet.
MultiGetContext::Range range = ctx.GetMultiGetRange();
table->MultiGet(ReadOptions(), &range, nullptr);
for (const Status& status : statuses) {
ASSERT_OK(status);
}
// Check that keys are in cache after MultiGet.
for (size_t i = 0; i < keys.size(); i++) {
ASSERT_TRUE(table->TEST_KeyInCache(ReadOptions(), keys[i]));
ASSERT_EQ(values[i].ToString(), kv[keys[i].ToString()]);
}
}
class BlockBasedTableReaderTestVerifyChecksum
: public BlockBasedTableReaderTest {
public:
BlockBasedTableReaderTestVerifyChecksum() : BlockBasedTableReaderTest() {}
};
TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) {
// Prepare key-value pairs to occupy multiple blocks.
// Each value is 256B, every 16 pairs constitute 1 block.
// Adjacent blocks contain values with different compression complexity:
// human readable strings are easier to compress than random strings.
Random rnd(101);
std::map<std::string, std::string> kv;
{
uint32_t key = 0;
for (int block = 0; block < 800; block++) {
for (int i = 0; i < 16; i++) {
char k[9] = {0};
// Internal key is constructed directly from this key,
// and internal key size is required to be >= 8 bytes,
// so use %08u as the format string.
sprintf(k, "%08u", key);
std::string v;
test::RandomString(&rnd, 256, &v);
kv[std::string(k)] = v;
key++;
}
}
}
std::string table_name =
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
CreateTable(table_name, compression_type_, kv);
std::unique_ptr<BlockBasedTable> table;
Options options;
ImmutableCFOptions ioptions(options);
FileOptions foptions;
foptions.use_direct_reads = use_direct_reads_;
InternalKeyComparator comparator(options.comparator);
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
// Use the top level iterator to find the offset/size of the first
// 2nd level index block and corrupt the block
IndexBlockIter iiter_on_stack;
BlockCacheLookupContext context{TableReaderCaller::kUserVerifyChecksum};
InternalIteratorBase<IndexValue>* iiter = table->NewIndexIterator(
ReadOptions(), /*disable_prefix_seek=*/false, &iiter_on_stack,
/*get_context=*/nullptr, &context);
std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
if (iiter != &iiter_on_stack) {
iiter_unique_ptr = std::unique_ptr<InternalIteratorBase<IndexValue>>(iiter);
}
ASSERT_OK(iiter->status());
iiter->SeekToFirst();
BlockHandle handle = static_cast<ParititionedIndexIterator*>(iiter)
->index_iter_->value()
.handle;
table.reset();
// Corrupt the block pointed to by handle
test::CorruptFile(Path(table_name), static_cast<int>(handle.offset()), 128);
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
Status s = table->VerifyChecksum(ReadOptions(),
TableReaderCaller::kUserVerifyChecksum);
ASSERT_EQ(s.code(), Status::kCorruption);
}
// Param 1: compression type
// Param 2: whether to use direct reads
// Param 3: Block Based Table Index type
// Param 4: BBTO no_block_cache option
#ifdef ROCKSDB_LITE
// Skip direct I/O tests in lite mode since direct I/O is unsupported.
INSTANTIATE_TEST_CASE_P(
MultiGet, BlockBasedTableReaderTest,
::testing::Combine(
::testing::ValuesIn(GetSupportedCompressions()),
::testing::Values(false),
::testing::Values(BlockBasedTableOptions::IndexType::kBinarySearch),
::testing::Values(false)));
#else // ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P(
MultiGet, BlockBasedTableReaderTest,
::testing::Combine(
::testing::ValuesIn(GetSupportedCompressions()), ::testing::Bool(),
::testing::Values(BlockBasedTableOptions::IndexType::kBinarySearch),
::testing::Values(false)));
#endif // ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P(
VerifyChecksum, BlockBasedTableReaderTestVerifyChecksum,
::testing::Combine(
::testing::ValuesIn(GetSupportedCompressions()),
::testing::Values(false),
::testing::Values(
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch),
::testing::Values(true)));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -122,6 +122,7 @@ class ParititionedIndexIterator : public InternalIteratorBase<IndexValue> {
}
private:
friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
const BlockBasedTable* table_;
const ReadOptions read_options_;
#ifndef NDEBUG

View File

@ -10,6 +10,7 @@
#include "test_util/testutil.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <array>
#include <cctype>
#include <fstream>
@ -540,5 +541,46 @@ void SetupSyncPointsToMockDirectIO() {
#endif
}
void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
// strerror is not thread-safe so should not be used in the "passing" path
// of unit tests (sometimes parallelized) but is OK here where test fails
const char* msg = strerror(errno);
fprintf(stderr, "%s:%s\n", fname.c_str(), msg);
assert(false);
}
if (offset < 0) {
// Relative to end of file; make it absolute
if (-offset > sbuf.st_size) {
offset = 0;
} else {
offset = static_cast<int>(sbuf.st_size + offset);
}
}
if (offset > sbuf.st_size) {
offset = static_cast<int>(sbuf.st_size);
}
if (offset + bytes_to_corrupt > sbuf.st_size) {
bytes_to_corrupt = static_cast<int>(sbuf.st_size - offset);
}
// Do it
std::string contents;
Status s = ReadFileToString(Env::Default(), fname, &contents);
assert(s.ok());
for (int i = 0; i < bytes_to_corrupt; i++) {
contents[i + offset] ^= 0x80;
}
s = WriteStringToFile(Env::Default(), contents, fname);
assert(s.ok());
Options options;
EnvOptions env_options;
#ifndef ROCKSDB_LITE
assert(!VerifySstFileChecksum(options, env_options, fname).ok());
#endif
}
} // namespace test
} // namespace ROCKSDB_NAMESPACE

View File

@ -809,5 +809,7 @@ void ResetTmpDirForDirectIO();
// to the file system.
void SetupSyncPointsToMockDirectIO();
void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt);
} // namespace test
} // namespace ROCKSDB_NAMESPACE

View File

@ -22,6 +22,15 @@
#include <folly/Portability.h>
#include <folly/ConstexprMath.h>
// Work around bug https://gcc.gnu.org/bugzilla/show_bug.cgi?id=56019
#ifdef __GNUC__
#if __GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 9)
namespace std {
using ::max_align_t;
}
#endif
#endif
namespace folly {
// has_extended_alignment

View File

@ -64,6 +64,8 @@ const std::string LDBCommand::ARG_TTL_START = "start_time";
const std::string LDBCommand::ARG_TTL_END = "end_time";
const std::string LDBCommand::ARG_TIMESTAMP = "timestamp";
const std::string LDBCommand::ARG_TRY_LOAD_OPTIONS = "try_load_options";
const std::string LDBCommand::ARG_DISABLE_CONSISTENCY_CHECKS =
"disable_consistency_checks";
const std::string LDBCommand::ARG_IGNORE_UNKNOWN_OPTIONS =
"ignore_unknown_options";
const std::string LDBCommand::ARG_FROM = "from";
@ -362,6 +364,8 @@ LDBCommand::LDBCommand(const std::map<std::string, std::string>& options,
is_db_ttl_ = IsFlagPresent(flags, ARG_TTL);
timestamp_ = IsFlagPresent(flags, ARG_TIMESTAMP);
try_load_options_ = IsFlagPresent(flags, ARG_TRY_LOAD_OPTIONS);
force_consistency_checks_ =
!IsFlagPresent(flags, ARG_DISABLE_CONSISTENCY_CHECKS);
config_options_.ignore_unknown_options =
IsFlagPresent(flags, ARG_IGNORE_UNKNOWN_OPTIONS);
}
@ -527,6 +531,7 @@ std::vector<std::string> LDBCommand::BuildCmdLineOptions(
ARG_FILE_SIZE,
ARG_FIX_PREFIX_LEN,
ARG_TRY_LOAD_OPTIONS,
ARG_DISABLE_CONSISTENCY_CHECKS,
ARG_IGNORE_UNKNOWN_OPTIONS,
ARG_CF_NAME};
ret.insert(ret.end(), options.begin(), options.end());
@ -622,6 +627,7 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
}
}
cf_opts->force_consistency_checks = force_consistency_checks_;
if (use_table_options) {
cf_opts->table_factory.reset(NewBlockBasedTableFactory(table_options));
}
@ -2839,7 +2845,7 @@ CheckConsistencyCommand::CheckConsistencyCommand(
const std::vector<std::string>& /*params*/,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, false, BuildCmdLineOptions({})) {}
: LDBCommand(options, flags, true, BuildCmdLineOptions({})) {}
void CheckConsistencyCommand::Help(std::string& ret) {
ret.append(" ");
@ -2848,19 +2854,13 @@ void CheckConsistencyCommand::Help(std::string& ret) {
}
void CheckConsistencyCommand::DoCommand() {
Options opt = PrepareOptionsForOpenDB();
opt.paranoid_checks = true;
if (!exec_state_.IsNotStarted()) {
return;
}
DB* db;
Status st = DB::OpenForReadOnly(opt, db_path_, &db, false);
delete db;
if (st.ok()) {
options_.paranoid_checks = true;
options_.num_levels = 64;
OpenDB();
if (exec_state_.IsSucceed() || exec_state_.IsNotStarted()) {
fprintf(stdout, "OK\n");
} else {
exec_state_ = LDBCommandExecuteResult::Failed(st.ToString());
}
CloseDB();
}
// ----------------------------------------------------------------------------

View File

@ -551,6 +551,98 @@ TEST_F(LdbCmdTest, ListFileTombstone) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
}
TEST_F(LdbCmdTest, DisableConsistencyChecks) {
Env* base_env = TryLoadCustomOrDefaultEnv();
std::unique_ptr<Env> env(NewMemEnv(base_env));
Options opts;
opts.env = env.get();
opts.create_if_missing = true;
std::string dbname = test::TmpDir();
{
DB* db = nullptr;
ASSERT_OK(DB::Open(opts, dbname, &db));
WriteOptions wopts;
FlushOptions fopts;
fopts.wait = true;
ASSERT_OK(db->Put(wopts, "foo1", "1"));
ASSERT_OK(db->Put(wopts, "bar1", "2"));
ASSERT_OK(db->Flush(fopts));
ASSERT_OK(db->Put(wopts, "foo2", "3"));
ASSERT_OK(db->Put(wopts, "bar2", "4"));
ASSERT_OK(db->Flush(fopts));
delete db;
}
{
char arg1[] = "./ldb";
char arg2[1024];
snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
char arg3[] = "checkconsistency";
char* argv[] = {arg1, arg2, arg3};
SyncPoint::GetInstance()->SetCallBack(
"Version::PrepareApply:forced_check", [&](void* arg) {
bool* forced = reinterpret_cast<bool*>(arg);
ASSERT_TRUE(*forced);
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_EQ(
0, LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
{
char arg1[] = "./ldb";
char arg2[1024];
snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
char arg3[] = "scan";
char* argv[] = {arg1, arg2, arg3};
SyncPoint::GetInstance()->SetCallBack(
"Version::PrepareApply:forced_check", [&](void* arg) {
bool* forced = reinterpret_cast<bool*>(arg);
ASSERT_TRUE(*forced);
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_EQ(
0, LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
{
char arg1[] = "./ldb";
char arg2[1024];
snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
char arg3[] = "scan";
char arg4[] = "--disable_consistency_checks";
char* argv[] = {arg1, arg2, arg3, arg4};
SyncPoint::GetInstance()->SetCallBack(
"ColumnFamilyData::ColumnFamilyData", [&](void* arg) {
ColumnFamilyOptions* cfo =
reinterpret_cast<ColumnFamilyOptions*>(arg);
ASSERT_FALSE(cfo->force_consistency_checks);
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_EQ(
0, LDBCommandRunner::RunCommand(4, argv, opts, LDBOptions(), nullptr));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

View File

@ -46,6 +46,8 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
" : DB supports ttl and value is internally timestamp-suffixed\n");
ret.append(" --" + LDBCommand::ARG_TRY_LOAD_OPTIONS +
" : Try to load option file from DB.\n");
ret.append(" --" + LDBCommand::ARG_DISABLE_CONSISTENCY_CHECKS +
" : Set options.force_consistency_checks = false.\n");
ret.append(" --" + LDBCommand::ARG_IGNORE_UNKNOWN_OPTIONS +
" : Ignore unknown options when loading option file.\n");
ret.append(" --" + LDBCommand::ARG_BLOOM_BITS + "=<int,e.g.:14>\n");