Compare commits

28 Commits

| SHA1 |
|---|
| c509dd3249 |
| 19c8025386 |
| 63462c3a94 |
| 62f110f527 |
| 878b3a41de |
| f36f44c5e6 |
| 8d70e53114 |
| 5a4b8005a7 |
| 4cd964946a |
| 48bfca38f6 |
| c8b9556cb6 |
| 30bfa2a44e |
| 3e1eb99ab7 |
| 3514cf51fa |
| dd63f04c83 |
| 115c9113ca |
| 07e2ca100a |
| 9912ebe1b5 |
| 0fdd019f62 |
| 43107e0a9a |
| c0a8d89c27 |
| 85189fd64f |
| 1910560c2c |
| 61cc9ef76f |
| f5d4dbbeef |
| 84dcfe1a5f |
| a818699f2f |
| 97a69f4372 |
HISTORY.md (34 lines changed)

@@ -1,4 +1,38 @@
# Rocksdb Change Log

## 6.11.7 (11/15/2020)
### Bug Fixes
* Fix a bug of encoding and parsing BlockBasedTableOptions::read_amp_bytes_per_bit as a 64-bit integer.
* Fixed the logic of populating the native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architectures. Without this fix, the original code introduced in PR7659, when running on a big-endian machine, can mistakenly store `read_amp_bytes_per_bit` (a uint32) in little-endian format. Future accesses to `read_amp_bytes_per_bit` will then give wrong values. Little-endian architectures are not affected.

## 6.11.6 (10/12/2020)
### Bug Fixes
* Fixed a bug in the following combination of features: indexes with user keys (`format_version >= 3`), indexes are partitioned (`index_type == kTwoLevelIndexSearch`), and some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`). The bug could cause keys to be truncated when read from the index, leading to wrong read results or other unexpected behavior.
* Fixed a bug when indexes are partitioned (`index_type == kTwoLevelIndexSearch`), some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`), and partition reads could be mixed between block cache and directly from the file (e.g., with `enable_index_compression == 1` and `mmap_read == 1`, partitions that were stored uncompressed due to a poor compression ratio would be read directly from the file via mmap, while partitions that were stored compressed would be read from block cache). The bug could cause index partitions to be mistakenly considered empty during reads, leading to wrong read results.

## 6.11.5 (7/23/2020)
### Bug Fixes
* Memtable lookup should report an unrecognized value_type as corruption (#7121).

## 6.11.4 (7/15/2020)
### Bug Fixes
* Make compaction report InternalKey corruption while iterating over the input.

## 6.11.3 (7/9/2020)
### Bug Fixes
* Fix a bug in PartitionedIndexBuilder (when `index_type == kTwoLevelIndexSearch`): update the FlushPolicy to point to the internal-key partitioner when an index partition switches from user-key mode to internal-key mode.
* Disable file deletion after MANIFEST write/sync failure until db re-open or Resume(), so that a subsequent re-open will not see a MANIFEST referencing deleted SSTs.

## 6.11.1 (6/23/2020)
### Bug Fixes
* Best-efforts recovery ignores the CURRENT file completely. If the CURRENT file is missing during recovery, best-efforts recovery still proceeds with the MANIFEST file(s).
* In best-efforts recovery, an error that is not Corruption, IOError::kNotFound, or IOError::kPathNotFound used to be overwritten silently. Fix this by checking all non-ok cases and returning early.
* Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: the compressed block cache is effective with read-only DBs too.
* Fail recovery and report the error once a physical log record checksum mismatch is hit while reading the MANIFEST; RocksDB should not continue processing the MANIFEST any further.
* Fix a bug of wrong iterator results if another thread finishes an update and a DB flush between two statements.

### Public API Change
* `DB::OpenForReadOnly()` now returns `Status::NotFound` when the specified DB directory does not exist. Previously the error returned depended on the underlying `Env`.

## 6.11 (6/12/2020)
### Bug Fixes
* Fix consistency-checking error swallowing in some cases when options.force_consistency_checks = true.
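The `DB::OpenForReadOnly()` behavior change noted above is observable directly from application code. A minimal, hedged sketch (the path, message, and error handling are illustrative, not part of this diff):

```cpp
#include <iostream>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  rocksdb::DB* db = nullptr;
  // Per the 6.11.1 note above: if the directory is missing, the returned
  // status is now NotFound regardless of the underlying Env.
  rocksdb::Status s =
      rocksdb::DB::OpenForReadOnly(options, "/tmp/does_not_exist", &db);
  if (s.IsNotFound()) {
    std::cout << "DB directory does not exist" << std::endl;
  }
  delete db;  // deleting nullptr is a no-op when open failed
  return 0;
}
```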
Makefile (4 lines changed)

@@ -9,7 +9,9 @@
BASH_EXISTS := $(shell which bash)
SHELL := $(shell which bash)
# Default to python3. Some distros like CentOS 8 do not have `python`.
PYTHON?=$(shell which python3 || which python || echo python3)
ifeq ($(origin PYTHON), undefined)
  PYTHON := $(shell which python3 || which python || echo python3)
endif
export PYTHON

CLEAN_FILES = # deliberately empty, so we can append below.
TARGETS (28 lines changed)

@@ -447,6 +447,27 @@ cpp_library(
    external_deps = ROCKSDB_EXTERNAL_DEPS,
)

if not is_opt_mode:
    cpp_binary(
        name = "c_test_bin",
        srcs = ["db/c_test.c"],
        arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
        os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
        compiler_flags = ROCKSDB_COMPILER_FLAGS,
        preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
        deps = [":rocksdb_test_lib"],
    )

if not is_opt_mode:
    custom_unittest(
        "c_test",
        command = [
            native.package_name() + "/buckifier/rocks_test_runner.sh",
            "$(location :{})".format("c_test_bin"),
        ],
        type = "simple",
    )

cpp_library(
    name = "env_basic_test_lib",
    srcs = ["env/env_basic_test.cc"],

@@ -560,13 +581,6 @@ ROCKS_TESTS = [
        [],
        [],
    ],
    [
        "c_test",
        "db/c_test.c",
        "serial",
        [],
        [],
    ],
    [
        "cache_simulator_test",
        "utilities/simulator_cache/cache_simulator_test.cc",
@@ -64,8 +64,6 @@ def get_cc_files(repo_path):
            continue
        for filename in fnmatch.filter(filenames, '*.cc'):
            cc_files.append(os.path.join(root, filename))
        for filename in fnmatch.filter(filenames, '*.c'):
            cc_files.append(os.path.join(root, filename))
    return cc_files

@@ -178,10 +176,18 @@ def generate_targets(repo_path, deps_map):
                      + ["test_util/testutil.cc"])

    print("Extra dependencies:\n{0}".format(json.dumps(deps_map)))
    # test for every test we found in the Makefile

    # c_test.c is added through TARGETS.add_c_test(). If there
    # are more than one .c test file, we need to extend
    # TARGETS.add_c_test() to include other C tests too.
    TARGETS.add_c_test()

    # test for every .cc test we found in the Makefile
    for target_alias, deps in deps_map.items():
        for test in sorted(tests):
            match_src = [src for src in cc_files if ("/%s.c" % test) in src]
            if test == 'c_test':
                continue
            match_src = [src for src in cc_files if ("/%s.cc" % test) in src]
            if len(match_src) == 0:
                print(ColorString.warning("Cannot find .cc file for %s" % test))
                continue
@@ -76,6 +76,30 @@ class TARGETSBuilder(object):
                pretty_list(deps)))
        self.total_bin = self.total_bin + 1

    def add_c_test(self):
        self.targets_file.write("""
if not is_opt_mode:
    cpp_binary(
        name = "c_test_bin",
        srcs = ["db/c_test.c"],
        arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
        os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
        compiler_flags = ROCKSDB_COMPILER_FLAGS,
        preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
        deps = [":rocksdb_test_lib"],
    )

if not is_opt_mode:
    custom_unittest(
        "c_test",
        command = [
            native.package_name() + "/buckifier/rocks_test_runner.sh",
            "$(location :{})".format("c_test_bin"),
        ],
        type = "simple",
    )
""")

    def register_test(self,
                      test_name,
                      src,
@@ -56,8 +56,9 @@ Status ArenaWrappedDBIter::Refresh() {
  // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
  // correct behavior. Will be corrected automatically when we take a snapshot
  // here for the case of WritePreparedTxnDB.
  SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
  uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
  TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
  TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
  if (sv_number_ != cur_sv_number) {
    Env* env = db_iter_->env();
    db_iter_->~DBIter();

@@ -65,6 +66,7 @@ Status ArenaWrappedDBIter::Refresh() {
    new (&arena_) Arena();

    SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
    SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
    if (read_callback_) {
      read_callback_->Refresh(latest_seq);
    }

@@ -78,7 +80,7 @@ Status ArenaWrappedDBIter::Refresh() {
        latest_seq, /* allow_unprepared_value */ true);
    SetIterUnderDBIter(internal_iter);
  } else {
    db_iter_->set_sequence(latest_seq);
    db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
    db_iter_->set_valid(false);
  }
  return Status::OK();
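The hunks above move the `GetLatestSequenceNumber()` read to after the super version is re-acquired, which is what fixes the "wrong iterator result" race noted in HISTORY.md. A toy sketch of the ordering rule this enforces; all types and members here are hypothetical stand-ins, not RocksDB code:

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical stand-ins; this is not RocksDB code.
struct View {
  uint64_t contains_up_to;  // highest sequence number covered by this view
};

struct Db {
  View current{42};
  uint64_t latest_seq = 42;
  View* PinCurrentView() { return &current; }  // pretend this ref-counts
  uint64_t LatestSequence() const { return latest_seq; }
};

int main() {
  Db db;
  // 1) Pin the data first, 2) only then pick the sequence number. If the
  // sequence were read first, a flush finishing in between could leave the
  // pinned view without the data that sequence is supposed to see.
  View* view = db.PinCurrentView();
  uint64_t seq = db.LatestSequence();
  std::cout << (view->contains_up_to >= seq ? "consistent" : "inconsistent")
            << std::endl;
  return 0;
}
```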
@@ -9,7 +9,6 @@

#ifndef ROCKSDB_LITE

#include <limits>
#include <memory>
#include <utility>

@@ -23,7 +22,6 @@ namespace blob_db {

constexpr uint32_t kMagicNumber = 2395959;  // 0x00248f37
constexpr uint32_t kVersion1 = 1;
constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();

using ExpirationRange = std::pair<uint64_t, uint64_t>;
@@ -246,6 +246,7 @@ void CompactionIterator::NextFromInput() {
    iter_stats_.num_input_records++;

    if (!ParseInternalKey(key_, &ikey_)) {
      iter_stats_.num_input_corrupt_records++;
      // If `expect_valid_internal_key_` is false, return the corrupted key
      // and let the caller decide what to do with it.
      // TODO(noetzli): We should have a more elegant solution for this.

@@ -258,7 +259,6 @@ void CompactionIterator::NextFromInput() {
      has_current_user_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      iter_stats_.num_input_corrupt_records++;
      valid_ = true;
      break;
    }
@@ -721,7 +721,6 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), thread_pri_, compaction_stats_);

  versions_->SetIOStatusOK();
  if (status.ok()) {
    status = InstallCompactionResults(mutable_cf_options);
  }

@@ -896,9 +895,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  sub_compact->c_iter.reset(new CompactionIterator(
      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
      &existing_snapshots_, earliest_write_conflict_snapshot_,
      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
      &range_del_agg, sub_compact->compaction, compaction_filter,
      shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_,
      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
      /*expect_valid_internal_key=*/true, &range_del_agg,
      sub_compact->compaction, compaction_filter, shutting_down_,
      preserve_deletes_seqnum_, manual_compaction_paused_,
      db_options_.info_log));
  auto c_iter = sub_compact->c_iter.get();
  c_iter->SeekToFirst();
@@ -395,7 +395,7 @@ TEST_F(CompactionJobTest, Simple) {
  RunCompaction({ files }, expected_results);
}

TEST_F(CompactionJobTest, SimpleCorrupted) {
TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
  NewDB();

  auto expected_results = CreateTwoFiles(true);

@@ -989,7 +989,7 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
// single deletion and the (single) deletion gets removed while the corrupt key
// gets written out. TODO(noetzli): We probably want a better way to treat
// corrupt keys.
TEST_F(CompactionJobTest, CorruptionAfterDeletion) {
TEST_F(CompactionJobTest, DISABLED_CorruptionAfterDeletion) {
  NewDB();

  auto file1 =
@@ -2199,6 +2199,35 @@ TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
  }
}

TEST_F(DBBasicTest, RecoverWithNoManifest) {
  Options options = CurrentOptions();
  options.env = env_;
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "value"));
  ASSERT_OK(Flush());
  Close();
  {
    // Delete all MANIFEST.
    std::vector<std::string> files;
    ASSERT_OK(env_->GetChildren(dbname_, &files));
    for (const auto& file : files) {
      uint64_t number = 0;
      FileType type = kLogFile;
      if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
        ASSERT_OK(env_->DeleteFile(dbname_ + "/" + file));
      }
    }
  }
  options.best_efforts_recovery = true;
  options.create_if_missing = false;
  Status s = TryReopen(options);
  ASSERT_TRUE(s.IsInvalidArgument());
  options.create_if_missing = true;
  Reopen(options);
  // Since no MANIFEST exists, best-efforts recovery creates a new, empty db.
  ASSERT_EQ("NOT_FOUND", Get("foo"));
}

TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);

@@ -2239,6 +2268,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
}
#endif  // !ROCKSDB_LITE

TEST_F(DBBasicTest, ManifestChecksumMismatch) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  ASSERT_OK(Put("bar", "value"));
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
        auto* crc = reinterpret_cast<uint32_t*>(arg);
        *crc = *crc + 1;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions write_opts;
  write_opts.disableWAL = true;
  Status s = db_->Put(write_opts, "foo", "value");
  ASSERT_OK(s);
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_OK(Put("foo", "value1"));
  ASSERT_OK(Flush());
  s = TryReopen(options);
  ASSERT_TRUE(s.IsCorruption());
}

class DBBasicTestMultiGet : public DBTestBase {
 public:
  DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,

@@ -2967,6 +3023,33 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
  Close();
}

TEST_F(DBBasicTest, ManifestWriteFailure) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.env = env_;
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        auto* s = reinterpret_cast<Status*>(arg);
        ASSERT_OK(*s);
        // Manually overwrite return status
        *s = Status::IOError();
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put("key", "value"));
  ASSERT_NOK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();
  Reopen(options);
}

}  // namespace ROCKSDB_NAMESPACE

#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
@@ -23,57 +23,6 @@

namespace ROCKSDB_NAMESPACE {

Status DBImpl::DisableFileDeletions() {
  InstrumentedMutexLock l(&mutex_);
  ++disable_delete_obsolete_files_;
  if (disable_delete_obsolete_files_ == 1) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "File Deletions Disabled, but already disabled. Counter: %d",
                   disable_delete_obsolete_files_);
  }
  return Status::OK();
}

Status DBImpl::EnableFileDeletions(bool force) {
  // Job id == 0 means that this is not our background process, but rather
  // user thread
  JobContext job_context(0);
  bool file_deletion_enabled = false;
  {
    InstrumentedMutexLock l(&mutex_);
    if (force) {
      // if force, we need to enable file deletions right away
      disable_delete_obsolete_files_ = 0;
    } else if (disable_delete_obsolete_files_ > 0) {
      --disable_delete_obsolete_files_;
    }
    if (disable_delete_obsolete_files_ == 0) {
      file_deletion_enabled = true;
      FindObsoleteFiles(&job_context, true);
      bg_cv_.SignalAll();
    }
  }
  if (file_deletion_enabled) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "File Deletions Enable, but not really enabled. Counter: %d",
                   disable_delete_obsolete_files_);
  }
  job_context.Clean();
  LogFlush(immutable_db_options_.info_log);
  return Status::OK();
}

int DBImpl::IsFileDeletionsEnabled() const {
  return !disable_delete_obsolete_files_;
}

Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
                            uint64_t* manifest_file_size,
                            bool flush_memtable) {
@@ -312,8 +312,36 @@ Status DBImpl::ResumeImpl() {
  }

  // Make sure the IO Status stored in version set is set to OK.
  bool file_deletion_disabled = !IsFileDeletionsEnabled();
  if (s.ok()) {
    versions_->SetIOStatusOK();
    IOStatus io_s = versions_->io_status();
    if (io_s.IsIOError()) {
      // If resuming from IOError resulted from MANIFEST write, then assert
      // that we must have already set the MANIFEST writer to nullptr during
      // clean-up phase MANIFEST writing. We must have also disabled file
      // deletions.
      assert(!versions_->descriptor_log_);
      assert(file_deletion_disabled);
      // Since we are trying to recover from MANIFEST write error, we need to
      // switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
      // Therefore, force writing a dummy version edit because we do not know
      // whether there are flush jobs with non-empty data to flush, triggering
      // appends to MANIFEST.
      VersionEdit edit;
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(default_cf_handle_);
      assert(cfh);
      ColumnFamilyData* cfd = cfh->cfd();
      const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
      s = versions_->LogAndApply(cfd, cf_opts, &edit, &mutex_,
                                 directories_.GetDbDir());
      if (!s.ok()) {
        io_s = versions_->io_status();
        if (!io_s.ok()) {
          s = error_handler_.SetBGError(io_s,
                                        BackgroundErrorReason::kManifestWrite);
        }
      }
    }
  }

  // We cannot guarantee consistency of the WAL. So force flush Memtables of

@@ -364,6 +392,13 @@ Status DBImpl::ResumeImpl() {
  job_context.Clean();

  if (s.ok()) {
    assert(versions_->io_status().ok());
    // If we reach here, we should re-enable file deletions if it was disabled
    // during previous error handling.
    if (file_deletion_disabled) {
      // Always return ok
      EnableFileDeletions(/*force=*/true);
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();

@@ -2670,7 +2705,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
        " guaranteed to be preserved, try larger iter_start_seqnum opt."));
  }
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  ColumnFamilyData* cfd = cfh->cfd();
  assert(cfd != nullptr);
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE

@@ -2691,10 +2727,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
    // WritePreparedTxnDB
    auto snapshot = read_options.snapshot != nullptr
                        ? read_options.snapshot->GetSequenceNumber()
                        : versions_->LastSequence();
    result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
    result = NewIteratorImpl(read_options, cfd,
                             (read_options.snapshot != nullptr)
                                 ? read_options.snapshot->GetSequenceNumber()
                                 : kMaxSequenceNumber,
                             read_callback);
  }
  return result;
}

@@ -2707,6 +2744,24 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                            bool allow_refresh) {
  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

  TEST_SYNC_POINT("DBImpl::NewIterator:1");
  TEST_SYNC_POINT("DBImpl::NewIterator:2");

  if (snapshot == kMaxSequenceNumber) {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither data that was be
    // visible to the snapshot before compaction nor the newer data inserted
    // afterwards.
    // Note that the super version might not contain all the data available
    // to this snapshot, but in that case it can see all the data in the
    // super version, which is a valid consistent state after the user
    // calls NewIterator().
    snapshot = versions_->LastSequence();
    TEST_SYNC_POINT("DBImpl::NewIterator:3");
    TEST_SYNC_POINT("DBImpl::NewIterator:4");
  }

  // Try to generate a DB iterator tree in continuous memory area to be
  // cache friendly. Here is an example of result:
  // +-------------------------------+

@@ -4369,6 +4424,14 @@ Status DBImpl::IngestExternalFiles(
#endif  // !NDEBUG
      }
    }
  } else if (versions_->io_status().IsIOError()) {
    // Error while writing to MANIFEST.
    // In fact, versions_->io_status() can also be the result of renaming
    // CURRENT file. With current code, it's just difficult to tell. So just
    // be pessimistic and try write to a new MANIFEST.
    // TODO: distinguish between MANIFEST write and CURRENT renaming
    const IOStatus& io_s = versions_->io_status();
    error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
  }

  // Resume writes to the DB
@@ -356,6 +356,12 @@ class DBImpl : public DB {

  virtual Status Close() override;

  virtual Status DisableFileDeletions() override;

  virtual Status EnableFileDeletions(bool force) override;

  virtual bool IsFileDeletionsEnabled() const;

  Status GetStatsHistory(
      uint64_t start_time, uint64_t end_time,
      std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;

@@ -363,9 +369,6 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE
  using DB::ResetStats;
  virtual Status ResetStats() override;
  virtual Status DisableFileDeletions() override;
  virtual Status EnableFileDeletions(bool force) override;
  virtual int IsFileDeletionsEnabled() const;
  // All the returned filenames start with "/"
  virtual Status GetLiveFiles(std::vector<std::string>&,
                              uint64_t* manifest_file_size,

@@ -477,6 +480,7 @@ class DBImpl : public DB {
  Status GetImpl(const ReadOptions& options, const Slice& key,
                 GetImplOptions& get_impl_options);

  // If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
                                      ColumnFamilyData* cfd,
                                      SequenceNumber snapshot,

@@ -1779,6 +1783,8 @@ class DBImpl : public DB {
      SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,
      bool* is_blob_index);

  Status DisableFileDeletionsWithLock();

  // table_cache_ provides its own synchronization
  std::shared_ptr<Cache> table_cache_;
@@ -210,7 +210,15 @@ Status DBImpl::FlushMemTableToOutputFile(
  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
    if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
        !io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // CURRENT file. With current code, it's just difficult to tell. So just
      // be pessimistic and try write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      auto err_reason = versions_->io_status().ok()
                            ? BackgroundErrorReason::kFlush
                            : BackgroundErrorReason::kManifestWrite;
      error_handler_.SetBGError(io_s, err_reason);
    } else {
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);

@@ -574,7 +582,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
  // it is not because of CF drop.
  if (!s.ok() && !s.IsColumnFamilyDropped()) {
    if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // CURRENT file. With current code, it's just difficult to tell. So just
      // be pessimistic and try write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      auto err_reason = versions_->io_status().ok()
                            ? BackgroundErrorReason::kFlush
                            : BackgroundErrorReason::kManifestWrite;
      error_handler_.SetBGError(io_s, err_reason);
    } else {
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);

@@ -2687,7 +2703,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    versions_->SetIOStatusOK();
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());

@@ -2745,7 +2760,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
      }
    }

    versions_->SetIOStatusOK();
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());

@@ -2877,7 +2891,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
    if (!io_s.ok()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // CURRENT file. With current code, it's just difficult to tell. So just
      // be pessimistic and try write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      auto err_reason = versions_->io_status().ok()
                            ? BackgroundErrorReason::kCompaction
                            : BackgroundErrorReason::kManifestWrite;
      error_handler_.SetBGError(io_s, err_reason);
    } else {
      error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    }
@@ -36,6 +36,62 @@ uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
  return std::numeric_limits<uint64_t>::max();
}

Status DBImpl::DisableFileDeletions() {
  InstrumentedMutexLock l(&mutex_);
  return DisableFileDeletionsWithLock();
}

Status DBImpl::DisableFileDeletionsWithLock() {
  mutex_.AssertHeld();
  ++disable_delete_obsolete_files_;
  if (disable_delete_obsolete_files_ == 1) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "File Deletions Disabled, but already disabled. Counter: %d",
                   disable_delete_obsolete_files_);
  }
  return Status::OK();
}

Status DBImpl::EnableFileDeletions(bool force) {
  // Job id == 0 means that this is not our background process, but rather
  // user thread
  JobContext job_context(0);
  bool file_deletion_enabled = false;
  {
    InstrumentedMutexLock l(&mutex_);
    if (force) {
      // if force, we need to enable file deletions right away
      disable_delete_obsolete_files_ = 0;
    } else if (disable_delete_obsolete_files_ > 0) {
      --disable_delete_obsolete_files_;
    }
    if (disable_delete_obsolete_files_ == 0) {
      file_deletion_enabled = true;
      FindObsoleteFiles(&job_context, true);
      bg_cv_.SignalAll();
    }
  }
  if (file_deletion_enabled) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "File Deletions Enable, but not really enabled. Counter: %d",
                   disable_delete_obsolete_files_);
  }
  job_context.Clean();
  LogFlush(immutable_db_options_.info_log);
  return Status::OK();
}

bool DBImpl::IsFileDeletionsEnabled() const {
  return 0 == disable_delete_obsolete_files_;
}

// * Returns the list of live files in 'sst_live' and 'blob_live'.
// If it's doing full scan:
// * Returns the list of all files in the filesystem in
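For context, `DisableFileDeletions()` / `EnableFileDeletions()` are part of the public `DB` interface; the hunks above only relocate their implementation and add the lock-held variant used internally after MANIFEST write failures. A hedged usage sketch from application code (the path and the "copy files" step are illustrative):

```cpp
#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/filedel_demo", &db);
  assert(s.ok());

  // Pin the current set of files while an external tool copies them.
  s = db->DisableFileDeletions();
  assert(s.ok());
  // ... copy SST / MANIFEST files here ...

  // force=false: deletions resume only once every Disable call is matched.
  s = db->EnableFileDeletions(/*force=*/false);
  assert(s.ok());

  delete db;
  return 0;
}
```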
@@ -370,7 +370,30 @@ Status DBImpl::Recover(
  }

  std::string current_fname = CurrentFileName(dbname_);
  s = env_->FileExists(current_fname);
  // Path to any MANIFEST file in the db dir. It does not matter which one.
  // Since best-efforts recovery ignores CURRENT file, existence of a
  // MANIFEST indicates the recovery to recover existing db. If no MANIFEST
  // can be found, a new db will be created.
  std::string manifest_path;
  if (!immutable_db_options_.best_efforts_recovery) {
    s = env_->FileExists(current_fname);
  } else {
    s = Status::NotFound();
    std::vector<std::string> files;
    // No need to check return value
    env_->GetChildren(dbname_, &files);
    for (const std::string& file : files) {
      uint64_t number = 0;
      FileType type = kLogFile;  // initialize
      if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
        // Found MANIFEST (descriptor log), thus best-efforts recovery does
        // not have to treat the db as empty.
        s = Status::OK();
        manifest_path = dbname_ + "/" + file;
        break;
      }
    }
  }
  if (s.IsNotFound()) {
    if (immutable_db_options_.create_if_missing) {
      s = NewDB();

@@ -398,14 +421,14 @@ Status DBImpl::Recover(
    FileOptions customized_fs(file_options_);
    customized_fs.use_direct_reads |=
        immutable_db_options_.use_direct_io_for_flush_and_compaction;
    s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
                                 nullptr);
    const std::string& fname =
        manifest_path.empty() ? current_fname : manifest_path;
    s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
    if (!s.ok()) {
      std::string error_str = s.ToString();
      // Check if unsupported Direct I/O is the root cause
      customized_fs.use_direct_reads = false;
      s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
                                   nullptr);
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
      if (s.ok()) {
        return Status::InvalidArgument(
            "Direct I/O is not supported by the specified DB.");
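The hunk above is what lets best-efforts recovery find any MANIFEST instead of relying on the CURRENT file. A hedged sketch of enabling the option from application code (the path is illustrative); per the `RecoverWithNoManifest` test earlier in this diff, opening with `create_if_missing == false` and no MANIFEST fails with InvalidArgument:

```cpp
#include <iostream>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.best_efforts_recovery = true;  // ignore CURRENT, scan for a MANIFEST
  options.create_if_missing = false;     // do not silently create an empty db
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/damaged_db", &db);
  if (!s.ok()) {
    std::cout << "open failed: " << s.ToString() << std::endl;
    return 1;
  }
  delete db;
  return 0;
}
```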
@@ -2990,10 +2990,11 @@ class ModelDB : public DB {

  Status SyncWAL() override { return Status::OK(); }

#ifndef ROCKSDB_LITE
  Status DisableFileDeletions() override { return Status::OK(); }

  Status EnableFileDeletions(bool /*force*/) override { return Status::OK(); }
#ifndef ROCKSDB_LITE

  Status GetLiveFiles(std::vector<std::string>&, uint64_t* /*size*/,
                      bool /*flush_memtable*/ = true) override {
    return Status::OK();
db/db_test2.cc (178 lines changed)

@@ -86,6 +86,96 @@ TEST_F(DBTest2, OpenForReadOnlyWithColumnFamilies) {
  // With create_if_missing false, there should not be a dir in the file system
  ASSERT_NOK(env_->FileExists(dbname));
}

class TestReadOnlyWithCompressedCache
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<int, bool>> {
 public:
  TestReadOnlyWithCompressedCache()
      : DBTestBase("/test_readonly_with_compressed_cache") {
    max_open_files_ = std::get<0>(GetParam());
    use_mmap_ = std::get<1>(GetParam());
  }
  int max_open_files_;
  bool use_mmap_;
};

TEST_P(TestReadOnlyWithCompressedCache, ReadOnlyWithCompressedCache) {
  if (use_mmap_ && !IsMemoryMappedAccessSupported()) {
    return;
  }
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("foo2", "barbarbarbarbarbarbarbar"));
  ASSERT_OK(Flush());

  DB* db_ptr = nullptr;
  Options options = CurrentOptions();
  options.allow_mmap_reads = use_mmap_;
  options.max_open_files = max_open_files_;
  options.compression = kSnappyCompression;
  BlockBasedTableOptions table_options;
  table_options.block_cache_compressed = NewLRUCache(8 * 1024 * 1024);
  table_options.no_block_cache = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.statistics = CreateDBStatistics();

  ASSERT_OK(DB::OpenForReadOnly(options, dbname_, &db_ptr));

  std::string v;
  ASSERT_OK(db_ptr->Get(ReadOptions(), "foo", &v));
  ASSERT_EQ("bar", v);
  ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_COMPRESSED_HIT));
  ASSERT_OK(db_ptr->Get(ReadOptions(), "foo", &v));
  ASSERT_EQ("bar", v);
  if (Snappy_Supported()) {
    if (use_mmap_) {
      ASSERT_EQ(0,
                options.statistics->getTickerCount(BLOCK_CACHE_COMPRESSED_HIT));
    } else {
      ASSERT_EQ(1,
                options.statistics->getTickerCount(BLOCK_CACHE_COMPRESSED_HIT));
    }
  }

  delete db_ptr;
}

INSTANTIATE_TEST_CASE_P(TestReadOnlyWithCompressedCache,
                        TestReadOnlyWithCompressedCache,
                        ::testing::Combine(::testing::Values(-1, 100),
                                           ::testing::Bool()));

class PartitionedIndexTestListener : public EventListener {
 public:
  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
    ASSERT_GT(info.table_properties.index_partitions, 1);
    ASSERT_EQ(info.table_properties.index_key_is_user_key, 0);
  }
};

TEST_F(DBTest2, PartitionedIndexUserToInternalKey) {
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
  PartitionedIndexTestListener* listener = new PartitionedIndexTestListener();
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.listeners.emplace_back(listener);
  std::vector<const Snapshot*> snapshots;
  Reopen(options);
  Random rnd(301);

  for (int i = 0; i < 3000; i++) {
    int j = i % 30;
    std::string value = RandomString(&rnd, 10500);
    ASSERT_OK(Put("keykey_" + std::to_string(j), value));
    snapshots.push_back(db_->GetSnapshot());
  }
  Flush();
  for (auto s : snapshots) {
    db_->ReleaseSnapshot(s);
  }
}

#endif  // ROCKSDB_LITE

class PrefixFullBloomWithReverseComparator

@@ -2897,6 +2987,94 @@ TEST_F(DBTest2, OptimizeForSmallDB) {

#endif  // ROCKSDB_LITE

TEST_F(DBTest2, IterRaceFlush1) {
  ASSERT_OK(Put("foo", "v1"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
       {"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ROCKSDB_NAMESPACE::port::Thread t1([&] {
    TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    Flush();
    TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
  });

  // iterator is created after the first Put(), so it should see either
  // "v1" or "v2".
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->Seek("foo");
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("foo", it->key().ToString());
  }

  t1.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, IterRaceFlush2) {
  ASSERT_OK(Put("foo", "v1"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
       {"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ROCKSDB_NAMESPACE::port::Thread t1([&] {
    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
    ASSERT_OK(Put("foo", "v2"));
    Flush();
    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
  });

  // iterator is created after the first Put(), so it should see either
  // "v1" or "v2".
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->Seek("foo");
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("foo", it->key().ToString());
  }

  t1.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, IterRefreshRaceFlush) {
  ASSERT_OK(Put("foo", "v1"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"},
       {"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ROCKSDB_NAMESPACE::port::Thread t1([&] {
    TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    Flush();
    TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
  });

  // iterator is created after the first Put(), so it should see either
  // "v1" or "v2".
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->Refresh();
    it->Seek("foo");
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("foo", it->key().ToString());
  }

  t1.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, GetRaceFlush1) {
  ASSERT_OK(Put("foo", "v1"));
@@ -51,9 +51,19 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
                         Status::Code::kIOError, Status::SubCode::kNoSpace,
                         false),
         Status::Severity::kHardError},
        // Errors during MANIFEST write
        {std::make_tuple(BackgroundErrorReason::kManifestWrite,
                         Status::Code::kIOError, Status::SubCode::kNoSpace,
                         true),
         Status::Severity::kHardError},
        {std::make_tuple(BackgroundErrorReason::kManifestWrite,
                         Status::Code::kIOError, Status::SubCode::kNoSpace,
                         false),
         Status::Severity::kHardError},
};

std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity>
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,
         Status::Severity>
    DefaultErrorSeverityMap = {
        // Errors during BG compaction
        {std::make_tuple(BackgroundErrorReason::kCompaction,

@@ -75,11 +85,11 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity
        {std::make_tuple(BackgroundErrorReason::kFlush,
                         Status::Code::kCorruption, false),
         Status::Severity::kNoError},
        {std::make_tuple(BackgroundErrorReason::kFlush,
                         Status::Code::kIOError, true),
        {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
                         true),
         Status::Severity::kFatalError},
        {std::make_tuple(BackgroundErrorReason::kFlush,
                         Status::Code::kIOError, false),
        {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
                         false),
         Status::Severity::kNoError},
        // Errors during Write
        {std::make_tuple(BackgroundErrorReason::kWriteCallback,

@@ -94,30 +104,36 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity
        {std::make_tuple(BackgroundErrorReason::kWriteCallback,
                         Status::Code::kIOError, false),
         Status::Severity::kNoError},
        {std::make_tuple(BackgroundErrorReason::kManifestWrite,
                         Status::Code::kIOError, true),
         Status::Severity::kFatalError},
        {std::make_tuple(BackgroundErrorReason::kManifestWrite,
                         Status::Code::kIOError, false),
         Status::Severity::kFatalError},
};

std::map<std::tuple<BackgroundErrorReason, bool>, Status::Severity>
    DefaultReasonMap = {
        // Errors during BG compaction
        {std::make_tuple(BackgroundErrorReason::kCompaction, true),
          Status::Severity::kFatalError},
         Status::Severity::kFatalError},
        {std::make_tuple(BackgroundErrorReason::kCompaction, false),
          Status::Severity::kNoError},
         Status::Severity::kNoError},
        // Errors during BG flush
        {std::make_tuple(BackgroundErrorReason::kFlush, true),
          Status::Severity::kFatalError},
         Status::Severity::kFatalError},
        {std::make_tuple(BackgroundErrorReason::kFlush, false),
          Status::Severity::kNoError},
         Status::Severity::kNoError},
        // Errors during Write
        {std::make_tuple(BackgroundErrorReason::kWriteCallback, true),
          Status::Severity::kFatalError},
         Status::Severity::kFatalError},
        {std::make_tuple(BackgroundErrorReason::kWriteCallback, false),
          Status::Severity::kFatalError},
         Status::Severity::kFatalError},
        // Errors during Memtable update
        {std::make_tuple(BackgroundErrorReason::kMemTable, true),
          Status::Severity::kFatalError},
         Status::Severity::kFatalError},
        {std::make_tuple(BackgroundErrorReason::kMemTable, false),
          Status::Severity::kFatalError},
         Status::Severity::kFatalError},
};

void ErrorHandler::CancelErrorRecovery() {

@@ -247,6 +263,10 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
  if (recovery_in_prog_ && recovery_error_.ok()) {
    recovery_error_ = bg_io_err;
  }
  if (BackgroundErrorReason::kManifestWrite == reason) {
    // Always returns ok
    db_->DisableFileDeletionsWithLock();
  }
  Status new_bg_io_err = bg_io_err;
  Status s;
  if (bg_io_err.GetDataLoss()) {
@@ -798,7 +798,7 @@ bool InternalStats::HandleCurrentSuperVersionNumber(uint64_t* value,

bool InternalStats::HandleIsFileDeletionsEnabled(uint64_t* value, DBImpl* db,
                                                 Version* /*version*/) {
  *value = db->IsFileDeletionsEnabled();
  *value = db->IsFileDeletionsEnabled() ? 1 : 0;
  return true;
}
@@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  // Compute the crc of the record type and the payload.
  crc = crc32c::Extend(crc, ptr, n);
  crc = crc32c::Mask(crc);  // Adjust for storage
  TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
                           &crc);
  EncodeFixed32(buf, crc);

  // Write the header and the payload
@@ -766,7 +766,13 @@ static bool SaveValue(void* arg, const char* entry) {
      }
      return true;
    }
    default:
    default: {
      std::string msg("Unrecognized value type: " +
                      std::to_string(static_cast<int>(type)) + ". ");
      msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) + ". ");
      msg.append("seq: " + std::to_string(seq) + ".");
      *(s->status) = Status::Corruption(msg.c_str());
    }
      assert(false);
      return true;
  }
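Together with the 6.11.5 note in HISTORY.md, the hunk above means a memtable entry with an unrecognized value type now reaches the caller as a Corruption status instead of being silently skipped. A hedged sketch of the caller-side check; the path and key are illustrative:

```cpp
#include <iostream>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/corruption_demo", &db);
  if (!s.ok()) return 1;

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "some_key", &value);
  if (s.IsCorruption()) {
    // After this fix, a bad value_type hit during the memtable lookup is
    // reported here rather than being ignored.
    std::cout << "corruption: " << s.ToString() << std::endl;
  } else if (s.IsNotFound()) {
    std::cout << "not found" << std::endl;
  }
  delete db;
  return 0;
}
```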
@@ -470,7 +470,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
    }

    // this can release and reacquire the mutex.
    vset->SetIOStatusOK();
    s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                          db_directory);
    *io_s = vset->io_status();
@@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler(
  assert(version_set_ != nullptr);
}

Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
                                 std::string* db_id) {
  Slice record;
  std::string scratch;
  assert(log_read_status);
  assert(log_read_status->ok());

  size_t recovered_edits = 0;
  Status s = Initialize();
  while (reader.ReadRecord(&record, &scratch) && s.ok()) {
  while (s.ok() && reader.ReadRecord(&record, &scratch) &&
         log_read_status->ok()) {
    VersionEdit edit;
    s = edit.DecodeFrom(record);
    if (!s.ok()) {

@@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
      }
    }
  }
  if (!log_read_status->ok()) {
    s = *log_read_status;
  }

  CheckIterationResult(reader, &s);

  if (!s.ok()) {
    status_ = s;
  }
  return s;
}

Status VersionEditHandler::Initialize() {

@@ -404,13 +411,12 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
                                      bool is_initial_load) {
  assert(cfd != nullptr);
  assert(!cfd->IsDropped());
  Status s;
  auto builder_iter = builders_.find(cfd->GetID());
  assert(builder_iter != builders_.end());
  assert(builder_iter->second != nullptr);
  VersionBuilder* builder = builder_iter->second->version_builder();
  assert(builder);
  s = builder->LoadTableHandlers(
  Status s = builder->LoadTableHandlers(
      cfd->internal_stats(),
      version_set_->db_options_->max_file_opening_threads,
      prefetch_index_and_filter_in_cache, is_initial_load,

@@ -551,6 +557,8 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
    if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
      missing_files.insert(file_num);
      s = Status::OK();
    } else if (!s.ok()) {
      break;
    }
  }
  bool missing_info = !version_edit_params_.has_log_number_ ||

@@ -558,8 +566,9 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
                      !version_edit_params_.has_last_sequence_;

  // Create version before apply edit
  if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) ||
                        (missing_files.empty() && force_create_version))) {
  if (s.ok() && !missing_info &&
      ((!missing_files.empty() && !prev_has_missing_files) ||
       (missing_files.empty() && force_create_version))) {
    auto builder_iter = builders_.find(cfd->GetID());
    assert(builder_iter != builders_.end());
    auto* builder = builder_iter->second->version_builder();

@@ -40,7 +40,8 @@ class VersionEditHandler {

  virtual ~VersionEditHandler() {}

  Status Iterate(log::Reader& reader, std::string* db_id);
  void Iterate(log::Reader& reader, Status* log_read_status,
               std::string* db_id);

  const Status& status() const { return status_; }
@ -10,6 +10,7 @@
|
||||
#include "db/version_set.h"
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <array>
|
||||
#include <cinttypes>
|
||||
@ -19,6 +20,7 @@
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "compaction/compaction.h"
|
||||
#include "db/internal_stats.h"
|
||||
#include "db/log_reader.h"
|
||||
@ -50,6 +52,7 @@
|
||||
#include "table/table_reader.h"
|
||||
#include "table/two_level_iterator.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "util/cast_util.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/string_util.h"
|
||||
@ -3875,10 +3878,6 @@ Status VersionSet::ProcessManifestWrites(
|
||||
}
|
||||
#endif // NDEBUG
|
||||
|
||||
uint64_t new_manifest_file_size = 0;
|
||||
Status s;
|
||||
IOStatus io_s;
|
||||
|
||||
assert(pending_manifest_file_number_ == 0);
|
||||
if (!descriptor_log_ ||
|
||||
manifest_file_size_ > db_options_->max_manifest_file_size) {
|
||||
@ -3908,6 +3907,9 @@ Status VersionSet::ProcessManifestWrites(
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t new_manifest_file_size = 0;
|
||||
Status s;
|
||||
IOStatus io_s;
|
||||
{
|
||||
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
|
||||
mu->Unlock();
|
||||
@ -3944,9 +3946,9 @@ Status VersionSet::ProcessManifestWrites(
|
||||
std::string descriptor_fname =
|
||||
DescriptorFileName(dbname_, pending_manifest_file_number_);
|
||||
std::unique_ptr<FSWritableFile> descriptor_file;
|
||||
s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
|
||||
opt_file_opts);
|
||||
if (s.ok()) {
|
||||
io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
|
||||
opt_file_opts);
|
||||
if (io_s.ok()) {
|
||||
descriptor_file->SetPreallocationBlockSize(
|
||||
db_options_->manifest_preallocation_size);
|
||||
|
||||
@ -3955,7 +3957,10 @@ Status VersionSet::ProcessManifestWrites(
|
||||
nullptr, db_options_->listeners));
|
||||
descriptor_log_.reset(
|
||||
new log::Writer(std::move(file_writer), 0, false));
|
||||
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
|
||||
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(),
|
||||
io_s);
|
||||
} else {
|
||||
s = io_s;
|
||||
}
|
||||
}
|
||||
|
||||
@ -3991,16 +3996,16 @@ Status VersionSet::ProcessManifestWrites(
|
||||
#endif /* !NDEBUG */
|
||||
io_s = descriptor_log_->AddRecord(record);
|
||||
if (!io_s.ok()) {
|
||||
io_status_ = io_s;
|
||||
s = io_s;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
|
||||
}
|
||||
if (!io_s.ok()) {
|
||||
io_status_ = io_s;
|
||||
s = io_s;
|
||||
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
|
||||
s.ToString().c_str());
|
||||
@ -4013,7 +4018,6 @@ Status VersionSet::ProcessManifestWrites(
|
||||
io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
|
||||
db_directory);
|
||||
if (!io_s.ok()) {
|
||||
io_status_ = io_s;
|
||||
s = io_s;
|
||||
}
|
||||
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
|
||||
@ -4035,6 +4039,14 @@ Status VersionSet::ProcessManifestWrites(
|
||||
mu->Lock();
|
||||
}
|
||||
|
||||
if (!io_s.ok()) {
|
||||
if (io_status_.ok()) {
io_status_ = io_s;
}
} else if (!io_status_.ok()) {
io_status_ = io_s;
}

// Append the old manifest file to the obsolete_manifest_ list to be deleted
// by PurgeObsoleteFiles later.
if (s.ok() && new_descriptor_log) {
@@ -4444,24 +4456,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
if (dbname.back() != '/') {
manifest_path->push_back('/');
}
*manifest_path += fname;
manifest_path->append(fname);
return Status::OK();
}

Status VersionSet::ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders,
VersionEditParams* version_edit_params, std::string* db_id) {
assert(reader != nullptr);
Status* log_read_status, VersionEditParams* version_edit_params,
std::string* db_id) {
assert(read_buffer != nullptr);
assert(log_read_status != nullptr);
Status s;
Slice record;
std::string scratch;
size_t recovered_edits = 0;
while (reader->ReadRecord(&record, &scratch) && s.ok()) {
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
@@ -4505,6 +4519,9 @@ Status VersionSet::ReadAndRecover(
}
}
}
if (!log_read_status->ok()) {
s = *log_read_status;
}
if (!s.ok()) {
// Clear the buffer if we fail to decode/apply an edit.
read_buffer->Clear();
@@ -4551,8 +4568,7 @@ Status VersionSet::Recover(
db_options_->log_readahead_size));
}

std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
VersionBuilderMap builders;

// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
@@ -4574,12 +4590,13 @@ Status VersionSet::Recover(
VersionEditParams version_edit_params;
{
VersionSet::LogReporter reporter;
reporter.status = &s;
Status log_read_status;
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
AtomicGroupReadBuffer read_buffer;
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders,
s = ReadAndRecover(reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders, &log_read_status,
&version_edit_params, db_id);
current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0);
@@ -4845,21 +4862,20 @@ Status VersionSet::TryRecoverFromOneManifest(
db_options_->log_readahead_size));
}

assert(s.ok());
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
/*checksum=*/true, /*log_num=*/0);
{
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));

s = handler_pit.Iterate(reader, db_id);
handler_pit.Iterate(reader, &s, db_id);

assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
}
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();

return s;
return handler_pit.status();
}

Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
@@ -5284,7 +5300,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {

Status VersionSet::WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log) {
log::Writer* log, IOStatus& io_s) {
// TODO: Break up into multiple records to reduce memory usage on recovery?

// WARNING: This method doesn't hold a mutex!!
@@ -5293,6 +5309,7 @@ Status VersionSet::WriteCurrentStateToManifest(
// LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate.

assert(io_s.ok());
if (db_options_->write_dbid_to_manifest) {
VersionEdit edit_for_db_id;
assert(!db_id_.empty());
@@ -5302,10 +5319,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption("Unable to Encode VersionEdit:" +
edit_for_db_id.DebugString(true));
}
IOStatus io_s = log->AddRecord(db_id_record);
io_s = log->AddRecord(db_id_record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}

@@ -5332,10 +5348,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
IOStatus io_s = log->AddRecord(record);
io_s = log->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}

@@ -5385,10 +5400,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
IOStatus io_s = log->AddRecord(record);
io_s = log->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}
}
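
The hunks above route every manifest `AddRecord` failure through a single `IOStatus` that the caller passes in by reference, instead of creating a fresh local status per write. A minimal standalone sketch of that out-parameter pattern, using stand-in types rather than RocksDB's internals (all names here are illustrative):

```cpp
#include <iostream>
#include <string>

// Stand-in status type; not RocksDB's Status/IOStatus.
struct SimpleStatus {
  bool ok_ = true;
  std::string msg_;
  bool ok() const { return ok_; }
  static SimpleStatus OK() { return SimpleStatus(); }
  static SimpleStatus IOError(const std::string& m) {
    SimpleStatus s;
    s.ok_ = false;
    s.msg_ = m;
    return s;
  }
};

// The callee reports I/O failures through `io_s` in addition to its
// return value, so the caller can distinguish them from logical errors.
SimpleStatus WriteState(bool simulate_io_failure, SimpleStatus& io_s) {
  io_s = SimpleStatus::OK();  // caller expects a clean slate on entry
  if (simulate_io_failure) {
    io_s = SimpleStatus::IOError("AddRecord failed");
    return io_s;  // the return value mirrors the I/O status
  }
  return SimpleStatus::OK();
}

int main() {
  SimpleStatus io_s;
  SimpleStatus s = WriteState(/*simulate_io_failure=*/true, io_s);
  if (!io_s.ok()) {
    // An I/O error during the write; a caller can record it separately
    // (much as VersionSet keeps io_status_) before deciding what to do.
    std::cout << "io error: " << io_s.msg_ << "\n";
  }
  return s.ok() ? 0 : 1;
}
```
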
@@ -5980,8 +5994,7 @@ Status ReactiveVersionSet::Recover(
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
VersionBuilderMap builders;
std::unordered_map<int, std::string> column_families_not_found;
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
@@ -5989,7 +6002,7 @@ Status ReactiveVersionSet::Recover(

manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter());
static_cast<LogReporter*>(manifest_reporter->get())->status =
static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
log::Reader* reader = manifest_reader->get();
@@ -5998,10 +6011,9 @@ Status ReactiveVersionSet::Recover(
VersionEdit version_edit;
while (s.ok() && retry < 1) {
assert(reader != nullptr);
Slice record;
std::string scratch;
s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
column_families_not_found, builders, &version_edit);
s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
column_families_not_found, builders,
manifest_reader_status->get(), &version_edit);
if (s.ok()) {
bool enough = version_edit.has_next_file_number_ &&
version_edit.has_log_number_ &&

@@ -1159,12 +1159,13 @@ class VersionSet {
static uint64_t GetTotalSstFilesSize(Version* dummy_versions);

// Get the IO Status returned by written Manifest.
IOStatus io_status() const { return io_status_; }

// Set the IO Status to OK. Called before Manifest write if needed.
void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
const IOStatus& io_status() const { return io_status_; }

protected:
using VersionBuilderMap =
std::unordered_map<uint32_t,
std::unique_ptr<BaseReferencedVersionBuilder>>;

struct ManifestWriter;

friend class Version;
@@ -1176,7 +1177,9 @@ class VersionSet {
struct LogReporter : public log::Reader::Reporter {
Status* status;
virtual void Corruption(size_t /*bytes*/, const Status& s) override {
if (this->status->ok()) *this->status = s;
if (status->ok()) {
*status = s;
}
}
};

@@ -1199,7 +1202,7 @@ class VersionSet {
// Save current contents to *log
Status WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log);
log::Writer* log, IOStatus& io_s);

void AppendVersion(ColumnFamilyData* column_family_data, Version* v);

@@ -1207,13 +1210,14 @@ class VersionSet {
const VersionEdit* edit);

Status ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>&
name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
VersionEditParams* version_edit, std::string* db_id = nullptr);
Status* log_read_status, VersionEditParams* version_edit,
std::string* db_id = nullptr);

// REQUIRES db mutex
Status ApplyOneVersionEditToBuilder(
@@ -1342,8 +1346,7 @@ class ReactiveVersionSet : public VersionSet {
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);

private:
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
active_version_builders_;
VersionBuilderMap active_version_builders_;
AtomicGroupReadBuffer read_buffer_;
// Number of version edits to skip by ReadAndApply at the beginning of a new
// MANIFEST created by primary.
@@ -1266,8 +1266,6 @@ class DB {
// updated, false if user attempted to call if with seqnum <= current value.
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0;

#ifndef ROCKSDB_LITE

// Prevent file deletions. Compactions will continue to occur,
// but no obsolete files will be deleted. Calling this multiple
// times have the same effect as calling it once.
@@ -1284,6 +1282,7 @@ class DB {
// threads call EnableFileDeletions()
virtual Status EnableFileDeletions(bool force = true) = 0;

#ifndef ROCKSDB_LITE
// GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup

// Retrieve the list of all files in the database. The files are
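
The comments in this hunk describe the backup workflow built from these calls; a hedged sketch of how an application might combine them (error handling kept minimal, and the helper's name is illustrative):

```cpp
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/transaction_log.h"

// Sketch: take a consistent point-in-time view of a DB's files.
rocksdb::Status SnapshotLiveFiles(rocksdb::DB* db,
                                  std::vector<std::string>* files,
                                  rocksdb::VectorLogPtr* wal_files) {
  // Stop obsolete-file deletion so none of the returned files disappear
  // while they are being copied.
  rocksdb::Status s = db->DisableFileDeletions();
  if (!s.ok()) {
    return s;
  }

  uint64_t manifest_file_size = 0;
  s = db->GetLiveFiles(*files, &manifest_file_size, /*flush_memtable=*/true);
  if (s.ok()) {
    s = db->GetSortedWalFiles(*wal_files);
  }

  // Re-enable deletions regardless of the outcome above.
  rocksdb::Status enable_s = db->EnableFileDeletions(/*force=*/false);
  return s.ok() ? enable_s : s;
}
```
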
@@ -117,6 +117,7 @@ enum class BackgroundErrorReason {
kCompaction,
kWriteCallback,
kMemTable,
kManifestWrite,
};

enum class WriteStallCondition {
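
The new `kManifestWrite` enumerator is delivered through `EventListener::OnBackgroundError`, so applications can tell MANIFEST write failures apart from compaction or flush errors. A minimal sketch of such a listener (the logging is illustrative):

```cpp
#include <iostream>

#include "rocksdb/listener.h"

// Sketch: observe background errors and single out MANIFEST write failures.
class ManifestErrorListener : public rocksdb::EventListener {
 public:
  void OnBackgroundError(rocksdb::BackgroundErrorReason reason,
                         rocksdb::Status* bg_error) override {
    if (reason == rocksdb::BackgroundErrorReason::kManifestWrite) {
      // A MANIFEST write/sync failed; the DB typically stops accepting
      // writes until it is resumed or re-opened.
      std::cerr << "MANIFEST write error: " << bg_error->ToString() << "\n";
    }
  }
};
```
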
@@ -5,8 +5,8 @@
#pragma once

#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 10
#define ROCKSDB_PATCH 0
#define ROCKSDB_MINOR 11
#define ROCKSDB_PATCH 7

// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these
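
These macros can be used for compile-time gating on the RocksDB release; a small sketch that relies only on the macros shown here:

```cpp
#include <cstdio>

#include "rocksdb/version.h"

int main() {
  // Require at least 6.11 at compile time.
#if ROCKSDB_MAJOR < 6 || (ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR < 11)
#error "RocksDB 6.11 or newer is required"
#endif
  std::printf("built against RocksDB %d.%d.%d\n", ROCKSDB_MAJOR,
              ROCKSDB_MINOR, ROCKSDB_PATCH);
  return 0;
}
```
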
@@ -647,7 +647,12 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4;"
"format_version=5;whole_key_filtering=1;"
"filter_policy=bloomfilter:4.567:false;",
"filter_policy=bloomfilter:4.567:false;"
// A bug caused read_amp_bytes_per_bit to be a large integer in OPTIONS
// file generated by 6.10 to 6.14. Though bug is fixed in these releases,
// we need to handle the case of loading OPTIONS file generated before the
// fix.
"read_amp_bytes_per_bit=17179869185;",
&new_opt));
ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
@@ -668,6 +673,9 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
dynamic_cast<const BloomFilterPolicy&>(*new_opt.filter_policy);
EXPECT_EQ(bfp.GetMillibitsPerKey(), 4567);
EXPECT_EQ(bfp.GetWholeBitsPerKey(), 5);
// Verify that only the lower 32bits are stored in
// new_opt.read_amp_bytes_per_bit.
EXPECT_EQ(1U, new_opt.read_amp_bytes_per_bit);

// unknown option
ASSERT_NOK(GetBlockBasedTableOptionsFromString(
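
A quick check of the magic number in the test above: 17179869185 equals 4 × 2^32 + 1, so keeping only its lower 32 bits yields 1, which is exactly what the `EXPECT_EQ(1U, ...)` assertion verifies. A tiny self-contained confirmation:

```cpp
#include <cassert>
#include <cstdint>

int main() {
  // The bogus value an affected OPTIONS file may contain.
  const uint64_t serialized = 17179869185ULL;  // 4 * 2^32 + 1
  // The fixed parser keeps only the lower 32 bits.
  const uint32_t parsed = static_cast<uint32_t>(serialized);
  assert(parsed == 1U);
  return 0;
}
```
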
@@ -1279,7 +1287,7 @@ TEST_F(OptionsTest, ConvertOptionsTest) {
// This test suite tests the old APIs into the Configure options methods.
// Once those APIs are officially deprecated, this test suite can be deleted.
class OptionsOldApiTest : public testing::Test {};


TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
std::unordered_map<std::string, std::string> cf_options_map = {
{"write_buffer_size", "1"},
@@ -1744,7 +1752,7 @@ TEST_F(OptionsOldApiTest, GetColumnFamilyOptionsFromStringTest) {
ASSERT_TRUE(new_cf_opt.memtable_factory != nullptr);
ASSERT_EQ(std::string(new_cf_opt.memtable_factory->Name()), "SkipListFactory");
}


TEST_F(OptionsOldApiTest, GetBlockBasedTableOptionsFromString) {
BlockBasedTableOptions table_opt;
BlockBasedTableOptions new_opt;
@@ -1919,7 +1927,7 @@ TEST_F(OptionsOldApiTest, GetBlockBasedTableOptionsFromString) {
->GetHighPriPoolRatio(),
0.5);
}


TEST_F(OptionsOldApiTest, GetPlainTableOptionsFromString) {
PlainTableOptions table_opt;
PlainTableOptions new_opt;
@@ -1950,7 +1958,7 @@ TEST_F(OptionsOldApiTest, GetPlainTableOptionsFromString) {
"encoding_type=kPrefixXX",
&new_opt));
}


TEST_F(OptionsOldApiTest, GetOptionsFromStringTest) {
Options base_options, new_options;
base_options.write_buffer_size = 20;
@@ -2508,7 +2516,7 @@ TEST_F(OptionsParserTest, Readahead) {
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size));
assert(file_size > 0);


RocksDBOptionsParser parser;

env_->num_seq_file_read_ = 0;
@@ -327,8 +327,24 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionTypeFlags::kNone, 0}},
{"read_amp_bytes_per_bit",
{offsetof(struct BlockBasedTableOptions, read_amp_bytes_per_bit),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, 0}},
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, 0,
[](const ConfigOptions& /*opts*/, const std::string& /*name*/,
const std::string& value, char* addr) {
// A workaround to fix a bug in 6.10, 6.11, 6.12, 6.13
// and 6.14. The bug will write out 8 bytes to OPTIONS file from the
// starting address of BlockBasedTableOptions.read_amp_bytes_per_bit
// which is actually a uint32. Consequently, the value of
// read_amp_bytes_per_bit written in the OPTIONS file is wrong.
// From 6.15, RocksDB will try to parse the read_amp_bytes_per_bit
// from OPTIONS file as a uint32. To be able to load OPTIONS file
// generated by affected releases before the fix, we need to
// manually parse read_amp_bytes_per_bit with this special hack.
uint64_t read_amp_bytes_per_bit = ParseUint64(value);
*(reinterpret_cast<uint32_t*>(addr)) =
static_cast<uint32_t>(read_amp_bytes_per_bit);
return Status::OK();
}}},
{"enable_index_compression",
{offsetof(struct BlockBasedTableOptions, enable_index_compression),
OptionType::kBoolean, OptionVerificationType::kNormal,
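
To make the comment inside the parse function above concrete: serializing 8 bytes starting at the address of a 4-byte field also captures whatever 4 bytes follow it, which is how an OPTIONS file written by an affected release ends up with an inflated value; truncating back to 32 bits on load recovers the original setting. A standalone sketch (the `FakeOptions` layout and the neighbouring value of 4 are assumptions chosen to reproduce the number used in the test, not RocksDB's actual struct):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>

// Stand-in for the relevant memory layout: a 4-byte option followed by
// another 4-byte field. Not the real BlockBasedTableOptions.
struct FakeOptions {
  uint32_t read_amp_bytes_per_bit = 1;  // the user's actual setting
  uint32_t next_field = 4;              // whatever happens to sit after it
};

int main() {
  FakeOptions opt;

  // Buggy serialization: copy 8 bytes starting at the 4-byte field's offset.
  uint64_t serialized = 0;
  std::memcpy(&serialized,
              reinterpret_cast<const char*>(&opt) +
                  offsetof(FakeOptions, read_amp_bytes_per_bit),
              sizeof(serialized));
  // On a little-endian machine this prints 17179869185 (1 + 4 * 2^32).
  std::cout << "value written to OPTIONS: " << serialized << "\n";

  // Fixed parsing: accept the 64-bit number but keep only the low 32 bits.
  uint32_t parsed = static_cast<uint32_t>(serialized);
  assert(parsed == opt.read_amp_bytes_per_bit);
  std::cout << "value parsed back: " << parsed << "\n";
  return 0;
}
```
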
@@ -1426,10 +1426,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep_->table_options.block_cache.get();
// No point to cache compressed blocks if it never goes away
Cache* block_cache_compressed =
rep_->immortal_table ? nullptr
: rep_->table_options.block_cache_compressed.get();
rep_->table_options.block_cache_compressed.get();

// First, try to get the block from the cache
//
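
For reference, the `block_cache_compressed` pointer handled above comes from user configuration; a brief sketch of how both caches are typically set up (cache sizes are arbitrary):

```cpp
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

// Sketch: enable both the uncompressed and the compressed block cache.
rocksdb::Options MakeOptionsWithCompressedCache() {
  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_cache = rocksdb::NewLRUCache(128 << 20);            // 128 MB
  table_options.block_cache_compressed = rocksdb::NewLRUCache(64 << 20);  // 64 MB

  rocksdb::Options options;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}
```
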
@@ -104,6 +104,15 @@ void PartitionedIndexBuilder::MakeNewSubIndexBuilder() {
comparator_, table_opt_.index_block_restart_interval,
table_opt_.format_version, use_value_delta_encoding_,
table_opt_.index_shortening, /* include_first_key */ false);

// Set sub_index_builder_->seperator_is_key_plus_seq_ to true if
// seperator_is_key_plus_seq_ is true (internal-key mode) (set to false by
// default on Creation) so that flush policy can point to
// sub_index_builder_->index_block_builder_
if (seperator_is_key_plus_seq_) {
sub_index_builder_->seperator_is_key_plus_seq_ = true;
}

flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
// Note: this is sub-optimal since sub_index_builder_ could later reset
@@ -129,9 +138,15 @@ void PartitionedIndexBuilder::AddIndexEntry(
}
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block, block_handle);
if (sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders
if (!seperator_is_key_plus_seq_ &&
sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders and reset
// flush_policy to point to Block Builder of sub_index_builder_ that store
// internal keys.
seperator_is_key_plus_seq_ = true;
flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
sub_index_builder_->index_block_builder_));
}
sub_index_last_key_ = std::string(*last_key_in_current_block);
entries_.push_back(
@@ -161,9 +176,15 @@ void PartitionedIndexBuilder::AddIndexEntry(
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block, block_handle);
sub_index_last_key_ = std::string(*last_key_in_current_block);
if (sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders
if (!seperator_is_key_plus_seq_ &&
sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders and reset
// flush_policy to point to Block Builder of sub_index_builder_ that store
// internal keys.
seperator_is_key_plus_seq_ = true;
flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
sub_index_builder_->index_block_builder_));
}
}
}

@@ -167,7 +167,7 @@ void PartitionIndexReader::CacheDependencies(bool pin) {

assert(s.ok() || block.GetValue() == nullptr);
if (s.ok() && block.GetValue() != nullptr) {
if (block.IsCached()) {
if (block.IsCached() || block.GetOwnValue()) {
if (pin) {
partition_map_[handle.offset()] = std::move(block);
}

@@ -142,6 +142,11 @@ class IteratorWrapperBase {
return result_.value_prepared;
}

Slice user_key() const {
assert(Valid());
return iter_->user_key();
}

private:
void Update() {
valid_ = iter_->Valid();

@@ -43,6 +43,10 @@ class TwoLevelIndexIterator : public InternalIteratorBase<IndexValue> {
assert(Valid());
return second_level_iter_.key();
}
Slice user_key() const override {
assert(Valid());
return second_level_iter_.user_key();
}
IndexValue value() const override {
assert(Valid());
return second_level_iter_.value();
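
The `user_key()` accessors added in the two hunks above matter because an internal key is the user key followed by an 8-byte sequence/type suffix; handing back the wrong form where the other is expected corrupts comparisons and can truncate keys. A standalone sketch of that encoding, independent of RocksDB's own helpers:

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Sketch (not RocksDB code): build an internal key as user key plus an
// 8-byte footer packing (sequence << 8 | type), little-endian.
std::string MakeInternalKey(const std::string& user_key, uint64_t seq,
                            uint8_t type) {
  std::string ikey = user_key;
  uint64_t packed = (seq << 8) | type;
  for (int i = 0; i < 8; ++i) {
    ikey.push_back(static_cast<char>((packed >> (8 * i)) & 0xff));
  }
  return ikey;
}

// A user_key()-style accessor simply drops the 8-byte footer.
std::string ExtractUserKeyCopy(const std::string& internal_key) {
  assert(internal_key.size() >= 8);
  return internal_key.substr(0, internal_key.size() - 8);
}

int main() {
  std::string ikey = MakeInternalKey("foo", /*seq=*/42, /*type=*/1);
  assert(ExtractUserKeyCopy(ikey) == "foo");
  return 0;
}
```
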
@@ -8,8 +8,10 @@
#ifndef ROCKSDB_LITE

#include <functional>
#include <limits>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/stackable_db.h"
@@ -24,6 +26,8 @@ namespace blob_db {
// The factory needs to be moved to include/rocksdb/utilities to allow
// users to use blob DB.

constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();

struct BlobDBOptions {
// Name of the directory under the base DB where blobs will be stored. Using
// a directory where the base DB stores its SST files is not supported.
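
The new `kNoExpiration` constant is the sentinel expiration for blobs that should never be evicted by TTL; a hedged usage sketch (assuming the `BlobDB::PutUntil` entry point from this header, which may differ across versions; error handling is minimal):

```cpp
#include <cassert>

#include "utilities/blob_db/blob_db.h"

// Sketch only: BlobDB still lives outside include/rocksdb (see the comment
// in the hunk above), so this is how in-tree code might use kNoExpiration.
void PutForever(rocksdb::blob_db::BlobDB* blob_db) {
  rocksdb::WriteOptions wopts;
  // Passing kNoExpiration means the blob is never subject to TTL eviction.
  rocksdb::Status s = blob_db->PutUntil(wopts, "key", "value",
                                        rocksdb::blob_db::kNoExpiration);
  assert(s.ok());
}
```
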