Compare commits

...

28 Commits

Author SHA1 Message Date
Yanqin Jin
c509dd3249 Fix the logic of setting read_amp_bytes_per_bit from OPTIONS file (#7680)
Summary:
Instead of using `EncodeFixed32`, which always serializes an integer to
little endian, we should use the local machine's endianness when
populating a native data structure during options parsing.
Without this fix, `read_amp_bytes_per_bit` may be populated incorrectly
on big-endian machines.
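
For illustration, a minimal sketch of the distinction (helper names are hypothetical, not the real RocksDB declarations):
```
#include <cstdint>
#include <cstring>

// Sketch of EncodeFixed32's behavior: it always emits little-endian bytes,
// regardless of the host byte order (by design, for on-disk formats).
void EncodeFixed32LE(char* buf, uint32_t value) {
  buf[0] = static_cast<char>(value & 0xff);
  buf[1] = static_cast<char>((value >> 8) & 0xff);
  buf[2] = static_cast<char>((value >> 16) & 0xff);
  buf[3] = static_cast<char>((value >> 24) & 0xff);
}

// Correct for populating a native uint32_t field during options parsing:
// store the parsed value in host byte order. Writing little-endian bytes
// into the field instead (the pre-fix behavior) produces a byte-swapped
// value on big-endian machines.
void PopulateNativeField(uint32_t* field, uint32_t parsed_value) {
  std::memcpy(field, &parsed_value, sizeof(parsed_value));
}
```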

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7680

Test Plan: make check

Reviewed By: pdillinger

Differential Revision: D24999166

Pulled By: riversand963

fbshipit-source-id: dc603cff6e17f8fa32479ce6df93b93082e6b0c4
2020-11-17 10:18:19 -08:00
Yanqin Jin
19c8025386 Bump version and update HISTORY 2020-11-15 15:04:28 -08:00
Yanqin Jin
63462c3a94 Hack to load OPTIONS file for read_amp_bytes_per_bit (#7659)
Summary:
A temporary hack to work around a bug in 6.10, 6.11, 6.12, 6.13 and
6.14. The bug writes out 8 bytes to the OPTIONS file starting from the
address of BlockBasedTableOptions.read_amp_bytes_per_bit, which is
actually a uint32. Consequently, the value of read_amp_bytes_per_bit
written to the OPTIONS file is wrong. Starting from 6.15, RocksDB
tries to parse read_amp_bytes_per_bit from the OPTIONS file as a uint32.
To be able to load OPTIONS files generated by the affected releases before
the fix, we need to manually parse read_amp_bytes_per_bit with this hack.
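
A hypothetical layout showing how the affected releases could produce a value like the `17179869184` (`0x400000000`) seen in the test plan below: serializing the 4-byte field as 8 bytes picks up whatever follows it in memory:
```
#include <cstdint>
#include <cstring>
#include <iostream>

// Hypothetical layout; illustration only. The affected releases serialized
// 8 bytes starting at the 4-byte field, picking up the neighboring member.
struct OptsSketch {
  uint32_t read_amp_bytes_per_bit = 0;  // the real field is a uint32
  uint32_t neighbor = 4;                // whatever happens to follow it
};

int main() {
  OptsSketch opts;
  uint64_t written = 0;
  // Buggy serialization: copy sizeof(uint64_t) bytes from the field's start.
  std::memcpy(&written, &opts, sizeof(written));
  // On a little-endian machine this prints 17179869184 (0x400000000):
  // the neighbor's bytes land in the upper half of the 64-bit value.
  std::cout << written << "\n";
  return 0;
}
```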

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7659

Test Plan:
Generate a db with current 6.14.fb (head at b6db05dbb5). Maybe use db_stress.

Checkout this PR, run
```
 ~/rocksdb/ldb --db=. --try_load_options --ignore_unknown_options idump --count_only
```
Expect success, and should not see
```
Failed: Invalid argument: Error parsing read_amp_bytes_per_bit:17179869184
```

Also
make check

Reviewed By: anand1976

Differential Revision: D24954752

Pulled By: riversand963

fbshipit-source-id: c7b802fc3e52acd050a4fc1cd475016122234394
2020-11-15 14:52:54 -08:00
Huisheng Liu
62f110f527 fix read_amp_bytes_per_bit field size (#7651)
Summary:
The field in BlockBasedTableOptions is 4 bytes:
  // Default: 0 (disabled)
  uint32_t read_amp_bytes_per_bit = 0;

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7651

Reviewed By: ltamasi

Differential Revision: D24844994

Pulled By: riversand963

fbshipit-source-id: e2695e55532256ef8996dd6939cad06987a80293
2020-11-11 12:21:58 -08:00
Andrew Kryczka
878b3a41de update HISTORY.md and version.h for 6.11.6 2020-10-12 13:15:15 -07:00
Andrew Kryczka
f36f44c5e6 Fix bug in pinned partitioned indexes with some reads bypassing block cache
Backports part of 75d3b6fdf0.
2020-10-12 13:14:45 -07:00
Andrew Kryczka
8d70e53114 Fix bug in pinned partitioned user key indexes
Backports part of 75d3b6fdf0.
2020-10-12 13:14:45 -07:00
Yanqin Jin
5a4b8005a7 Update HISTORY and version 2020-07-23 14:42:25 -07:00
Yanqin Jin
4cd964946a Report corruption on unrecognized value type (#7121)
Summary:
During memtable lookup, an unrecognized value type should be reported as
Status::Corruption.
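
A minimal sketch of the intended behavior (stand-in types, not the real `SaveValue()` signature):
```
#include <string>

// Minimal stand-ins for RocksDB internals; illustration only.
struct StatusSketch {
  bool ok = true;
  std::string msg;
};

enum class ValueTypeSketch { kValue, kDeletion, kUnknown };

StatusSketch DispatchValueType(ValueTypeSketch type) {
  switch (type) {
    case ValueTypeSketch::kValue:
    case ValueTypeSketch::kDeletion:
      return StatusSketch{};  // recognized types proceed as before
    default:
      // Previously the default case fell through without reporting anything;
      // now the lookup surfaces corruption (with type, key, and seq in the
      // real code) instead of silently succeeding.
      return StatusSketch{false, "Unrecognized value type"};
  }
}
```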

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7121

Test Plan: make check

Reviewed By: cheng-chang

Differential Revision: D22512124

Pulled By: riversand963

fbshipit-source-id: 9b97be7d9b230c5aae9205f96054420e5ea09066
2020-07-23 14:37:30 -07:00
Andrew Kryczka
48bfca38f6 update HISTORY.md and version.h for 6.11.4 2020-07-15 17:54:15 -07:00
Adam Retter
c8b9556cb6 Only check for python location once (#7123)
Summary:
This fixes an issue introduced in 0c56fc4 whereby the location of Python was evaluated many times, leading to excessive logging of unknown Python locations on CentOS 6.

The location is now only checked once.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7123

Reviewed By: zhichao-cao

Differential Revision: D22532274

Pulled By: ajkr

fbshipit-source-id: cade71b4b46e9a23d63ecb4dd36a4ac8ae217970
2020-07-15 17:51:52 -07:00
Yanqin Jin
30bfa2a44e Report corrupted keys during compaction (#7124)
Summary:
Currently, RocksDB lets compaction go through even in the case of
corrupted keys, the number of which is reported in CompactionJobStats.
However, RocksDB does not check this value. We should let compaction run
in a stricter mode.

Temporarily disable two tests that allow corrupted keys in compaction.
With this PR, the two tests will assert(false) and terminate. We still need
to investigate the recommended google-test way of doing this; death tests
(EXPECT_DEATH) in gtest currently emit warnings.
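
For reference, a death-test sketch of the kind being considered (test and helper names are hypothetical; assumes an assertion-enabled build and linking against gtest_main):
```
#include <cassert>
#include <gtest/gtest.h>

static void CompactionSeesCorruptedKey() {
  bool parsed_ok = false;  // stand-in for ParseInternalKey() failing
  (void)parsed_ok;
  assert(parsed_ok);       // with this PR, compaction asserts on corruption
}

TEST(CompactionJobDeathTest, CorruptedKeyAborts) {
  // EXPECT_DEATH runs the statement in a child process and expects it to
  // die; gtest warns when the parent is multi-threaded at that point, which
  // is the warning mentioned above.
  EXPECT_DEATH(CompactionSeesCorruptedKey(), "");
}
```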

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7124

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D22530722

Pulled By: riversand963

fbshipit-source-id: 6a5a6a992028c6d4f92cb74693c92db462ae4ad6
2020-07-15 17:51:08 -07:00
Andrew Kryczka
3e1eb99ab7 bump version to 6.11.3 2020-07-09 15:51:37 -07:00
Yanqin Jin
3514cf51fa Fix data race to VersionSet::io_status_ (#7034)
Summary:
After https://github.com/facebook/rocksdb/issues/6949 , VersionSet::io_status_ can be concurrently accessed by multiple
threads without a lock, causing the tsan test to fail. For example, a bg flush thread
resets io_status_ before calling LogAndApply(), while another thread already in
the process of LogAndApply() reads io_status_. This is a bug.

We do not have to reset io_status_ each time we call LogAndApply(). io_status_
is part of the state of VersionSet, and it indicates the outcome of preceding
MANIFEST/CURRENT file IO operations. Its value should be updated only when (sketched below):

1. MANIFEST/CURRENT file IO fails for the first time.
2. MANIFEST/CURRENT file IO succeeds as part of recovering from a prior
   failure without process restart, e.g. calling Resume().
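
A sketch of that update rule with stand-in types (the real member lives in VersionSet; names here are illustrative):
```
// Minimal stand-in for IOStatus; illustration only.
struct IOStatusSketch {
  static IOStatusSketch OK() { return IOStatusSketch{true}; }
  bool ok() const { return ok_; }
  bool ok_ = true;
};

class VersionSetSketch {
 public:
  // io_status_ is sticky across LogAndApply() calls: record the first
  // failure (rule 1) and clear it only when IO succeeds during recovery,
  // e.g. Resume() (rule 2). Resetting it unconditionally before every
  // LogAndApply() call is what created the data race.
  void OnManifestIO(const IOStatusSketch& result, bool recovering) {
    if (io_status_.ok() && !result.ok()) {
      io_status_ = result;
    } else if (!io_status_.ok() && result.ok() && recovering) {
      io_status_ = IOStatusSketch::OK();
    }
  }

 private:
  IOStatusSketch io_status_;
};
```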

Test Plan (devserver):
COMPILE_WITH_TSAN=1 make check
COMPILE_WITH_TSAN=1 make db_test2
./db_test2 --gtest_filter=DBTest2.CompactionStall
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7034

Reviewed By: zhichao-cao

Differential Revision: D22247137

Pulled By: riversand963

fbshipit-source-id: 77b83e05390f3ee3cd2d96d3fdd6fe4f225e3216
2020-07-09 15:51:01 -07:00
Yanqin Jin
dd63f04c83 First step towards handling MANIFEST write error (#6949)
Summary:
This PR provides preliminary support for handling IO error during MANIFEST write.
File write/sync is not guaranteed to be atomic. If we encounter an IOError while writing/syncing to the MANIFEST file, we cannot be sure about the state of the MANIFEST file. The version edits may or may not have reached the file. During cleanup, if we delete the newly-generated SST files referenced by the pending version edit(s), but the version edit(s) actually are persistent in the MANIFEST, then the next recovery attempt will process the version edit(s) and then fail since the SST files have already been deleted.
One approach is to truncate the MANIFEST after a write/sync error, so that it is safe to delete the SST files. However, file truncation may not be supported on certain file systems. Therefore, we take the following approach.
If an IOError is detected during MANIFEST write/sync, we disable file deletions for the faulty database. Depending on whether the IOError is retryable (set by the underlying file system), either RocksDB or the application can call `DB::Resume()`, or simply shut down and restart. During `Resume()`, RocksDB will try to switch to a new MANIFEST and write all existing in-memory version storage to the new file. If this succeeds, then RocksDB may proceed. Once all recovery is completed, file deletions will be re-enabled.
Note that multiple threads can call `LogAndApply()` at the same time, though only one of them will actually go through the process of MANIFEST writing, possibly batching the version edits of other threads. When the leading MANIFEST writer finishes, all of the MANIFEST-writing threads in this batch will have the same IOError. They will all call `ErrorHandler::SetBGError()`, in which file deletion will be disabled.
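
A high-level sketch of that flow with hypothetical stand-ins (the real logic lives in `ErrorHandler::SetBGError` and `DBImpl::ResumeImpl`):
```
#include <iostream>

// Hypothetical stand-ins for the DBImpl/ErrorHandler machinery; a sketch of
// the control flow only.
static bool file_deletions_enabled = true;

bool WriteManifest(bool inject_io_error) { return !inject_io_error; }

bool SwitchToNewManifestAndRewriteVersions() { return true; }

void LogAndApplySketch(bool inject_io_error) {
  if (!WriteManifest(inject_io_error)) {
    // The old MANIFEST may or may not contain the pending edits. Deleting
    // the new SSTs they reference could break the next recovery, so stop
    // all file deletions until Resume() succeeds.
    file_deletions_enabled = false;
  }
}

void ResumeSketch() {
  // Resume() always switches to a fresh MANIFEST and rewrites the full
  // in-memory version state, since the old file may be corrupted.
  if (SwitchToNewManifestAndRewriteVersions()) {
    file_deletions_enabled = true;  // safe to delete obsolete files again
  }
}

int main() {
  LogAndApplySketch(/*inject_io_error=*/true);
  std::cout << "deletions enabled: " << file_deletions_enabled << "\n";  // 0
  ResumeSketch();
  std::cout << "deletions enabled: " << file_deletions_enabled << "\n";  // 1
}
```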

Possible future directions:
- Add an `ErrorContext` structure so that it is easier to pass more info to `ErrorHandler`. Currently, as in this example, a new `BackgroundErrorReason` has to be added.

Test plan (dev server):
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6949

Reviewed By: anand1976

Differential Revision: D22026020

Pulled By: riversand963

fbshipit-source-id: f3c68a2ef45d9b505d0d625c7c5e0c88495b91c8
2020-07-09 15:50:33 -07:00
Akanksha Mahajan
115c9113ca Update Flush policy in PartitionedIndexBuilder on switching from user-key to internal-key mode (#7096)
Summary:
When format_version is high enough to support user-key indexes and
there are index entries for the same user key that span multiple data
blocks, the builder changes from user-key mode to internal-key mode. But the
flush policy is not reset to point to the Block Builder of internal keys.
After this switch, no entries are added to the user-key index partition
result, so it never triggers flushing the block.

Fix (sketched below):
1. After adding the entry in sub_builder_index_, if there is a switch
from user-key to internal-key, then the flush policy is updated to point to
the Block Builder of the internal-key index partition.
2. Set sub_builder_index_->seperator_is_key_plus_seq_ = true if
seperator_is_key_plus_seq_ is set to true, so that subsequent partitions
can also use internal-key mode.
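
A simplified sketch of the fix (hypothetical types; only the mode-switch logic is shown):
```
struct BlockBuilderStub {};  // stands in for the real index BlockBuilder

struct PartitionedIndexBuilderSketch {
  bool seperator_is_key_plus_seq_ = false;  // spelling as in the real code
  BlockBuilderStub user_key_block_;
  BlockBuilderStub internal_key_block_;
  // Before the fix, this kept pointing at user_key_block_ after the switch,
  // so the policy watched a builder that no longer received entries and a
  // flush was never triggered.
  BlockBuilderStub* flush_policy_target_ = &user_key_block_;

  void SwitchToInternalKeyMode() {
    seperator_is_key_plus_seq_ = true;            // fix 2: propagate the mode
    flush_policy_target_ = &internal_key_block_;  // fix 1: retarget the policy
  }
};
```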

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7096

Test Plan: make check -j64

Reviewed By: ajkr

Differential Revision: D22416598

Pulled By: akankshamahajan15

fbshipit-source-id: 01fc2dc07ea1b32f8fb803995ebe6e9a3fbe67ac
2020-07-09 15:48:54 -07:00
Andrew Kryczka
07e2ca100a bump patch version to 6.11.2 2020-07-07 09:34:16 -07:00
Peter Dillinger
9912ebe1b5 Exclude c_test from buck build opt mode
Summary: Fix a Facebook internal build

Test Plan: buck build @mode/opt :c_test :c_test_bin (was compilation
failure, now "not found")
buck build @mode/dev :c_test :c_test_bin (still passes)
2020-07-07 09:31:44 -07:00
sdong
0fdd019f62 Fix test in buck test (#7076)
Summary:
This fixes the special logic for running tests inside FB.
Buck test was broken after moving to cpp_unittest(). Move c_test back to the previous approach.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7076

Test Plan: Watch the Sandcastle run

Reviewed By: ajkr

Differential Revision: D22370096

fbshipit-source-id: 4a464d0903f2c76ae2de3a8ad373ffc9bedec64c
2020-07-07 09:12:18 -07:00
Levi Tamasi
43107e0a9a Move kNoExpiration to blob_db.h (#7018)
Summary:
The constant `kNoExpiration` is currently defined in an
internal/implementation header (`blob_log_format.h`); the patch moves it
to the public header `blob_db.h` so it is accessible to users.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7018

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D22191354

Pulled By: ltamasi

fbshipit-source-id: 98c8012a83b999a3f1a30e955ce6bb71ba29dc5c
2020-06-23 14:13:48 -07:00
Andrew Kryczka
c0a8d89c27 add missing release note 2020-06-23 13:45:02 -07:00
Andrew Kryczka
85189fd64f skip 6.11.0 and go straight to 6.11.1 for clarity of HISTORY.md 2020-06-23 13:04:41 -07:00
sdong
1910560c2c Fix a bug that causes iterator to return wrong result in a rare data race (#6973)
Summary:
The bug fixed in https://github.com/facebook/rocksdb/pull/1816/ is now applicable to the iterator too. This was not an issue before, but https://github.com/facebook/rocksdb/pull/2886 caused the regression. If a put and a DB flush happen just between the iterator getting the latest sequence number and getting the super version, an empty result for the key or an older value can be returned, which is wrong.
Fix it in the same way as the fix in https://github.com/facebook/rocksdb/issues/1816, that is, to get the sequence number after referencing the super version.
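
A sketch of the corrected ordering (stand-in types; the real code is in `ArenaWrappedDBIter::Refresh` and `DBImpl::NewIteratorImpl`, shown in the diffs below):
```
#include <atomic>
#include <cstdint>

// Hypothetical stand-ins for the DB internals; only the ordering matters.
struct SuperVersionStub {};
static SuperVersionStub g_sv;
static std::atomic<uint64_t> g_last_sequence{0};

SuperVersionStub* GetReferencedSuperVersion() { return &g_sv; }
uint64_t GetLatestSequenceNumber() {
  return g_last_sequence.load(std::memory_order_acquire);
}

void BuildIteratorSketch() {
  // Correct order: pin the super version first, then read the sequence
  // number (the same fix PR #1816 applied to Get()). Reading the sequence
  // number first leaves a window in which a concurrent Put + flush installs
  // a super version newer than the snapshot, so the iterator could return
  // an empty result or an older value.
  SuperVersionStub* sv = GetReferencedSuperVersion();
  uint64_t snapshot_seq = GetLatestSequenceNumber();
  (void)sv;
  (void)snapshot_seq;
}
```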
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6973

Test Plan: Will run stress tests for a while to make sure there is no general regression.

Reviewed By: ajkr

Differential Revision: D22029348

fbshipit-source-id: 94390f93630906796d6e2fec321f44a920953fd1
2020-06-18 10:43:00 -07:00
Yanqin Jin
61cc9ef76f Fail recovery when MANIFEST record checksum mismatch (#6996)
Summary:
https://github.com/facebook/rocksdb/issues/5411 refactored `VersionSet::Recover` but introduced a bug, explained as follows.
Before, once a checksum mismatch happens, `reporter` sets `s` to non-ok. Therefore, Recover stops processing the MANIFEST any further.
```
// Correct
// Inside Recover
LogReporter reporter;
reporter.status = &s;
log::Reader reader(..., reporter);
while (reader.ReadRecord() && s.ok()) {
...
}
```
The bug is that the local variable `s` in `ReadAndRecover` won't be updated by `reporter` while reading the MANIFEST. It is possible that the reader sees a checksum mismatch in a record, but `ReadRecord` internally retries the read and finds the next valid record. The mismatched record is then ignored and no error is reported.
```
// Incorrect
// Inside Recover
LogReporter reporter;
reporter.status = &s;
log::Reader reader(..., reporter);
s = ReadAndRecover(reader, ...);

// Inside ReadAndRecover
  Status s;  // Shadows the s in Recover.
  while (reader.ReadRecord() && s.ok()) {
   ...
  }
```
`LogReporter` can use a separate `log_read_status` to track the errors while reading the MANIFEST. RocksDB can process more MANIFEST entries only if `log_read_status.ok()`.
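
For contrast, a sketch of the fixed pattern (simplified signatures; the actual change threads a `log_read_status` through `VersionEditHandler::Iterate`, as shown in the diff below):
```
// Fixed
// Inside Recover
Status log_read_status;
LogReporter reporter;
reporter.status = &log_read_status;
log::Reader reader(..., reporter);
s = ReadAndRecover(reader, &log_read_status, ...);

// Inside ReadAndRecover(reader, Status* log_read_status, ...)
  Status s;  // Local, but no longer the only error channel.
  while (s.ok() && reader.ReadRecord() && log_read_status->ok()) {
   ...
  }
  if (!log_read_status->ok()) {
    s = *log_read_status;  // Surface the checksum mismatch.
  }
```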
Test plan (devserver):
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6996

Reviewed By: ajkr

Differential Revision: D22105746

Pulled By: riversand963

fbshipit-source-id: b22f717a423457a41ca152a242abbb64cf91fc38
2020-06-18 10:38:16 -07:00
sdong
f5d4dbbeef Fix the bug that compressed cache is disabled in read-only DBs (#6990)
Summary:
Compressed block cache is disabled in https://github.com/facebook/rocksdb/pull/4650 for no good reason. Re-enable it.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6990

Test Plan: Add a unit test to make sure a general function works with read-only DB + compressed block cache.

Reviewed By: ltamasi

Differential Revision: D22072755

fbshipit-source-id: 2a55df6363de23a78979cf6c747526359e5dc7a1
2020-06-18 10:30:53 -07:00
Andrew Kryczka
84dcfe1a5f update minor version for 6.11 release (#6994)
Summary:
The 6.11.fb branch is already cut so I will also backport this PR to
that branch.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6994

Reviewed By: riversand963

Differential Revision: D22084532

Pulled By: ajkr

fbshipit-source-id: 0b025f738cc31c65c673cbf89302359e88a34d19
2020-06-17 09:41:11 -07:00
Yanqin Jin
a818699f2f Fix a bug of overwriting return code (#6989)
Summary:
In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and returning early.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6989

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D22071418

Pulled By: riversand963

fbshipit-source-id: 5a4ea5dfb1a41f41c7a3fdaf62b163007b42f04b
2020-06-16 14:00:15 -07:00
Yanqin Jin
97a69f4372 Let best-efforts recovery ignore CURRENT file (#6970)
Summary:
Best-efforts recovery does not check the content of the CURRENT file to determine which MANIFEST to recover from. However, it still checks the presence of the CURRENT file to determine whether to create a new DB during `open()`. Therefore, we can tweak the logic in `open()` a little bit so that best-efforts recovery does not rely on the CURRENT file at all (sketched below).
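
A sketch of the tweaked check (hypothetical helpers; the real change is in `DBImpl::Recover`, shown in the diff below):
```
#include <string>
#include <vector>

// Hypothetical helper; simplified MANIFEST (descriptor) file name check.
bool IsDescriptorFile(const std::string& name) {
  return name.rfind("MANIFEST-", 0) == 0;
}

bool DbExistsForRecovery(const std::vector<std::string>& dir_listing,
                         bool best_efforts_recovery,
                         bool current_file_exists) {
  if (!best_efforts_recovery) {
    return current_file_exists;  // the normal path still trusts CURRENT
  }
  for (const std::string& f : dir_listing) {
    if (IsDescriptorFile(f)) {
      return true;  // any MANIFEST means there is an existing db to recover
    }
  }
  return false;  // no MANIFEST: treat as a new db (if create_if_missing)
}
```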

Test plan (dev server):
make check
./db_basic_test --gtest_filter=DBBasicTest.RecoverWithNoCurrentFile
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6970

Reviewed By: anand1976

Differential Revision: D22013990

Pulled By: riversand963

fbshipit-source-id: db552a1868c60ed70e1f7cd252a3a076eb8ea58f
2020-06-16 09:35:52 -07:00
39 changed files with 763 additions and 194 deletions

View File

@ -1,4 +1,38 @@
# Rocksdb Change Log
## 6.11.7 (11/15/2020)
### Bug Fixes
* Fix a bug of encoding and parsing BlockBasedTableOptions::read_amp_bytes_per_bit as a 64-bit integer.
* Fixed the logic of populating the native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architectures. Without this fix, the original code introduced in PR7659, when running on a big-endian machine, can mistakenly store `read_amp_bytes_per_bit` (a uint32) in little-endian format; future accesses to `read_amp_bytes_per_bit` will then give wrong values. Little-endian architectures are not affected.
## 6.11.6 (10/12/2020)
### Bug Fixes
* Fixed a bug in the following combination of features: indexes with user keys (`format_version >= 3`), indexes are partitioned (`index_type == kTwoLevelIndexSearch`), and some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`). The bug could cause keys to be truncated when read from the index, leading to wrong read results or other unexpected behavior.
* Fixed a bug when indexes are partitioned (`index_type == kTwoLevelIndexSearch`), some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`), and partition reads could be mixed between the block cache and directly from the file (e.g., with `enable_index_compression == 1` and `mmap_read == 1`, partitions stored uncompressed due to a poor compression ratio would be read directly from the file via mmap, while partitions stored compressed would be read from the block cache). The bug could cause index partitions to be mistakenly considered empty during reads, leading to wrong read results.
## 6.11.5 (7/23/2020)
### Bug Fixes
* Memtable lookup should report unrecognized value_type as corruption (#7121).
## 6.11.4 (7/15/2020)
### Bug Fixes
* Make compaction report InternalKey corruption while iterating over the input.
## 6.11.3 (7/9/2020)
### Bug Fixes
* Fix a bug in PartitionedIndexBuilder when index_type == kTwoLevelIndexSearch: update the FlushPolicy to point to the internal-key partitioner when the index partition switches from user-key mode to internal-key mode.
* Disable file deletion after MANIFEST write/sync failure until db re-open or Resume(), so that a subsequent re-open will not see a MANIFEST referencing deleted SSTs.
## 6.11.1 (6/23/2020)
### Bug Fixes
* Best-efforts recovery ignores the CURRENT file completely. If the CURRENT file is missing during recovery, best-efforts recovery still proceeds with the MANIFEST file(s).
* In best-efforts recovery, an error that was not Corruption, IOError::kNotFound, or IOError::kPathNotFound used to be silently overwritten. Fixed by checking all non-ok cases and returning early.
* Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: the compressed block cache will be in effect with read-only DBs too.
* Fail recovery and report the error once a physical log record checksum mismatch is hit while reading the MANIFEST; RocksDB should not continue processing the MANIFEST any further.
* Fix a bug of wrong iterator results if another thread finishes an update and a DB flush between two statements.
### Public API Change
* `DB::OpenForReadOnly()` now returns `Status::NotFound` when the specified DB directory does not exist. Previously the error returned depended on the underlying `Env`.
## 6.11 (6/12/2020)
### Bug Fixes
* Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true.

View File

@ -9,7 +9,9 @@
BASH_EXISTS := $(shell which bash)
SHELL := $(shell which bash)
# Default to python3. Some distros like CentOS 8 do not have `python`.
PYTHON?=$(shell which python3 || which python || echo python3)
ifeq ($(origin PYTHON), undefined)
PYTHON := $(shell which python3 || which python || echo python3)
endif
export PYTHON
CLEAN_FILES = # deliberately empty, so we can append below.

28
TARGETS
View File

@ -447,6 +447,27 @@ cpp_library(
external_deps = ROCKSDB_EXTERNAL_DEPS,
)
if not is_opt_mode:
cpp_binary(
name = "c_test_bin",
srcs = ["db/c_test.c"],
arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
compiler_flags = ROCKSDB_COMPILER_FLAGS,
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
deps = [":rocksdb_test_lib"],
)
if not is_opt_mode:
custom_unittest(
"c_test",
command = [
native.package_name() + "/buckifier/rocks_test_runner.sh",
"$(location :{})".format("c_test_bin"),
],
type = "simple",
)
cpp_library(
name = "env_basic_test_lib",
srcs = ["env/env_basic_test.cc"],
@ -560,13 +581,6 @@ ROCKS_TESTS = [
[],
[],
],
[
"c_test",
"db/c_test.c",
"serial",
[],
[],
],
[
"cache_simulator_test",
"utilities/simulator_cache/cache_simulator_test.cc",

View File

@ -64,8 +64,6 @@ def get_cc_files(repo_path):
continue
for filename in fnmatch.filter(filenames, '*.cc'):
cc_files.append(os.path.join(root, filename))
for filename in fnmatch.filter(filenames, '*.c'):
cc_files.append(os.path.join(root, filename))
return cc_files
@ -178,10 +176,18 @@ def generate_targets(repo_path, deps_map):
+ ["test_util/testutil.cc"])
print("Extra dependencies:\n{0}".format(json.dumps(deps_map)))
# test for every test we found in the Makefile
# c_test.c is added through TARGETS.add_c_test(). If there
# are more than one .c test file, we need to extend
# TARGETS.add_c_test() to include other C tests too.
TARGETS.add_c_test()
# test for every .cc test we found in the Makefile
for target_alias, deps in deps_map.items():
for test in sorted(tests):
match_src = [src for src in cc_files if ("/%s.c" % test) in src]
if test == 'c_test':
continue
match_src = [src for src in cc_files if ("/%s.cc" % test) in src]
if len(match_src) == 0:
print(ColorString.warning("Cannot find .cc file for %s" % test))
continue

View File

@ -76,6 +76,30 @@ class TARGETSBuilder(object):
pretty_list(deps)))
self.total_bin = self.total_bin + 1
def add_c_test(self):
self.targets_file.write("""
if not is_opt_mode:
cpp_binary(
name = "c_test_bin",
srcs = ["db/c_test.c"],
arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
compiler_flags = ROCKSDB_COMPILER_FLAGS,
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
deps = [":rocksdb_test_lib"],
)
if not is_opt_mode:
custom_unittest(
"c_test",
command = [
native.package_name() + "/buckifier/rocks_test_runner.sh",
"$(location :{})".format("c_test_bin"),
],
type = "simple",
)
""")
def register_test(self,
test_name,
src,

View File

@ -56,8 +56,9 @@ Status ArenaWrappedDBIter::Refresh() {
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
if (sv_number_ != cur_sv_number) {
Env* env = db_iter_->env();
db_iter_->~DBIter();
@ -65,6 +66,7 @@ Status ArenaWrappedDBIter::Refresh() {
new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
if (read_callback_) {
read_callback_->Refresh(latest_seq);
}
@ -78,7 +80,7 @@ Status ArenaWrappedDBIter::Refresh() {
latest_seq, /* allow_unprepared_value */ true);
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(latest_seq);
db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
db_iter_->set_valid(false);
}
return Status::OK();

View File

@ -9,7 +9,6 @@
#ifndef ROCKSDB_LITE
#include <limits>
#include <memory>
#include <utility>
@ -23,7 +22,6 @@ namespace blob_db {
constexpr uint32_t kMagicNumber = 2395959; // 0x00248f37
constexpr uint32_t kVersion1 = 1;
constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
using ExpirationRange = std::pair<uint64_t, uint64_t>;

View File

@ -246,6 +246,7 @@ void CompactionIterator::NextFromInput() {
iter_stats_.num_input_records++;
if (!ParseInternalKey(key_, &ikey_)) {
iter_stats_.num_input_corrupt_records++;
// If `expect_valid_internal_key_` is false, return the corrupted key
// and let the caller decide what to do with it.
// TODO(noetzli): We should have a more elegant solution for this.
@ -258,7 +259,6 @@ void CompactionIterator::NextFromInput() {
has_current_user_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
iter_stats_.num_input_corrupt_records++;
valid_ = true;
break;
}

View File

@ -721,7 +721,6 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), thread_pri_, compaction_stats_);
versions_->SetIOStatusOK();
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
}
@ -896,9 +895,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
&range_del_agg, sub_compact->compaction, compaction_filter,
shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
sub_compact->compaction, compaction_filter, shutting_down_,
preserve_deletes_seqnum_, manual_compaction_paused_,
db_options_.info_log));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();

View File

@ -395,7 +395,7 @@ TEST_F(CompactionJobTest, Simple) {
RunCompaction({ files }, expected_results);
}
TEST_F(CompactionJobTest, SimpleCorrupted) {
TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
NewDB();
auto expected_results = CreateTwoFiles(true);
@ -989,7 +989,7 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
// single deletion and the (single) deletion gets removed while the corrupt key
// gets written out. TODO(noetzli): We probably want a better way to treat
// corrupt keys.
TEST_F(CompactionJobTest, CorruptionAfterDeletion) {
TEST_F(CompactionJobTest, DISABLED_CorruptionAfterDeletion) {
NewDB();
auto file1 =

View File

@ -2199,6 +2199,35 @@ TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
}
}
TEST_F(DBBasicTest, RecoverWithNoManifest) {
Options options = CurrentOptions();
options.env = env_;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
Close();
{
// Delete all MANIFEST.
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(dbname_, &files));
for (const auto& file : files) {
uint64_t number = 0;
FileType type = kLogFile;
if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
ASSERT_OK(env_->DeleteFile(dbname_ + "/" + file));
}
}
}
options.best_efforts_recovery = true;
options.create_if_missing = false;
Status s = TryReopen(options);
ASSERT_TRUE(s.IsInvalidArgument());
options.create_if_missing = true;
Reopen(options);
// Since no MANIFEST exists, best-efforts recovery creates a new, empty db.
ASSERT_EQ("NOT_FOUND", Get("foo"));
}
TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
Options options = CurrentOptions();
DestroyAndReopen(options);
@ -2239,6 +2268,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
}
#endif // !ROCKSDB_LITE
TEST_F(DBBasicTest, ManifestChecksumMismatch) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("bar", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
auto* crc = reinterpret_cast<uint32_t*>(arg);
*crc = *crc + 1;
});
SyncPoint::GetInstance()->EnableProcessing();
WriteOptions write_opts;
write_opts.disableWAL = true;
Status s = db_->Put(write_opts, "foo", "value");
ASSERT_OK(s);
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_OK(Put("foo", "value1"));
ASSERT_OK(Flush());
s = TryReopen(options);
ASSERT_TRUE(s.IsCorruption());
}
class DBBasicTestMultiGet : public DBTestBase {
public:
DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
@ -2967,6 +3023,33 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
Close();
}
TEST_F(DBBasicTest, ManifestWriteFailure) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.env = env_;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto* s = reinterpret_cast<Status*>(arg);
ASSERT_OK(*s);
// Manually overwrite return status
*s = Status::IOError();
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key", "value"));
ASSERT_NOK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

View File

@ -23,57 +23,6 @@
namespace ROCKSDB_NAMESPACE {
Status DBImpl::DisableFileDeletions() {
InstrumentedMutexLock l(&mutex_);
++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Disabled, but already disabled. Counter: %d",
disable_delete_obsolete_files_);
}
return Status::OK();
}
Status DBImpl::EnableFileDeletions(bool force) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
bool file_deletion_enabled = false;
{
InstrumentedMutexLock l(&mutex_);
if (force) {
// if force, we need to enable file deletions right away
disable_delete_obsolete_files_ = 0;
} else if (disable_delete_obsolete_files_ > 0) {
--disable_delete_obsolete_files_;
}
if (disable_delete_obsolete_files_ == 0) {
file_deletion_enabled = true;
FindObsoleteFiles(&job_context, true);
bg_cv_.SignalAll();
}
}
if (file_deletion_enabled) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
disable_delete_obsolete_files_);
}
job_context.Clean();
LogFlush(immutable_db_options_.info_log);
return Status::OK();
}
int DBImpl::IsFileDeletionsEnabled() const {
return !disable_delete_obsolete_files_;
}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
bool flush_memtable) {

View File

@ -312,8 +312,36 @@ Status DBImpl::ResumeImpl() {
}
// Make sure the IO Status stored in version set is set to OK.
bool file_deletion_disabled = !IsFileDeletionsEnabled();
if (s.ok()) {
versions_->SetIOStatusOK();
IOStatus io_s = versions_->io_status();
if (io_s.IsIOError()) {
// If resuming from IOError resulted from MANIFEST write, then assert
// that we must have already set the MANIFEST writer to nullptr during
// clean-up phase MANIFEST writing. We must have also disabled file
// deletions.
assert(!versions_->descriptor_log_);
assert(file_deletion_disabled);
// Since we are trying to recover from MANIFEST write error, we need to
// switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
// Therefore, force writing a dummy version edit because we do not know
// whether there are flush jobs with non-empty data to flush, triggering
// appends to MANIFEST.
VersionEdit edit;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(default_cf_handle_);
assert(cfh);
ColumnFamilyData* cfd = cfh->cfd();
const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
s = versions_->LogAndApply(cfd, cf_opts, &edit, &mutex_,
directories_.GetDbDir());
if (!s.ok()) {
io_s = versions_->io_status();
if (!io_s.ok()) {
s = error_handler_.SetBGError(io_s,
BackgroundErrorReason::kManifestWrite);
}
}
}
}
// We cannot guarantee consistency of the WAL. So force flush Memtables of
@ -364,6 +392,13 @@ Status DBImpl::ResumeImpl() {
job_context.Clean();
if (s.ok()) {
assert(versions_->io_status().ok());
// If we reach here, we should re-enable file deletions if it was disabled
// during previous error handling.
if (file_deletion_disabled) {
// Always return ok
EnableFileDeletions(/*force=*/true);
}
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
}
mutex_.Lock();
@ -2670,7 +2705,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
" guaranteed to be preserved, try larger iter_start_seqnum opt."));
}
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
ColumnFamilyData* cfd = cfh->cfd();
assert(cfd != nullptr);
ReadCallback* read_callback = nullptr; // No read callback provided.
if (read_options.tailing) {
#ifdef ROCKSDB_LITE
@ -2691,10 +2727,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
result = NewIteratorImpl(read_options, cfd,
(read_options.snapshot != nullptr)
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber,
read_callback);
}
return result;
}
@ -2707,6 +2744,24 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");
if (snapshot == kMaxSequenceNumber) {
// Note that the snapshot is assigned AFTER referencing the super
// version because otherwise a flush happening in between may compact away
// data for the snapshot, so the reader would see neither data that was
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
// Note that the super version might not contain all the data available
// to this snapshot, but in that case it can see all the data in the
// super version, which is a valid consistent state after the user
// calls NewIterator().
snapshot = versions_->LastSequence();
TEST_SYNC_POINT("DBImpl::NewIterator:3");
TEST_SYNC_POINT("DBImpl::NewIterator:4");
}
// Try to generate a DB iterator tree in continuous memory area to be
// cache friendly. Here is an example of result:
// +-------------------------------+
@ -4369,6 +4424,14 @@ Status DBImpl::IngestExternalFiles(
#endif // !NDEBUG
}
}
} else if (versions_->io_status().IsIOError()) {
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
const IOStatus& io_s = versions_->io_status();
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
}
// Resume writes to the DB

View File

@ -356,6 +356,12 @@ class DBImpl : public DB {
virtual Status Close() override;
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual bool IsFileDeletionsEnabled() const;
Status GetStatsHistory(
uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
@ -363,9 +369,6 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE
using DB::ResetStats;
virtual Status ResetStats() override;
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual int IsFileDeletionsEnabled() const;
// All the returned filenames start with "/"
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
@ -477,6 +480,7 @@ class DBImpl : public DB {
Status GetImpl(const ReadOptions& options, const Slice& key,
GetImplOptions& get_impl_options);
// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
@ -1779,6 +1783,8 @@ class DBImpl : public DB {
SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,
bool* is_blob_index);
Status DisableFileDeletionsWithLock();
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;

View File

@ -210,7 +210,15 @@ Status DBImpl::FlushMemTableToOutputFile(
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
!io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kFlush
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
@ -574,7 +582,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// it is not because of CF drop.
if (!s.ok() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kFlush
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
@ -2687,7 +2703,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
}
versions_->SetIOStatusOK();
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
@ -2745,7 +2760,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
}
versions_->SetIOStatusOK();
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
@ -2877,7 +2891,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kCompaction
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}

View File

@ -36,6 +36,62 @@ uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
return std::numeric_limits<uint64_t>::max();
}
Status DBImpl::DisableFileDeletions() {
InstrumentedMutexLock l(&mutex_);
return DisableFileDeletionsWithLock();
}
Status DBImpl::DisableFileDeletionsWithLock() {
mutex_.AssertHeld();
++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Disabled, but already disabled. Counter: %d",
disable_delete_obsolete_files_);
}
return Status::OK();
}
Status DBImpl::EnableFileDeletions(bool force) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
bool file_deletion_enabled = false;
{
InstrumentedMutexLock l(&mutex_);
if (force) {
// if force, we need to enable file deletions right away
disable_delete_obsolete_files_ = 0;
} else if (disable_delete_obsolete_files_ > 0) {
--disable_delete_obsolete_files_;
}
if (disable_delete_obsolete_files_ == 0) {
file_deletion_enabled = true;
FindObsoleteFiles(&job_context, true);
bg_cv_.SignalAll();
}
}
if (file_deletion_enabled) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
disable_delete_obsolete_files_);
}
job_context.Clean();
LogFlush(immutable_db_options_.info_log);
return Status::OK();
}
bool DBImpl::IsFileDeletionsEnabled() const {
return 0 == disable_delete_obsolete_files_;
}
// * Returns the list of live files in 'sst_live' and 'blob_live'.
// If it's doing full scan:
// * Returns the list of all files in the filesystem in

View File

@ -370,7 +370,30 @@ Status DBImpl::Recover(
}
std::string current_fname = CurrentFileName(dbname_);
s = env_->FileExists(current_fname);
// Path to any MANIFEST file in the db dir. It does not matter which one.
// Since best-efforts recovery ignores CURRENT file, existence of a
// MANIFEST indicates the recovery to recover existing db. If no MANIFEST
// can be found, a new db will be created.
std::string manifest_path;
if (!immutable_db_options_.best_efforts_recovery) {
s = env_->FileExists(current_fname);
} else {
s = Status::NotFound();
std::vector<std::string> files;
// No need to check return value
env_->GetChildren(dbname_, &files);
for (const std::string& file : files) {
uint64_t number = 0;
FileType type = kLogFile; // initialize
if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
// Found MANIFEST (descriptor log), thus best-efforts recovery does
// not have to treat the db as empty.
s = Status::OK();
manifest_path = dbname_ + "/" + file;
break;
}
}
}
if (s.IsNotFound()) {
if (immutable_db_options_.create_if_missing) {
s = NewDB();
@ -398,14 +421,14 @@ Status DBImpl::Recover(
FileOptions customized_fs(file_options_);
customized_fs.use_direct_reads |=
immutable_db_options_.use_direct_io_for_flush_and_compaction;
s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
nullptr);
const std::string& fname =
manifest_path.empty() ? current_fname : manifest_path;
s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
if (!s.ok()) {
std::string error_str = s.ToString();
// Check if unsupported Direct I/O is the root cause
customized_fs.use_direct_reads = false;
s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
nullptr);
s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
if (s.ok()) {
return Status::InvalidArgument(
"Direct I/O is not supported by the specified DB.");

View File

@ -2990,10 +2990,11 @@ class ModelDB : public DB {
Status SyncWAL() override { return Status::OK(); }
#ifndef ROCKSDB_LITE
Status DisableFileDeletions() override { return Status::OK(); }
Status EnableFileDeletions(bool /*force*/) override { return Status::OK(); }
#ifndef ROCKSDB_LITE
Status GetLiveFiles(std::vector<std::string>&, uint64_t* /*size*/,
bool /*flush_memtable*/ = true) override {
return Status::OK();

View File

@ -86,6 +86,96 @@ TEST_F(DBTest2, OpenForReadOnlyWithColumnFamilies) {
// With create_if_missing false, there should not be a dir in the file system
ASSERT_NOK(env_->FileExists(dbname));
}
class TestReadOnlyWithCompressedCache
: public DBTestBase,
public testing::WithParamInterface<std::tuple<int, bool>> {
public:
TestReadOnlyWithCompressedCache()
: DBTestBase("/test_readonly_with_compressed_cache") {
max_open_files_ = std::get<0>(GetParam());
use_mmap_ = std::get<1>(GetParam());
}
int max_open_files_;
bool use_mmap_;
};
TEST_P(TestReadOnlyWithCompressedCache, ReadOnlyWithCompressedCache) {
if (use_mmap_ && !IsMemoryMappedAccessSupported()) {
return;
}
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Put("foo2", "barbarbarbarbarbarbarbar"));
ASSERT_OK(Flush());
DB* db_ptr = nullptr;
Options options = CurrentOptions();
options.allow_mmap_reads = use_mmap_;
options.max_open_files = max_open_files_;
options.compression = kSnappyCompression;
BlockBasedTableOptions table_options;
table_options.block_cache_compressed = NewLRUCache(8 * 1024 * 1024);
table_options.no_block_cache = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.statistics = CreateDBStatistics();
ASSERT_OK(DB::OpenForReadOnly(options, dbname_, &db_ptr));
std::string v;
ASSERT_OK(db_ptr->Get(ReadOptions(), "foo", &v));
ASSERT_EQ("bar", v);
ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_COMPRESSED_HIT));
ASSERT_OK(db_ptr->Get(ReadOptions(), "foo", &v));
ASSERT_EQ("bar", v);
if (Snappy_Supported()) {
if (use_mmap_) {
ASSERT_EQ(0,
options.statistics->getTickerCount(BLOCK_CACHE_COMPRESSED_HIT));
} else {
ASSERT_EQ(1,
options.statistics->getTickerCount(BLOCK_CACHE_COMPRESSED_HIT));
}
}
delete db_ptr;
}
INSTANTIATE_TEST_CASE_P(TestReadOnlyWithCompressedCache,
TestReadOnlyWithCompressedCache,
::testing::Combine(::testing::Values(-1, 100),
::testing::Bool()));
class PartitionedIndexTestListener : public EventListener {
public:
void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
ASSERT_GT(info.table_properties.index_partitions, 1);
ASSERT_EQ(info.table_properties.index_key_is_user_key, 0);
}
};
TEST_F(DBTest2, PartitionedIndexUserToInternalKey) {
BlockBasedTableOptions table_options;
Options options = CurrentOptions();
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
PartitionedIndexTestListener* listener = new PartitionedIndexTestListener();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.listeners.emplace_back(listener);
std::vector<const Snapshot*> snapshots;
Reopen(options);
Random rnd(301);
for (int i = 0; i < 3000; i++) {
int j = i % 30;
std::string value = RandomString(&rnd, 10500);
ASSERT_OK(Put("keykey_" + std::to_string(j), value));
snapshots.push_back(db_->GetSnapshot());
}
Flush();
for (auto s : snapshots) {
db_->ReleaseSnapshot(s);
}
}
#endif // ROCKSDB_LITE
class PrefixFullBloomWithReverseComparator
@ -2897,6 +2987,94 @@ TEST_F(DBTest2, OptimizeForSmallDB) {
#endif // ROCKSDB_LITE
TEST_F(DBTest2, IterRaceFlush1) {
ASSERT_OK(Put("foo", "v1"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
{"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
});
// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}
t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, IterRaceFlush2) {
ASSERT_OK(Put("foo", "v1"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
{"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
});
// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}
t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, IterRefreshRaceFlush) {
ASSERT_OK(Put("foo", "v1"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"},
{"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
});
// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Refresh();
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}
t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, GetRaceFlush1) {
ASSERT_OK(Put("foo", "v1"));

View File

@ -51,9 +51,19 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
Status::Code::kIOError, Status::SubCode::kNoSpace,
false),
Status::Severity::kHardError},
// Errors during MANIFEST write
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, Status::SubCode::kNoSpace,
true),
Status::Severity::kHardError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, Status::SubCode::kNoSpace,
false),
Status::Severity::kHardError},
};
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity>
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,
Status::Severity>
DefaultErrorSeverityMap = {
// Errors during BG compaction
{std::make_tuple(BackgroundErrorReason::kCompaction,
@ -75,11 +85,11 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kCorruption, false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kIOError, true),
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kIOError, false),
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
false),
Status::Severity::kNoError},
// Errors during Write
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
@ -94,30 +104,36 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, false),
Status::Severity::kFatalError},
};
std::map<std::tuple<BackgroundErrorReason, bool>, Status::Severity>
DefaultReasonMap = {
// Errors during BG compaction
{std::make_tuple(BackgroundErrorReason::kCompaction, true),
Status::Severity::kFatalError},
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kCompaction, false),
Status::Severity::kNoError},
Status::Severity::kNoError},
// Errors during BG flush
{std::make_tuple(BackgroundErrorReason::kFlush, true),
Status::Severity::kFatalError},
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kFlush, false),
Status::Severity::kNoError},
Status::Severity::kNoError},
// Errors during Write
{std::make_tuple(BackgroundErrorReason::kWriteCallback, true),
Status::Severity::kFatalError},
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kWriteCallback, false),
Status::Severity::kFatalError},
Status::Severity::kFatalError},
// Errors during Memtable update
{std::make_tuple(BackgroundErrorReason::kMemTable, true),
Status::Severity::kFatalError},
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kMemTable, false),
Status::Severity::kFatalError},
Status::Severity::kFatalError},
};
void ErrorHandler::CancelErrorRecovery() {
@ -247,6 +263,10 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
if (recovery_in_prog_ && recovery_error_.ok()) {
recovery_error_ = bg_io_err;
}
if (BackgroundErrorReason::kManifestWrite == reason) {
// Always returns ok
db_->DisableFileDeletionsWithLock();
}
Status new_bg_io_err = bg_io_err;
Status s;
if (bg_io_err.GetDataLoss()) {

View File

@ -798,7 +798,7 @@ bool InternalStats::HandleCurrentSuperVersionNumber(uint64_t* value,
bool InternalStats::HandleIsFileDeletionsEnabled(uint64_t* value, DBImpl* db,
Version* /*version*/) {
*value = db->IsFileDeletionsEnabled();
*value = db->IsFileDeletionsEnabled() ? 1 : 0;
return true;
}

View File

@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
// Compute the crc of the record type and the payload.
crc = crc32c::Extend(crc, ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage
TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
&crc);
EncodeFixed32(buf, crc);
// Write the header and the payload

View File

@ -766,7 +766,13 @@ static bool SaveValue(void* arg, const char* entry) {
}
return true;
}
default:
default: {
std::string msg("Unrecognized value type: " +
std::to_string(static_cast<int>(type)) + ". ");
msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) + ". ");
msg.append("seq: " + std::to_string(seq) + ".");
*(s->status) = Status::Corruption(msg.c_str());
}
assert(false);
return true;
}

View File

@ -470,7 +470,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
}
// this can release and reacquire the mutex.
vset->SetIOStatusOK();
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);
*io_s = vset->io_status();

View File

@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler(
assert(version_set_ != nullptr);
}
Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id) {
Slice record;
std::string scratch;
assert(log_read_status);
assert(log_read_status->ok());
size_t recovered_edits = 0;
Status s = Initialize();
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
}
}
}
if (!log_read_status->ok()) {
s = *log_read_status;
}
CheckIterationResult(reader, &s);
if (!s.ok()) {
status_ = s;
}
return s;
}
Status VersionEditHandler::Initialize() {
@ -404,13 +411,12 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
bool is_initial_load) {
assert(cfd != nullptr);
assert(!cfd->IsDropped());
Status s;
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
assert(builder_iter->second != nullptr);
VersionBuilder* builder = builder_iter->second->version_builder();
assert(builder);
s = builder->LoadTableHandlers(
Status s = builder->LoadTableHandlers(
cfd->internal_stats(),
version_set_->db_options_->max_file_opening_threads,
prefetch_index_and_filter_in_cache, is_initial_load,
@ -551,6 +557,8 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_files.insert(file_num);
s = Status::OK();
} else if (!s.ok()) {
break;
}
}
bool missing_info = !version_edit_params_.has_log_number_ ||
@ -558,8 +566,9 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
!version_edit_params_.has_last_sequence_;
// Create version before apply edit
if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) ||
(missing_files.empty() && force_create_version))) {
if (s.ok() && !missing_info &&
((!missing_files.empty() && !prev_has_missing_files) ||
(missing_files.empty() && force_create_version))) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();

View File

@ -40,7 +40,8 @@ class VersionEditHandler {
virtual ~VersionEditHandler() {}
Status Iterate(log::Reader& reader, std::string* db_id);
void Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id);
const Status& status() const { return status_; }
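
With the void signature, a caller supplies a Status that the log reader's corruption reporter writes into, and reads the final result from status(). A toy, self-contained analog of that contract (illustrative types, not RocksDB code):

#include <cstddef>
#include <string>
#include <vector>

struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
  static Status Corruption(const std::string& m) { return Status{m}; }
};

struct Reader {
  std::vector<std::string> records;
  std::size_t pos = 0;
  bool ReadRecord(std::string* rec) {
    if (pos >= records.size()) return false;
    *rec = records[pos++];
    return true;
  }
};

struct Handler {
  Status status_;
  void Iterate(Reader& reader, Status* log_read_status) {
    Status s;
    std::string rec;
    while (s.ok() && reader.ReadRecord(&rec) && log_read_status->ok()) {
      if (rec == "bad_record") {
        s = Status::Corruption("cannot decode edit");  // decode failure
      }
    }
    if (!log_read_status->ok()) {
      s = *log_read_status;  // a read error overrides the local status
    }
    status_ = s;
  }
  const Status& status() const { return status_; }
};

int main() {
  Reader reader{{"edit_1", "bad_record"}};
  Status log_read_status;  // the reporter would record read errors here
  Handler handler;
  handler.Iterate(reader, &log_read_status);
  return handler.status().ok() ? 0 : 1;  // returns 1: corruption detected
}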

View File

@ -10,6 +10,7 @@
#include "db/version_set.h"
#include <stdio.h>
#include <algorithm>
#include <array>
#include <cinttypes>
@ -19,6 +20,7 @@
#include <string>
#include <unordered_map>
#include <vector>
#include "compaction/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
@ -50,6 +52,7 @@
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
@ -3875,10 +3878,6 @@ Status VersionSet::ProcessManifestWrites(
}
#endif // NDEBUG
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
@ -3908,6 +3907,9 @@ Status VersionSet::ProcessManifestWrites(
}
}
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
{
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
mu->Unlock();
@ -3944,9 +3946,9 @@ Status VersionSet::ProcessManifestWrites(
std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_);
std::unique_ptr<FSWritableFile> descriptor_file;
s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
opt_file_opts);
if (s.ok()) {
io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
opt_file_opts);
if (io_s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);
@ -3955,7 +3957,10 @@ Status VersionSet::ProcessManifestWrites(
nullptr, db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(),
io_s);
} else {
s = io_s;
}
}
@ -3991,16 +3996,16 @@ Status VersionSet::ProcessManifestWrites(
#endif /* !NDEBUG */
io_s = descriptor_log_->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
break;
}
}
if (s.ok()) {
io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
}
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str());
@ -4013,7 +4018,6 @@ Status VersionSet::ProcessManifestWrites(
io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
db_directory);
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
}
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
@ -4035,6 +4039,14 @@ Status VersionSet::ProcessManifestWrites(
mu->Lock();
}
if (!io_s.ok()) {
if (io_status_.ok()) {
io_status_ = io_s;
}
} else if (!io_status_.ok()) {
io_status_ = io_s;
}
// Append the old manifest file to the obsolete_manifest_ list to be deleted
// by PurgeObsoleteFiles later.
if (s.ok() && new_descriptor_log) {
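
The block above makes io_status_ a sticky record of the first IO failure that a later fully successful write can reset. A small self-contained model of that rule, assuming this is the intended semantics:

#include <cassert>

struct IOStatus {
  bool ok_ = true;
  bool ok() const { return ok_; }
};

void UpdateRecorded(IOStatus& recorded, const IOStatus& latest) {
  if (!latest.ok()) {
    if (recorded.ok()) {
      recorded = latest;  // keep only the first failure
    }
  } else if (!recorded.ok()) {
    recorded = latest;  // a clean write resets the sticky error
  }
}

int main() {
  IOStatus recorded;  // starts OK
  UpdateRecorded(recorded, IOStatus{false});
  assert(!recorded.ok());  // first failure is recorded
  UpdateRecorded(recorded, IOStatus{true});
  assert(recorded.ok());  // later success clears it
  return 0;
}
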
@ -4444,24 +4456,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
if (dbname.back() != '/') {
manifest_path->push_back('/');
}
*manifest_path += fname;
manifest_path->append(fname);
return Status::OK();
}
Status VersionSet::ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders,
VersionEditParams* version_edit_params, std::string* db_id) {
assert(reader != nullptr);
Status* log_read_status, VersionEditParams* version_edit_params,
std::string* db_id) {
assert(read_buffer != nullptr);
assert(log_read_status != nullptr);
Status s;
Slice record;
std::string scratch;
size_t recovered_edits = 0;
while (reader->ReadRecord(&record, &scratch) && s.ok()) {
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
@ -4505,6 +4519,9 @@ Status VersionSet::ReadAndRecover(
}
}
}
if (!log_read_status->ok()) {
s = *log_read_status;
}
if (!s.ok()) {
// Clear the buffer if we fail to decode/apply an edit.
read_buffer->Clear();
@ -4551,8 +4568,7 @@ Status VersionSet::Recover(
db_options_->log_readahead_size));
}
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
VersionBuilderMap builders;
// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
@ -4574,12 +4590,13 @@ Status VersionSet::Recover(
VersionEditParams version_edit_params;
{
VersionSet::LogReporter reporter;
reporter.status = &s;
Status log_read_status;
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
AtomicGroupReadBuffer read_buffer;
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders,
s = ReadAndRecover(reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders, &log_read_status,
&version_edit_params, db_id);
current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0);
@ -4845,21 +4862,20 @@ Status VersionSet::TryRecoverFromOneManifest(
db_options_->log_readahead_size));
}
assert(s.ok());
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
/*checksum=*/true, /*log_num=*/0);
{
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));
s = handler_pit.Iterate(reader, db_id);
handler_pit.Iterate(reader, &s, db_id);
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
}
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
return s;
return handler_pit.status();
}
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
@ -5284,7 +5300,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
Status VersionSet::WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log) {
log::Writer* log, IOStatus& io_s) {
// TODO: Break up into multiple records to reduce memory usage on recovery?
// WARNING: This method doesn't hold a mutex!!
@ -5293,6 +5309,7 @@ Status VersionSet::WriteCurrentStateToManifest(
// LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate.
assert(io_s.ok());
if (db_options_->write_dbid_to_manifest) {
VersionEdit edit_for_db_id;
assert(!db_id_.empty());
@ -5302,10 +5319,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption("Unable to Encode VersionEdit:" +
edit_for_db_id.DebugString(true));
}
IOStatus io_s = log->AddRecord(db_id_record);
io_s = log->AddRecord(db_id_record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}
@ -5332,10 +5348,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
IOStatus io_s = log->AddRecord(record);
io_s = log->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}
@ -5385,10 +5400,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
IOStatus io_s = log->AddRecord(record);
io_s = log->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}
}
@ -5980,8 +5994,7 @@ Status ReactiveVersionSet::Recover(
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
VersionBuilderMap builders;
std::unordered_map<int, std::string> column_families_not_found;
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
@ -5989,7 +6002,7 @@ Status ReactiveVersionSet::Recover(
manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter());
static_cast<LogReporter*>(manifest_reporter->get())->status =
static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
log::Reader* reader = manifest_reader->get();
@ -5998,10 +6011,9 @@ Status ReactiveVersionSet::Recover(
VersionEdit version_edit;
while (s.ok() && retry < 1) {
assert(reader != nullptr);
Slice record;
std::string scratch;
s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
column_families_not_found, builders, &version_edit);
s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
column_families_not_found, builders,
manifest_reader_status->get(), &version_edit);
if (s.ok()) {
bool enough = version_edit.has_next_file_number_ &&
version_edit.has_log_number_ &&
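
static_cast_with_check comes from the newly included util/cast_util.h. A hedged sketch of the idiom it is assumed to implement (a plain static_cast in release builds, with a dynamic_cast assertion in debug builds to catch bad downcasts; the sketch's names are hypothetical):

#include <cassert>

template <class DestClass, class SrcClass>
DestClass* StaticCastWithCheckSketch(SrcClass* x) {
  DestClass* ret = static_cast<DestClass*>(x);
#ifndef NDEBUG
  assert(ret == dynamic_cast<DestClass*>(x));  // debug-only type check
#endif
  return ret;
}

struct Base { virtual ~Base() = default; };
struct Derived : Base {};

int main() {
  Derived d;
  Base* b = &d;
  Derived* dp = StaticCastWithCheckSketch<Derived>(b);
  return dp == &d ? 0 : 1;
}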

View File

@ -1159,12 +1159,13 @@ class VersionSet {
static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
// Get the IO Status returned by the Manifest write.
IOStatus io_status() const { return io_status_; }
// Set the IO Status to OK. Called before Manifest write if needed.
void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
const IOStatus& io_status() const { return io_status_; }
protected:
using VersionBuilderMap =
std::unordered_map<uint32_t,
std::unique_ptr<BaseReferencedVersionBuilder>>;
struct ManifestWriter;
friend class Version;
@ -1176,7 +1177,9 @@ class VersionSet {
struct LogReporter : public log::Reader::Reporter {
Status* status;
virtual void Corruption(size_t /*bytes*/, const Status& s) override {
if (this->status->ok()) *this->status = s;
if (status->ok()) {
*status = s;
}
}
};
@ -1199,7 +1202,7 @@ class VersionSet {
// Save current contents to *log
Status WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log);
log::Writer* log, IOStatus& io_s);
void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
@ -1207,13 +1210,14 @@ class VersionSet {
const VersionEdit* edit);
Status ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>&
name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
VersionEditParams* version_edit, std::string* db_id = nullptr);
Status* log_read_status, VersionEditParams* version_edit,
std::string* db_id = nullptr);
// REQUIRES db mutex
Status ApplyOneVersionEditToBuilder(
@ -1342,8 +1346,7 @@ class ReactiveVersionSet : public VersionSet {
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);
private:
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
active_version_builders_;
VersionBuilderMap active_version_builders_;
AtomicGroupReadBuffer read_buffer_;
// Number of version edits to skip by ReadAndApply at the beginning of a new
// MANIFEST created by primary.

View File

@ -1266,8 +1266,6 @@ class DB {
// updated, false if user attempted to call it with seqnum <= current value.
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0;
#ifndef ROCKSDB_LITE
// Prevent file deletions. Compactions will continue to occur,
// but no obsolete files will be deleted. Calling this multiple
// times has the same effect as calling it once.
@ -1284,6 +1282,7 @@ class DB {
// threads call EnableFileDeletions()
virtual Status EnableFileDeletions(bool force = true) = 0;
#ifndef ROCKSDB_LITE
// GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup
// Retrieve the list of all files in the database. The files are
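
A usage sketch of the contract documented above, assuming an open rocksdb::DB* (the helper name is hypothetical):

#include "rocksdb/db.h"

// Take a stable view of the DB's files, e.g. for a backup.
rocksdb::Status FreezeFilesAndCopy(rocksdb::DB* db) {
  rocksdb::Status s = db->DisableFileDeletions();
  if (!s.ok()) {
    return s;
  }
  // ... copy the live SST/WAL files here ...
  // force=false pairs this call with the Disable above: deletions resume
  // only once every DisableFileDeletions call has been matched.
  return db->EnableFileDeletions(/*force=*/false);
}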

View File

@ -117,6 +117,7 @@ enum class BackgroundErrorReason {
kCompaction,
kWriteCallback,
kMemTable,
kManifestWrite,
};
enum class WriteStallCondition {

View File

@ -5,8 +5,8 @@
#pragma once
#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 10
#define ROCKSDB_PATCH 0
#define ROCKSDB_MINOR 11
#define ROCKSDB_PATCH 7
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -647,7 +647,12 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4;"
"format_version=5;whole_key_filtering=1;"
"filter_policy=bloomfilter:4.567:false;",
"filter_policy=bloomfilter:4.567:false;"
// A bug caused read_amp_bytes_per_bit to be written as a large integer
// in OPTIONS files generated by 6.10 through 6.14. Though the bug has
// since been fixed, we still need to handle the case of loading an
// OPTIONS file generated before the fix.
"read_amp_bytes_per_bit=17179869185;",
&new_opt));
ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
@ -668,6 +673,9 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
dynamic_cast<const BloomFilterPolicy&>(*new_opt.filter_policy);
EXPECT_EQ(bfp.GetMillibitsPerKey(), 4567);
EXPECT_EQ(bfp.GetWholeBitsPerKey(), 5);
// Verify that only the lower 32 bits are stored in
// new_opt.read_amp_bytes_per_bit.
EXPECT_EQ(1U, new_opt.read_amp_bytes_per_bit);
// unknown option
ASSERT_NOK(GetBlockBasedTableOptionsFromString(
@ -1279,7 +1287,7 @@ TEST_F(OptionsTest, ConvertOptionsTest) {
// This test suite tests the old APIs into the Configure options methods.
// Once those APIs are officially deprecated, this test suite can be deleted.
class OptionsOldApiTest : public testing::Test {};
TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
std::unordered_map<std::string, std::string> cf_options_map = {
{"write_buffer_size", "1"},
@ -1744,7 +1752,7 @@ TEST_F(OptionsOldApiTest, GetColumnFamilyOptionsFromStringTest) {
ASSERT_TRUE(new_cf_opt.memtable_factory != nullptr);
ASSERT_EQ(std::string(new_cf_opt.memtable_factory->Name()), "SkipListFactory");
}
TEST_F(OptionsOldApiTest, GetBlockBasedTableOptionsFromString) {
BlockBasedTableOptions table_opt;
BlockBasedTableOptions new_opt;
@ -1919,7 +1927,7 @@ TEST_F(OptionsOldApiTest, GetBlockBasedTableOptionsFromString) {
->GetHighPriPoolRatio(),
0.5);
}
TEST_F(OptionsOldApiTest, GetPlainTableOptionsFromString) {
PlainTableOptions table_opt;
PlainTableOptions new_opt;
@ -1950,7 +1958,7 @@ TEST_F(OptionsOldApiTest, GetPlainTableOptionsFromString) {
"encoding_type=kPrefixXX",
&new_opt));
}
TEST_F(OptionsOldApiTest, GetOptionsFromStringTest) {
Options base_options, new_options;
base_options.write_buffer_size = 20;
@ -2508,7 +2516,7 @@ TEST_F(OptionsParserTest, Readahead) {
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size));
assert(file_size > 0);
RocksDBOptionsParser parser;
env_->num_seq_file_read_ = 0;

View File

@ -327,8 +327,24 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionTypeFlags::kNone, 0}},
{"read_amp_bytes_per_bit",
{offsetof(struct BlockBasedTableOptions, read_amp_bytes_per_bit),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, 0}},
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, 0,
[](const ConfigOptions& /*opts*/, const std::string& /*name*/,
const std::string& value, char* addr) {
// A workaround for a bug in 6.10, 6.11, 6.12, 6.13
// and 6.14. The bug would write out 8 bytes to the OPTIONS file from the
// starting address of BlockBasedTableOptions.read_amp_bytes_per_bit,
// which is actually a uint32. Consequently, the value of
// read_amp_bytes_per_bit written in the OPTIONS file is wrong.
// From 6.15, RocksDB tries to parse read_amp_bytes_per_bit
// from the OPTIONS file as a uint32. To be able to load OPTIONS files
// generated by affected releases before the fix, we need to
// manually parse read_amp_bytes_per_bit with this special hack.
uint64_t read_amp_bytes_per_bit = ParseUint64(value);
*(reinterpret_cast<uint32_t*>(addr)) =
static_cast<uint32_t>(read_amp_bytes_per_bit);
return Status::OK();
}}},
{"enable_index_compression",
{offsetof(struct BlockBasedTableOptions, enable_index_compression),
OptionType::kBoolean, OptionVerificationType::kNormal,
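
The parse hook boils down to a 64-bit parse followed by a 32-bit truncation. A minimal standalone sketch, with std::stoull standing in for RocksDB's ParseUint64 and the value taken from the options test above:

#include <cassert>
#include <cstdint>
#include <string>

int main() {
  // Value as mis-serialized by an affected release: 8 bytes were written
  // from the address of a uint32 field, so 4 bytes of a neighboring field
  // leaked into the upper half (17179869185 == 4 * 2^32 + 1).
  const std::string on_disk = "17179869185";
  const uint64_t parsed = std::stoull(on_disk);  // stand-in for ParseUint64
  // Keeping only the lower 32 bits recovers the intended setting of 1.
  const uint32_t recovered = static_cast<uint32_t>(parsed);
  assert(recovered == 1u);
  return 0;
}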

View File

@ -1426,10 +1426,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep_->table_options.block_cache.get();
// No point to cache compressed blocks if it never goes away
Cache* block_cache_compressed =
rep_->immortal_table ? nullptr
: rep_->table_options.block_cache_compressed.get();
rep_->table_options.block_cache_compressed.get();
// First, try to get the block from the cache
//

View File

@ -104,6 +104,15 @@ void PartitionedIndexBuilder::MakeNewSubIndexBuilder() {
comparator_, table_opt_.index_block_restart_interval,
table_opt_.format_version, use_value_delta_encoding_,
table_opt_.index_shortening, /* include_first_key */ false);
// If seperator_is_key_plus_seq_ is true (internal-key mode), also set
// sub_index_builder_->seperator_is_key_plus_seq_ to true (it defaults to
// false on creation) so that the flush policy can point to
// sub_index_builder_->index_block_builder_.
if (seperator_is_key_plus_seq_) {
sub_index_builder_->seperator_is_key_plus_seq_ = true;
}
flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
// Note: this is sub-optimal since sub_index_builder_ could later reset
@ -129,9 +138,15 @@ void PartitionedIndexBuilder::AddIndexEntry(
}
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block, block_handle);
if (sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders
if (!seperator_is_key_plus_seq_ &&
sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders and reset
// flush_policy to point to the BlockBuilder of sub_index_builder_ that
// stores internal keys.
seperator_is_key_plus_seq_ = true;
flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
sub_index_builder_->index_block_builder_));
}
sub_index_last_key_ = std::string(*last_key_in_current_block);
entries_.push_back(
@ -161,9 +176,15 @@ void PartitionedIndexBuilder::AddIndexEntry(
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block, block_handle);
sub_index_last_key_ = std::string(*last_key_in_current_block);
if (sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders
if (!seperator_is_key_plus_seq_ &&
sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders and reset
// flush_policy to point to the BlockBuilder of sub_index_builder_ that
// stores internal keys.
seperator_is_key_plus_seq_ = true;
flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
sub_index_builder_->index_block_builder_));
}
}
}
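
A toy, self-contained analog of the invariant these hunks restore (illustrative names, not RocksDB code): once any sub-builder switches to full internal keys, the parent must flip its own flag and re-point the size-based flush policy at the block builder that actually accumulates entries:

#include <cassert>
#include <cstddef>

struct BlockBuilder {
  std::size_t bytes = 0;
};

struct SubIndexBuilder {
  bool key_plus_seq = false;
  BlockBuilder block;
};

struct PartitionedBuilder {
  bool key_plus_seq = false;
  SubIndexBuilder sub;
  BlockBuilder* flush_target = nullptr;  // what the flush policy measures

  void AfterAddEntry() {
    if (!key_plus_seq && sub.key_plus_seq) {
      key_plus_seq = true;
      // Without this, the flush policy would keep watching a builder that
      // no longer receives entries.
      flush_target = &sub.block;
    }
  }
};

int main() {
  PartitionedBuilder builder;
  builder.sub.key_plus_seq = true;  // a sub-builder switched modes
  builder.AfterAddEntry();
  assert(builder.flush_target == &builder.sub.block);
  return 0;
}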

View File

@ -167,7 +167,7 @@ void PartitionIndexReader::CacheDependencies(bool pin) {
assert(s.ok() || block.GetValue() == nullptr);
if (s.ok() && block.GetValue() != nullptr) {
if (block.IsCached()) {
if (block.IsCached() || block.GetOwnValue()) {
if (pin) {
partition_map_[handle.offset()] = std::move(block);
}

View File

@ -142,6 +142,11 @@ class IteratorWrapperBase {
return result_.value_prepared;
}
Slice user_key() const {
assert(Valid());
return iter_->user_key();
}
private:
void Update() {
valid_ = iter_->Valid();

View File

@ -43,6 +43,10 @@ class TwoLevelIndexIterator : public InternalIteratorBase<IndexValue> {
assert(Valid());
return second_level_iter_.key();
}
Slice user_key() const override {
assert(Valid());
return second_level_iter_.user_key();
}
IndexValue value() const override {
assert(Valid());
return second_level_iter_.value();

View File

@ -8,8 +8,10 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include <limits>
#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/stackable_db.h"
@ -24,6 +26,8 @@ namespace blob_db {
// The factory needs to be moved to include/rocksdb/utilities to allow
// users to use blob DB.
constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
struct BlobDBOptions {
// Name of the directory under the base DB where blobs will be stored. Using
// a directory where the base DB stores its SST files is not supported.