fix backup engine when latest backup corrupt

Summary:
Backup engine is intentionally openable even when some backups are corrupt. Previously the engine could write new backups as long as the most recent backup wasn't corrupt. This PR makes the backup engine able to create new backups even when the most recent one is corrupt.

We now maintain two ID instance variables:

- `latest_backup_id_` is used when creating backup to choose the new ID
- `latest_valid_backup_id_` is used when restoring latest backup since we want most recent valid one
Closes https://github.com/facebook/rocksdb/pull/2804

Differential Revision: D5734148

Pulled By: ajkr

fbshipit-source-id: db440707b31df2c7b084188aa5f6368449e10bcf
This commit is contained in:
Andrew Kryczka 2017-08-31 15:30:35 -07:00 committed by Facebook Github Bot
parent 725bc403f8
commit b97685aef6
2 changed files with 35 additions and 2 deletions

View File

@ -114,7 +114,7 @@ class BackupEngineImpl : public BackupEngine {
Status RestoreDBFromLatestBackup(
const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) override {
return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir,
return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir,
restore_options);
}
@ -452,6 +452,7 @@ class BackupEngineImpl : public BackupEngine {
// backup state data
BackupID latest_backup_id_;
BackupID latest_valid_backup_id_;
std::map<BackupID, unique_ptr<BackupMeta>> backups_;
std::map<BackupID,
std::pair<Status, unique_ptr<BackupMeta>>> corrupt_backups_;
@ -593,6 +594,7 @@ Status BackupEngineImpl::Initialize() {
}
latest_backup_id_ = 0;
latest_valid_backup_id_ = 0;
if (options_.destroy_old_data) { // Destroy old data
assert(!read_only_);
ROCKS_LOG_INFO(
@ -624,6 +626,10 @@ Status BackupEngineImpl::Initialize() {
for (auto backup_iter = backups_.rbegin();
backup_iter != backups_.rend() && valid_backups_to_open > 0;
++backup_iter) {
assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
if (latest_backup_id_ == 0) {
latest_backup_id_ = backup_iter->first;
}
InsertPathnameToSizeBytes(
GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
&abs_path_to_size);
@ -644,7 +650,11 @@ Status BackupEngineImpl::Initialize() {
ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
backup_iter->first,
backup_iter->second->GetInfoString().c_str());
latest_backup_id_ = std::max(latest_backup_id_, backup_iter->first);
assert(latest_valid_backup_id_ == 0 ||
latest_valid_backup_id_ > backup_iter->first);
if (latest_valid_backup_id_ == 0) {
latest_valid_backup_id_ = backup_iter->first;
}
--valid_backups_to_open;
}
}
@ -668,6 +678,8 @@ Status BackupEngineImpl::Initialize() {
}
ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
latest_valid_backup_id_);
// set up threads perform copies from files_to_copy_or_create_ in the
// background
@ -877,6 +889,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
// here we know that we succeeded and installed the new backup
// in the LATEST_BACKUP file
latest_backup_id_ = new_backup_id;
latest_valid_backup_id_ = new_backup_id;
ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
// backup_speed is in byte/second

View File

@ -1527,6 +1527,26 @@ TEST_F(BackupableDBTest, LimitBackupsOpened) {
DestroyDB(dbname_, options_);
}
TEST_F(BackupableDBTest, CreateWhenLatestBackupCorrupted) {
// we should pick an ID greater than corrupted backups' IDs so creation can
// succeed even when latest backup is corrupted.
const int kNumKeys = 5000;
OpenDBAndBackupEngine(true /* destroy_old_data */);
FillDB(db_.get(), 0 /* from */, kNumKeys);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
true /* flush_before_backup */));
ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/1",
3 /* bytes_to_corrupt */));
CloseDBAndBackupEngine();
OpenDBAndBackupEngine();
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
true /* flush_before_backup */));
std::vector<BackupInfo> backup_infos;
backup_engine_->GetBackupInfo(&backup_infos);
ASSERT_EQ(1, backup_infos.size());
ASSERT_EQ(2, backup_infos[0].backup_id);
}
} // anon namespace
} // namespace rocksdb