Do not swallow error returned from SaveTo() (#6801)
Summary: With consistency check enabled, VersionBuilder::SaveTo() may return error once corruption is detected while building versions. We should handle these errors. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6801 Test Plan: make check Reviewed By: siying Differential Revision: D21385045 Pulled By: riversand963 fbshipit-source-id: 98f6424e2a4699b62befa21e9fe00e70a771118e
This commit is contained in:
parent
ec33e9378c
commit
0ef925153d
@ -1778,6 +1778,28 @@ TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
|
||||
Options options = CurrentOptions();
|
||||
DestroyAndReopen(options);
|
||||
ASSERT_OK(Put("foo", "value"));
|
||||
ASSERT_OK(Flush());
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
|
||||
ASSERT_NE(nullptr, arg);
|
||||
*(reinterpret_cast<Status*>(arg)) =
|
||||
Status::Corruption("Inject corruption");
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
options.best_efforts_recovery = true;
|
||||
Status s = TryReopen(options);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
namespace {
|
||||
class TableFileListener : public EventListener {
|
||||
|
@ -45,6 +45,8 @@ class DBSecondaryTest : public DBTestBase {
|
||||
|
||||
void OpenSecondary(const Options& options);
|
||||
|
||||
Status TryOpenSecondary(const Options& options);
|
||||
|
||||
void OpenSecondaryWithColumnFamilies(
|
||||
const std::vector<std::string>& column_families, const Options& options);
|
||||
|
||||
@ -70,9 +72,13 @@ class DBSecondaryTest : public DBTestBase {
|
||||
};
|
||||
|
||||
void DBSecondaryTest::OpenSecondary(const Options& options) {
|
||||
ASSERT_OK(TryOpenSecondary(options));
|
||||
}
|
||||
|
||||
Status DBSecondaryTest::TryOpenSecondary(const Options& options) {
|
||||
Status s =
|
||||
DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
|
||||
ASSERT_OK(s);
|
||||
return s;
|
||||
}
|
||||
|
||||
void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
|
||||
@ -858,6 +864,56 @@ TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
|
||||
thread.join();
|
||||
ASSERT_TRUE(called);
|
||||
}
|
||||
|
||||
TEST_F(DBSecondaryTest, StartFromInconsistent) {
|
||||
Options options = CurrentOptions();
|
||||
DestroyAndReopen(options);
|
||||
ASSERT_OK(Put("foo", "value"));
|
||||
ASSERT_OK(Flush());
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
|
||||
ASSERT_NE(nullptr, arg);
|
||||
*(reinterpret_cast<Status*>(arg)) =
|
||||
Status::Corruption("Inject corruption");
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
Options options1;
|
||||
Status s = TryOpenSecondary(options1);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
}
|
||||
|
||||
TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) {
|
||||
Options options = CurrentOptions();
|
||||
DestroyAndReopen(options);
|
||||
ASSERT_OK(Put("foo", "value"));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
Options options1;
|
||||
OpenSecondary(options1);
|
||||
|
||||
{
|
||||
std::string value;
|
||||
ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value));
|
||||
ASSERT_EQ("value", value);
|
||||
}
|
||||
|
||||
ASSERT_OK(Put("bar", "value1"));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
|
||||
ASSERT_NE(nullptr, arg);
|
||||
*(reinterpret_cast<Status*>(arg)) =
|
||||
Status::Corruption("Inject corruption");
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
Status s = db_secondary_->TryCatchUpWithPrimary();
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
}
|
||||
#endif //! ROCKSDB_LITE
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -377,6 +377,7 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
|
||||
ColumnFamilyData* cfd,
|
||||
bool force_create_version) {
|
||||
assert(cfd->initialized());
|
||||
Status s;
|
||||
if (force_create_version) {
|
||||
auto builder_iter = builders_.find(cfd->GetID());
|
||||
assert(builder_iter != builders_.end());
|
||||
@ -384,13 +385,18 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
|
||||
auto* v = new Version(cfd, version_set_, version_set_->file_options_,
|
||||
*cfd->GetLatestMutableCFOptions(),
|
||||
version_set_->current_version_number_++);
|
||||
builder->SaveTo(v->storage_info());
|
||||
s = builder->SaveTo(v->storage_info());
|
||||
if (s.ok()) {
|
||||
// Install new version
|
||||
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
|
||||
v->PrepareApply(
|
||||
*cfd->GetLatestMutableCFOptions(),
|
||||
!(version_set_->db_options_->skip_stats_update_on_db_open));
|
||||
version_set_->AppendVersion(cfd, v);
|
||||
} else {
|
||||
delete v;
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
|
||||
@ -558,7 +564,8 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
|
||||
auto* version = new Version(cfd, version_set_, version_set_->file_options_,
|
||||
*cfd->GetLatestMutableCFOptions(),
|
||||
version_set_->current_version_number_++);
|
||||
builder->SaveTo(version->storage_info());
|
||||
s = builder->SaveTo(version->storage_info());
|
||||
if (s.ok()) {
|
||||
version->PrepareApply(
|
||||
*cfd->GetLatestMutableCFOptions(),
|
||||
!version_set_->db_options_->skip_stats_update_on_db_open);
|
||||
@ -569,6 +576,9 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
|
||||
} else {
|
||||
versions_.emplace(cfd->GetID(), version);
|
||||
}
|
||||
} else {
|
||||
delete version;
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -5920,13 +5920,23 @@ Status ReactiveVersionSet::Recover(
|
||||
Version* v = new Version(cfd, this, file_options_,
|
||||
*cfd->GetLatestMutableCFOptions(),
|
||||
current_version_number_++);
|
||||
builder->SaveTo(v->storage_info());
|
||||
s = builder->SaveTo(v->storage_info());
|
||||
|
||||
if (s.ok()) {
|
||||
// Install recovered version
|
||||
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
|
||||
!(db_options_->skip_stats_update_on_db_open));
|
||||
AppendVersion(cfd, v);
|
||||
} else {
|
||||
ROCKS_LOG_ERROR(db_options_->info_log,
|
||||
"[%s]: inconsistent version: %s\n",
|
||||
cfd->GetName().c_str(), s.ToString().c_str());
|
||||
delete v;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
next_file_number_.store(version_edit.next_file_number_ + 1);
|
||||
last_allocated_sequence_ = version_edit.last_sequence_;
|
||||
last_published_sequence_ = version_edit.last_sequence_;
|
||||
@ -6003,6 +6013,8 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
|
||||
if (s.ok()) {
|
||||
applied_edits++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6012,13 +6024,14 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
}
|
||||
// It's possible that:
|
||||
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
|
||||
// Or the version(s) rebuilt from tailing the MANIFEST is inconsistent.
|
||||
// 2) we have finished reading the current MANIFEST.
|
||||
// 3) we have encountered an IOError reading the current MANIFEST.
|
||||
// We need to look for the next MANIFEST and start from there. If we cannot
|
||||
// find the next MANIFEST, we should exit the loop.
|
||||
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
|
||||
Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
|
||||
reader = manifest_reader->get();
|
||||
if (s.ok()) {
|
||||
if (tmp_s.ok()) {
|
||||
if (reader->file()->file_name() == old_manifest_path) {
|
||||
// Still processing the same MANIFEST, thus no need to continue this
|
||||
// loop since no record is available if we have reached here.
|
||||
@ -6048,6 +6061,7 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
number_of_edits_to_skip_ += 2;
|
||||
}
|
||||
}
|
||||
s = tmp_s;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6140,19 +6154,24 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
|
||||
auto version = new Version(cfd, this, file_options_,
|
||||
*cfd->GetLatestMutableCFOptions(),
|
||||
current_version_number_++);
|
||||
builder->SaveTo(version->storage_info());
|
||||
s = builder->SaveTo(version->storage_info());
|
||||
if (s.ok()) {
|
||||
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
|
||||
AppendVersion(cfd, version);
|
||||
active_version_builders_.erase(builder_iter);
|
||||
if (cfds_changed->count(cfd) == 0) {
|
||||
cfds_changed->insert(cfd);
|
||||
}
|
||||
} else {
|
||||
delete version;
|
||||
}
|
||||
} else if (s.IsPathNotFound()) {
|
||||
s = Status::OK();
|
||||
}
|
||||
// Some other error has occurred during LoadTableHandlers.
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
if (version_edit->HasNextFile()) {
|
||||
next_file_number_.store(version_edit->next_file_number_ + 1);
|
||||
}
|
||||
@ -6170,6 +6189,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
|
||||
}
|
||||
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
|
||||
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
|
@ -1155,7 +1155,7 @@ TEST_F(VersionSetAtomicGroupTest,
|
||||
// Write the corrupted edits.
|
||||
AddNewEditsToLog(kAtomicGroupSize);
|
||||
mu.Lock();
|
||||
EXPECT_OK(
|
||||
EXPECT_NOK(
|
||||
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
|
||||
mu.Unlock();
|
||||
EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
|
||||
@ -1205,7 +1205,7 @@ TEST_F(VersionSetAtomicGroupTest,
|
||||
&manifest_reader_status));
|
||||
AddNewEditsToLog(kAtomicGroupSize);
|
||||
mu.Lock();
|
||||
EXPECT_OK(
|
||||
EXPECT_NOK(
|
||||
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
|
||||
mu.Unlock();
|
||||
EXPECT_EQ(edits_[1].DebugString(),
|
||||
|
Loading…
Reference in New Issue
Block a user