Summary: Left HISTORY.md and unit tests. Added a new unit test to repro the corruption scenario that this PR fixes, and HISTORY.md line for that. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9906 Reviewed By: riversand963 Differential Revision: D35940093 Pulled By: ajkr fbshipit-source-id: 9816f99e1ce405ba36f316beb4f6378c37c8c86b
This commit is contained in:
parent
34681a1a2a
commit
3620e69404
@ -2,6 +2,7 @@
|
||||
## Unreleased
|
||||
|
||||
### Bug Fixes
|
||||
* Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen.
|
||||
* RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.
|
||||
|
||||
## 7.2.0 (04/15/2022)
|
||||
|
@ -337,6 +337,68 @@ TEST_F(CorruptionTest, Recovery) {
|
||||
Check(36, 36);
|
||||
}
|
||||
|
||||
TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) {
|
||||
// Repro for bug where WALs following the point-in-time recovery were not
|
||||
// retained leading to the next recovery failing.
|
||||
CloseDb();
|
||||
|
||||
options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
||||
|
||||
const std::string test_cf_name = "test_cf";
|
||||
std::vector<ColumnFamilyDescriptor> cf_descs;
|
||||
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
|
||||
cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());
|
||||
|
||||
uint64_t log_num;
|
||||
{
|
||||
options_.create_missing_column_families = true;
|
||||
std::vector<ColumnFamilyHandle*> cfhs;
|
||||
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
|
||||
|
||||
ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
|
||||
ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v"));
|
||||
ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
|
||||
std::vector<uint64_t> file_nums;
|
||||
GetSortedWalFiles(file_nums);
|
||||
log_num = file_nums.back();
|
||||
for (auto* cfh : cfhs) {
|
||||
delete cfh;
|
||||
}
|
||||
CloseDb();
|
||||
}
|
||||
|
||||
CorruptFileWithTruncation(FileType::kWalFile, log_num,
|
||||
/*bytes_to_truncate=*/1);
|
||||
|
||||
{
|
||||
// Recover "k" -> "v" for both CFs. "k2" -> "v2" is lost due to truncation.
|
||||
options_.avoid_flush_during_recovery = true;
|
||||
std::vector<ColumnFamilyHandle*> cfhs;
|
||||
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
|
||||
// Flush one but not both CFs and write some data so there's a seqno gap
|
||||
// between the PITR corruption and the next DB session's first WAL.
|
||||
ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2"));
|
||||
ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1]));
|
||||
|
||||
for (auto* cfh : cfhs) {
|
||||
delete cfh;
|
||||
}
|
||||
CloseDb();
|
||||
}
|
||||
|
||||
// With the bug, this DB open would remove the WALs following the PITR
|
||||
// corruption. Then, the next recovery would fail.
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
std::vector<ColumnFamilyHandle*> cfhs;
|
||||
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
|
||||
|
||||
for (auto* cfh : cfhs) {
|
||||
delete cfh;
|
||||
}
|
||||
CloseDb();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CorruptionTest, RecoverWriteError) {
|
||||
env_->writable_file_error_ = true;
|
||||
Status s = TryReopen();
|
||||
|
@ -1240,43 +1240,6 @@ class DBImpl : public DB {
|
||||
|
||||
std::atomic<bool> shutting_down_;
|
||||
|
||||
// RecoveryContext struct stores the context about version edits along
|
||||
// with corresponding column_family_data and column_family_options.
|
||||
class RecoveryContext {
|
||||
public:
|
||||
~RecoveryContext() {
|
||||
for (auto& edit_list : edit_lists_) {
|
||||
for (auto* edit : edit_list) {
|
||||
delete edit;
|
||||
}
|
||||
edit_list.clear();
|
||||
}
|
||||
cfds_.clear();
|
||||
mutable_cf_opts_.clear();
|
||||
edit_lists_.clear();
|
||||
files_to_delete_.clear();
|
||||
}
|
||||
|
||||
void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) {
|
||||
if (map_.find(cfd->GetID()) == map_.end()) {
|
||||
uint32_t size = static_cast<uint32_t>(map_.size());
|
||||
map_.emplace(cfd->GetID(), size);
|
||||
cfds_.emplace_back(cfd);
|
||||
mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions());
|
||||
edit_lists_.emplace_back(autovector<VersionEdit*>());
|
||||
}
|
||||
uint32_t i = map_[cfd->GetID()];
|
||||
edit_lists_[i].emplace_back(new VersionEdit(edit));
|
||||
}
|
||||
|
||||
std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index;
|
||||
autovector<ColumnFamilyData*> cfds_;
|
||||
autovector<const MutableCFOptions*> mutable_cf_opts_;
|
||||
autovector<autovector<VersionEdit*>> edit_lists_;
|
||||
// files_to_delete_ contains sst files
|
||||
std::set<std::string> files_to_delete_;
|
||||
};
|
||||
|
||||
// Except in DB::Open(), WriteOptionsFile can only be called when:
|
||||
// Persist options to options file.
|
||||
// If need_mutex_lock = false, the method will lock DB mutex.
|
||||
@ -1393,19 +1356,16 @@ class DBImpl : public DB {
|
||||
// be made to the descriptor are added to *edit.
|
||||
// recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
|
||||
// skipped.
|
||||
// recovery_ctx stores the context about version edits and all those
|
||||
// edits are persisted to new Manifest after successfully syncing the new WAL.
|
||||
virtual Status Recover(
|
||||
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||
bool read_only = false, bool error_if_wal_file_exists = false,
|
||||
bool error_if_data_exists_in_wals = false,
|
||||
uint64_t* recovered_seq = nullptr,
|
||||
RecoveryContext* recovery_ctx = nullptr);
|
||||
uint64_t* recovered_seq = nullptr);
|
||||
|
||||
virtual bool OwnTablesAndLogs() const { return true; }
|
||||
|
||||
// Set DB identity file, and write DB ID to manifest if necessary.
|
||||
Status SetDBId(bool read_only, RecoveryContext* recovery_ctx);
|
||||
Status SetDBId(bool read_only);
|
||||
|
||||
// REQUIRES: db mutex held when calling this function, but the db mutex can
|
||||
// be released and re-acquired. Db mutex will be held when the function
|
||||
@ -1414,15 +1374,12 @@ class DBImpl : public DB {
|
||||
// not referenced in the MANIFEST (e.g.
|
||||
// 1. It's best effort recovery;
|
||||
// 2. The VersionEdits referencing the SST files are appended to
|
||||
// RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
|
||||
// MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are
|
||||
// still not synced to MANIFEST during recovery.)
|
||||
// It stores the SST files to be deleted in RecoveryContext. In the
|
||||
// We delete these SST files. In the
|
||||
// meantime, we find out the largest file number present in the paths, and
|
||||
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
|
||||
// recovery_ctx stores the context about version edits and files to be
|
||||
// deleted. All those edits are persisted to new Manifest after successfully
|
||||
// syncing the new WAL.
|
||||
Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx);
|
||||
Status DeleteUnreferencedSstFiles();
|
||||
|
||||
// SetDbSessionId() should be called in the constuctor DBImpl()
|
||||
// to ensure that db_session_id_ gets updated every time the DB is opened
|
||||
@ -1432,11 +1389,6 @@ class DBImpl : public DB {
|
||||
Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family,
|
||||
const Slice& ts) const;
|
||||
|
||||
// recovery_ctx stores the context about version edits and
|
||||
// LogAndApplyForRecovery persist all those edits to new Manifest after
|
||||
// successfully syncing new WAL.
|
||||
Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);
|
||||
|
||||
private:
|
||||
friend class DB;
|
||||
friend class ErrorHandler;
|
||||
@ -1691,10 +1643,9 @@ class DBImpl : public DB {
|
||||
|
||||
// REQUIRES: log_numbers are sorted in ascending order
|
||||
// corrupted_log_found is set to true if we recover from a corrupted log file.
|
||||
Status RecoverLogFiles(std::vector<uint64_t>& log_numbers,
|
||||
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
||||
SequenceNumber* next_sequence, bool read_only,
|
||||
bool* corrupted_log_found,
|
||||
RecoveryContext* recovery_ctx);
|
||||
bool* corrupted_log_found);
|
||||
|
||||
// The following two methods are used to flush a memtable to
|
||||
// storage. The first one is used at database RecoveryTime (when the
|
||||
@ -1704,12 +1655,6 @@ class DBImpl : public DB {
|
||||
Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||
MemTable* mem, VersionEdit* edit);
|
||||
|
||||
// Move all the WAL files starting from corrupted WAL found to
|
||||
// max_wal_number to avoid column family inconsistency error on recovery. It
|
||||
// also removes the deleted file from the vector wal_numbers.
|
||||
void MoveCorruptedWalFiles(std::vector<uint64_t>& wal_numbers,
|
||||
uint64_t corrupted_wal_number);
|
||||
|
||||
// Get the size of a log file and, if truncate is true, truncate the
|
||||
// log file to its actual size, thereby freeing preallocated space.
|
||||
// Return success even if truncate fails
|
||||
|
@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
|
||||
return min_log_number_to_keep;
|
||||
}
|
||||
|
||||
Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
|
||||
Status DBImpl::SetDBId(bool read_only) {
|
||||
Status s;
|
||||
// Happens when immutable_db_options_.write_dbid_to_manifest is set to true
|
||||
// the very first time.
|
||||
@ -890,14 +890,14 @@ Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
|
||||
}
|
||||
s = GetDbIdentityFromIdentityFile(&db_id_);
|
||||
if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
|
||||
assert(!read_only);
|
||||
assert(recovery_ctx != nullptr);
|
||||
assert(versions_->GetColumnFamilySet() != nullptr);
|
||||
VersionEdit edit;
|
||||
edit.SetDBId(db_id_);
|
||||
Options options;
|
||||
MutableCFOptions mutable_cf_options(options);
|
||||
versions_->db_id_ = db_id_;
|
||||
recovery_ctx->UpdateVersionEdits(
|
||||
versions_->GetColumnFamilySet()->GetDefault(), edit);
|
||||
s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
|
||||
mutable_cf_options, &edit, &mutex_, nullptr,
|
||||
/* new_descriptor_log */ false);
|
||||
}
|
||||
} else if (!read_only) {
|
||||
s = SetIdentityFile(env_, dbname_, db_id_);
|
||||
@ -905,7 +905,7 @@ Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
|
||||
return s;
|
||||
}
|
||||
|
||||
Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
|
||||
Status DBImpl::DeleteUnreferencedSstFiles() {
|
||||
mutex_.AssertHeld();
|
||||
std::vector<std::string> paths;
|
||||
paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
|
||||
@ -925,6 +925,7 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
|
||||
|
||||
uint64_t next_file_number = versions_->current_next_file_number();
|
||||
uint64_t largest_file_number = next_file_number;
|
||||
std::set<std::string> files_to_delete;
|
||||
Status s;
|
||||
for (const auto& path : paths) {
|
||||
std::vector<std::string> files;
|
||||
@ -942,9 +943,8 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
|
||||
const std::string normalized_fpath = path + fname;
|
||||
largest_file_number = std::max(largest_file_number, number);
|
||||
if (type == kTableFile && number >= next_file_number &&
|
||||
recovery_ctx->files_to_delete_.find(normalized_fpath) ==
|
||||
recovery_ctx->files_to_delete_.end()) {
|
||||
recovery_ctx->files_to_delete_.insert(normalized_fpath);
|
||||
files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
|
||||
files_to_delete.insert(normalized_fpath);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -961,7 +961,21 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
|
||||
assert(versions_->GetColumnFamilySet());
|
||||
ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||
assert(default_cfd);
|
||||
recovery_ctx->UpdateVersionEdits(default_cfd, edit);
|
||||
s = versions_->LogAndApply(
|
||||
default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
|
||||
directories_.GetDbDir(), /*new_descriptor_log*/ false);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
mutex_.Unlock();
|
||||
for (const auto& fname : files_to_delete) {
|
||||
s = env_->DeleteFile(fname);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
mutex_.Lock();
|
||||
return s;
|
||||
}
|
||||
|
||||
|
@ -399,7 +399,7 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
|
||||
Status DBImpl::Recover(
|
||||
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
|
||||
bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
|
||||
uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
|
||||
uint64_t* recovered_seq) {
|
||||
mutex_.AssertHeld();
|
||||
|
||||
bool is_new_db = false;
|
||||
@ -518,10 +518,9 @@ Status DBImpl::Recover(
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
s = SetDBId(read_only, recovery_ctx);
|
||||
s = SetDBId(read_only);
|
||||
if (s.ok() && !read_only) {
|
||||
s = DeleteUnreferencedSstFiles(recovery_ctx);
|
||||
s = DeleteUnreferencedSstFiles();
|
||||
}
|
||||
|
||||
if (immutable_db_options_.paranoid_checks && s.ok()) {
|
||||
@ -536,6 +535,10 @@ Status DBImpl::Recover(
|
||||
}
|
||||
}
|
||||
}
|
||||
// DB mutex is already held
|
||||
if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
|
||||
s = InitPersistStatsColumnFamily();
|
||||
}
|
||||
|
||||
std::vector<std::string> files_in_wal_dir;
|
||||
if (s.ok()) {
|
||||
@ -605,10 +608,7 @@ Status DBImpl::Recover(
|
||||
WalNumber max_wal_number =
|
||||
versions_->GetWalSet().GetWals().rbegin()->first;
|
||||
edit.DeleteWalsBefore(max_wal_number + 1);
|
||||
assert(recovery_ctx != nullptr);
|
||||
assert(versions_->GetColumnFamilySet() != nullptr);
|
||||
recovery_ctx->UpdateVersionEdits(
|
||||
versions_->GetColumnFamilySet()->GetDefault(), edit);
|
||||
s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
@ -644,8 +644,8 @@ Status DBImpl::Recover(
|
||||
std::sort(wals.begin(), wals.end());
|
||||
|
||||
bool corrupted_wal_found = false;
|
||||
s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found,
|
||||
recovery_ctx);
|
||||
s = RecoverLogFiles(wals, &next_sequence, read_only,
|
||||
&corrupted_wal_found);
|
||||
if (corrupted_wal_found && recovered_seq != nullptr) {
|
||||
*recovered_seq = next_sequence;
|
||||
}
|
||||
@ -805,30 +805,10 @@ Status DBImpl::InitPersistStatsColumnFamily() {
|
||||
return s;
|
||||
}
|
||||
|
||||
Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
|
||||
mutex_.AssertHeld();
|
||||
assert(versions_->descriptor_log_ == nullptr);
|
||||
Status s = versions_->LogAndApply(
|
||||
recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_,
|
||||
recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir());
|
||||
if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) {
|
||||
mutex_.Unlock();
|
||||
for (const auto& fname : recovery_ctx.files_to_delete_) {
|
||||
s = env_->DeleteFile(fname);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
mutex_.Lock();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
// REQUIRES: wal_numbers are sorted in ascending order
|
||||
Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
|
||||
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
|
||||
SequenceNumber* next_sequence, bool read_only,
|
||||
bool* corrupted_wal_found,
|
||||
RecoveryContext* recovery_ctx) {
|
||||
bool* corrupted_wal_found) {
|
||||
struct LogReporter : public log::Reader::Reporter {
|
||||
Env* env;
|
||||
Logger* info_log;
|
||||
@ -853,7 +833,6 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
|
||||
edit.SetColumnFamily(cfd->GetID());
|
||||
version_edits.insert({cfd->GetID(), edit});
|
||||
}
|
||||
|
||||
int job_id = next_job_id_.fetch_add(1);
|
||||
{
|
||||
auto stream = event_logger_.Log();
|
||||
@ -1277,7 +1256,6 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
|
||||
edit->SetLogNumber(max_wal_number + 1);
|
||||
}
|
||||
}
|
||||
|
||||
if (status.ok()) {
|
||||
// we must mark the next log number as used, even though it's
|
||||
// not actually used. that is because VersionSet assumes
|
||||
@ -1285,40 +1263,42 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
|
||||
// log number
|
||||
versions_->MarkFileNumberUsed(max_wal_number + 1);
|
||||
|
||||
if (corrupted_wal_found != nullptr && *corrupted_wal_found == true &&
|
||||
immutable_db_options_.wal_recovery_mode ==
|
||||
WALRecoveryMode::kPointInTimeRecovery) {
|
||||
MoveCorruptedWalFiles(wal_numbers, corrupted_wal_number);
|
||||
}
|
||||
|
||||
assert(recovery_ctx != nullptr);
|
||||
autovector<ColumnFamilyData*> cfds;
|
||||
autovector<const MutableCFOptions*> cf_opts;
|
||||
autovector<autovector<VersionEdit*>> edit_lists;
|
||||
for (auto* cfd : *versions_->GetColumnFamilySet()) {
|
||||
cfds.push_back(cfd);
|
||||
cf_opts.push_back(cfd->GetLatestMutableCFOptions());
|
||||
auto iter = version_edits.find(cfd->GetID());
|
||||
assert(iter != version_edits.end());
|
||||
recovery_ctx->UpdateVersionEdits(cfd, iter->second);
|
||||
edit_lists.push_back({&iter->second});
|
||||
}
|
||||
|
||||
std::unique_ptr<VersionEdit> wal_deletion;
|
||||
if (flushed) {
|
||||
VersionEdit wal_deletion;
|
||||
wal_deletion = std::make_unique<VersionEdit>();
|
||||
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
|
||||
wal_deletion.DeleteWalsBefore(max_wal_number + 1);
|
||||
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
|
||||
}
|
||||
if (!allow_2pc()) {
|
||||
// In non-2pc mode, flushing the memtables of the column families
|
||||
// means we can advance min_log_number_to_keep.
|
||||
wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
|
||||
wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1);
|
||||
}
|
||||
assert(versions_->GetColumnFamilySet() != nullptr);
|
||||
recovery_ctx->UpdateVersionEdits(
|
||||
versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
|
||||
edit_lists.back().push_back(wal_deletion.get());
|
||||
}
|
||||
|
||||
// write MANIFEST with update
|
||||
status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
|
||||
directories_.GetDbDir(),
|
||||
/*new_descriptor_log=*/true);
|
||||
}
|
||||
}
|
||||
|
||||
if (status.ok()) {
|
||||
if (data_seen && !flushed) {
|
||||
status = RestoreAliveLogFiles(wal_numbers);
|
||||
} else if (!wal_numbers.empty()) {
|
||||
} else {
|
||||
// If there's no data in the WAL, or we flushed all the data, still
|
||||
// truncate the log file. If the process goes into a crash loop before
|
||||
// the file is deleted, the preallocated space will never get freed.
|
||||
@ -1334,48 +1314,6 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
|
||||
return status;
|
||||
}
|
||||
|
||||
void DBImpl::MoveCorruptedWalFiles(std::vector<uint64_t>& wal_numbers,
|
||||
uint64_t corrupted_wal_number) {
|
||||
size_t num_wals = wal_numbers.size();
|
||||
// Find the first corrupted wal.
|
||||
auto iter = std::lower_bound(wal_numbers.begin(), wal_numbers.end(),
|
||||
corrupted_wal_number);
|
||||
auto corrupt_start_iter = iter;
|
||||
|
||||
// Increment iter to move WAL files from first corrupted_wal_number + 1.
|
||||
iter++;
|
||||
|
||||
std::string archival_path =
|
||||
ArchivalDirectory(immutable_db_options_.GetWalDir());
|
||||
Status create_status = env_->CreateDirIfMissing(archival_path);
|
||||
|
||||
// create_status is only checked when it needs to move the corrupted WAL files
|
||||
// to archive folder.
|
||||
create_status.PermitUncheckedError();
|
||||
|
||||
// Truncate the last WAL to reclaim the pre allocated space before
|
||||
// moving it.
|
||||
GetLogSizeAndMaybeTruncate(wal_numbers.back(), /*truncate=*/true, nullptr)
|
||||
.PermitUncheckedError();
|
||||
|
||||
// Move all the WAL files from corrupted_wal_number + 1 to last WAL
|
||||
// (max_wal_number) to avoid column family inconsistency error to archival
|
||||
// directory. If its unable to create archive dir, it will delete the
|
||||
// corrupted WAL files.
|
||||
// We are moving all but first corrupted WAL file to a different folder.
|
||||
while (iter != wal_numbers.end()) {
|
||||
LogFileNumberSize log(*iter);
|
||||
std::string fname = LogFileName(immutable_db_options_.GetWalDir(), *iter);
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (create_status.ok()) {
|
||||
wal_manager_.ArchiveWALFile(fname, *iter);
|
||||
}
|
||||
#endif
|
||||
iter++;
|
||||
}
|
||||
wal_numbers.erase(corrupt_start_iter + 1, wal_numbers.begin() + num_wals);
|
||||
}
|
||||
|
||||
Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
|
||||
LogFileNumberSize* log_ptr) {
|
||||
LogFileNumberSize log(wal_number);
|
||||
@ -1438,8 +1376,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
|
||||
// log has such preallocated space, so we only truncate for the last log.
|
||||
LogFileNumberSize log;
|
||||
s = GetLogSizeAndMaybeTruncate(
|
||||
wal_number,
|
||||
/*truncate=*/(wal_number == wal_numbers.back()), &log);
|
||||
wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
@ -1800,13 +1737,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
|
||||
|
||||
impl->mutex_.Lock();
|
||||
|
||||
RecoveryContext recovery_ctx;
|
||||
|
||||
// Handles create_if_missing, error_if_exists
|
||||
uint64_t recovered_seq(kMaxSequenceNumber);
|
||||
s = impl->Recover(column_families, false, false, false, &recovered_seq,
|
||||
&recovery_ctx);
|
||||
s = impl->Recover(column_families, false, false, false, &recovered_seq);
|
||||
if (s.ok()) {
|
||||
uint64_t new_log_number = impl->versions_->NewFileNumber();
|
||||
log::Writer* new_log = nullptr;
|
||||
@ -1823,6 +1756,40 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
// set column family handles
|
||||
for (auto cf : column_families) {
|
||||
auto cfd =
|
||||
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
|
||||
if (cfd != nullptr) {
|
||||
handles->push_back(
|
||||
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
|
||||
impl->NewThreadStatusCfInfo(cfd);
|
||||
} else {
|
||||
if (db_options.create_missing_column_families) {
|
||||
// missing column family, create it
|
||||
ColumnFamilyHandle* handle;
|
||||
impl->mutex_.Unlock();
|
||||
s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
|
||||
impl->mutex_.Lock();
|
||||
if (s.ok()) {
|
||||
handles->push_back(handle);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
s = Status::InvalidArgument("Column family not found", cf.name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
SuperVersionContext sv_context(/* create_superversion */ true);
|
||||
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
|
||||
impl->InstallSuperVersionAndScheduleWork(
|
||||
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
|
||||
}
|
||||
sv_context.Clean();
|
||||
if (impl->two_write_queues_) {
|
||||
impl->log_write_mutex_.Lock();
|
||||
}
|
||||
@ -1835,15 +1802,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
}
|
||||
if (s.ok()) {
|
||||
// In WritePrepared there could be gap in sequence numbers. This breaks
|
||||
// the trick we use in kPointInTimeRecovery which assumes the first seq
|
||||
// in the log right after the corrupted log is one larger than the last
|
||||
// seq we read from the wals. To let this trick keep working, we add a
|
||||
// dummy entry with the expected sequence to the first log right after
|
||||
// recovery. In non-WritePrepared case also the new log after recovery
|
||||
// could be empty, and thus missing the consecutive seq hint to
|
||||
// distinguish middle-log corruption to
|
||||
// corrupted-log-remained-after-recovery. This case also will be
|
||||
// addressed by a dummy write.
|
||||
// the trick we use in kPointInTimeRecovery which assumes the first seq in
|
||||
// the log right after the corrupted log is one larger than the last seq
|
||||
// we read from the wals. To let this trick keep working, we add a dummy
|
||||
// entry with the expected sequence to the first log right after recovery.
|
||||
// In non-WritePrepared case also the new log after recovery could be
|
||||
// empty, and thus missing the consecutive seq hint to distinguish
|
||||
// middle-log corruption to corrupted-log-remained-after-recovery. This
|
||||
// case also will be addressed by a dummy write.
|
||||
if (recovered_seq != kMaxSequenceNumber) {
|
||||
WriteBatch empty_batch;
|
||||
WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
|
||||
@ -1862,52 +1828,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
}
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = impl->LogAndApplyForRecovery(recovery_ctx);
|
||||
}
|
||||
|
||||
if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
|
||||
impl->mutex_.AssertHeld();
|
||||
s = impl->InitPersistStatsColumnFamily();
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
// set column family handles
|
||||
for (auto cf : column_families) {
|
||||
auto cfd =
|
||||
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
|
||||
if (cfd != nullptr) {
|
||||
handles->push_back(
|
||||
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
|
||||
impl->NewThreadStatusCfInfo(cfd);
|
||||
} else {
|
||||
if (db_options.create_missing_column_families) {
|
||||
// missing column family, create it
|
||||
ColumnFamilyHandle* handle;
|
||||
impl->mutex_.Unlock();
|
||||
s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
|
||||
impl->mutex_.Lock();
|
||||
if (s.ok()) {
|
||||
handles->push_back(handle);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
s = Status::InvalidArgument("Column family not found", cf.name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
SuperVersionContext sv_context(/* create_superversion */ true);
|
||||
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
|
||||
impl->InstallSuperVersionAndScheduleWork(
|
||||
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
|
||||
}
|
||||
sv_context.Clean();
|
||||
}
|
||||
if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
|
||||
// try to read format version
|
||||
s = impl->PersistentStatsProcessFormatVersion();
|
||||
@ -1933,8 +1853,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
if (cfd->ioptions()->merge_operator != nullptr &&
|
||||
!cfd->mem()->IsMergeOperatorSupported()) {
|
||||
s = Status::InvalidArgument(
|
||||
"The memtable of column family %s does not support merge "
|
||||
"operator "
|
||||
"The memtable of column family %s does not support merge operator "
|
||||
"its options.merge_operator is non-null",
|
||||
cfd->GetName().c_str());
|
||||
}
|
||||
|
@ -33,8 +33,7 @@ DBImplSecondary::~DBImplSecondary() {}
|
||||
Status DBImplSecondary::Recover(
|
||||
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
|
||||
bool /*error_if_data_exists_in_wals*/, uint64_t*,
|
||||
RecoveryContext* /*recovery_ctx*/) {
|
||||
bool /*error_if_data_exists_in_wals*/, uint64_t*) {
|
||||
mutex_.AssertHeld();
|
||||
|
||||
JobContext job_context(0);
|
||||
|
@ -81,8 +81,8 @@ class DBImplSecondary : public DBImpl {
|
||||
// and log_readers_ to facilitate future operations.
|
||||
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||
bool read_only, bool error_if_wal_file_exists,
|
||||
bool error_if_data_exists_in_wals, uint64_t* = nullptr,
|
||||
RecoveryContext* recovery_ctx = nullptr) override;
|
||||
bool error_if_data_exists_in_wals,
|
||||
uint64_t* = nullptr) override;
|
||||
|
||||
// Implementations of the DB interface
|
||||
using DB::Get;
|
||||
|
@ -1428,7 +1428,7 @@ public class RocksDBTest {
|
||||
assertThat(livefiles.manifestFileSize).isEqualTo(59);
|
||||
assertThat(livefiles.files.size()).isEqualTo(3);
|
||||
assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT");
|
||||
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000005");
|
||||
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");
|
||||
assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007");
|
||||
}
|
||||
}
|
||||
|
@ -604,14 +604,10 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) {
|
||||
dbfull()->TEST_WaitForStatsDumpRun(
|
||||
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
|
||||
// writing to all three cf, flush default cf
|
||||
// LogNumbers: default: 16, stats: 10, pikachu: 5
|
||||
// Since in recovery process, cfd_stats column is created after WAL is
|
||||
// created, synced and MANIFEST is persisted, its log number which depends on
|
||||
// logfile_number_ will be different. Since "pikachu" is never flushed, thus
|
||||
// its log_number should be the smallest of the three.
|
||||
// LogNumbers: default: 14, stats: 4, pikachu: 4
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_LT(cfd_test->GetLogNumber(), cfd_stats->GetLogNumber());
|
||||
ASSERT_LT(cfd_test->GetLogNumber(), cfd_default->GetLogNumber());
|
||||
ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
|
||||
ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
|
||||
|
||||
ASSERT_OK(Put("foo1", "v1"));
|
||||
ASSERT_OK(Put("bar1", "v1"));
|
||||
|
Loading…
Reference in New Issue
Block a user