From a7fd1d0881b0e2d10714c34c1e1e5819f6a99cb3 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Fri, 6 Aug 2021 09:48:53 -0700 Subject: [PATCH] Make backup restore atomic, with sync option (#8568) Summary: Guarantees that if a restore is interrupted, DB::Open will fail. This works by restoring CURRENT first to CURRENT.tmp then as a final step renaming to CURRENT. Also makes restore respect BackupEngineOptions::sync (default true). When set, the restore is guaranteed persisted by the time it returns OK. Also makes the above atomicity guarantee work in case the interruption is power loss or OS crash (not just process interruption or crash). Fixes https://github.com/facebook/rocksdb/issues/8500 Pull Request resolved: https://github.com/facebook/rocksdb/pull/8568 Test Plan: added to backup mini-stress unit test. Passes with gtest_repeat=100 (whereas fails 7 times without the CURRENT.tmp) Reviewed By: akankshamahajan15 Differential Revision: D29812605 Pulled By: pdillinger fbshipit-source-id: 24e9a993b305b1835ca95558fa7a7152e54cda8e --- HISTORY.md | 4 +- include/rocksdb/utilities/backup_engine.h | 9 +- utilities/backupable/backupable_db.cc | 57 ++++++- utilities/backupable/backupable_db_test.cc | 168 +++++++++++++-------- 4 files changed, 169 insertions(+), 69 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index eec1ad616..76a99ea35 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file. +* Restoring backups with BackupEngine is now a logically atomic operation, so that if a restore operation is interrupted, DB::Open on it will fail. Using BackupEngineOptions::sync (default) ensures atomicity even in case of power loss or OS crash. * Fixed a race related to the destruction of `ColumnFamilyData` objects. The earlier logic unlocked the DB mutex before destroying the thread-local `SuperVersion` pointers, which could result in a process crash if another thread managed to get a reference to the `ColumnFamilyData` object. * Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file. * Fixed an issue where `OnFlushCompleted` was not called for atomic flush. @@ -17,6 +18,7 @@ ### Behavior Changes * `StringAppendOperator` additionally accepts a string as the delimiter. +* BackupEngineOptions::sync (default true) now applies to restoring backups in addition to creating backups. This could slow down restores, but ensures they are fully persisted before returning OK. (Consider increasing max_background_operations to improve performance.) ## 6.23.0 (2021-07-16) ### Behavior Changes @@ -26,7 +28,7 @@ * `GetLiveFilesMetaData()` now populates the `temperature`, `oldest_ancester_time`, and `file_creation_time` fields of its `LiveFileMetaData` results when the information is available. Previously these fields always contained zero indicating unknown. * Fix mismatches of OnCompaction{Begin,Completed} in case of DisableManualCompaction(). * Fix continuous logging of an existing background error on every user write -* Fix a bug that `Get()` return Status::OK() and an empty value for non-existent key when `read_options.read_tier = kBlockCacheTier`. +* Fix a bug that `Get()` return Status::OK() and an empty value for non-existent key when `read_options.read_tier = kBlockCacheTier`. * Fix a bug that stat in `get_context` didn't accumulate to statistics when query is failed. * Fixed handling of DBOptions::wal_dir with LoadLatestOptions() or ldb --try_load_options on a copied or moved DB. Previously, when the WAL directory is same as DB directory (default), a copied or moved DB would reference the old path of the DB as the WAL directory, potentially corrupting both copies. Under this change, the wal_dir from DB::GetOptions() or LoadLatestOptions() may now be empty, indicating that the current DB directory is used for WALs. This is also a subtle API change. diff --git a/include/rocksdb/utilities/backup_engine.h b/include/rocksdb/utilities/backup_engine.h index d6a7764e6..8b9426e83 100644 --- a/include/rocksdb/utilities/backup_engine.h +++ b/include/rocksdb/utilities/backup_engine.h @@ -56,10 +56,11 @@ struct BackupEngineOptions { // Default: nullptr Logger* info_log; - // If sync == true, we can guarantee you'll get consistent backup even - // on a machine crash/reboot. Backup process is slower with sync enabled. - // If sync == false, we don't guarantee anything on machine reboot. However, - // chances are some of the backups are consistent. + // If sync == true, we can guarantee you'll get consistent backup and + // restore even on a machine crash/reboot. Backup and restore processes are + // slower with sync enabled. If sync == false, we can only guarantee that + // other previously synced backups and restores are not modified while + // creating a new one. // Default: true bool sync; diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index b98c56bc6..294fde6ca 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -1707,6 +1707,11 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, } Status s; std::vector restore_items_to_finish; + std::string temporary_current_file; + std::string final_current_file; + std::unique_ptr db_dir_for_fsync; + std::unique_ptr wal_dir_for_fsync; + for (const auto& file_info : backup->GetFiles()) { const std::string& file = file_info->filename; // 1. get DB filename @@ -1722,13 +1727,36 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, } // 3. Construct the final path // kWalFile lives in wal_dir and all the rest live in db_dir - dst = ((type == kWalFile) ? wal_dir : db_dir) + "/" + dst; + if (type == kWalFile) { + dst = wal_dir + "/" + dst; + if (options_.sync && !wal_dir_for_fsync) { + s = db_env_->NewDirectory(wal_dir, &wal_dir_for_fsync); + if (!s.ok()) { + return s; + } + } + } else { + dst = db_dir + "/" + dst; + if (options_.sync && !db_dir_for_fsync) { + s = db_env_->NewDirectory(db_dir, &db_dir_for_fsync); + if (!s.ok()) { + return s; + } + } + } + // For atomicity, initially restore CURRENT file to a temporary name. + // This is useful even without options_.sync e.g. in case the restore + // process is interrupted. + if (type == kCurrentFile) { + final_current_file = dst; + dst = temporary_current_file = dst + ".tmp"; + } ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); CopyOrCreateWorkItem copy_or_create_work_item( GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_, - EnvOptions() /* src_env_options */, false, rate_limiter, + EnvOptions() /* src_env_options */, options_.sync, rate_limiter, 0 /* size_limit */); RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( copy_or_create_work_item.result.get_future(), file, dst, @@ -1757,6 +1785,31 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, } } + // When enabled, the first Fsync is to ensure all files are fully persisted + // before renaming CURRENT.tmp + if (s.ok() && db_dir_for_fsync) { + ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n"); + s = db_dir_for_fsync->Fsync(); + } + + if (s.ok() && wal_dir_for_fsync) { + s = wal_dir_for_fsync->Fsync(); + } + + if (s.ok() && !temporary_current_file.empty()) { + ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n"); + assert(!final_current_file.empty()); + s = db_env_->RenameFile(temporary_current_file, final_current_file); + } + + if (s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) { + // Second Fsync is to ensure the final atomic rename of DB restore is + // fully persisted even if power goes out right after restore operation + // returns success + assert(db_dir_for_fsync); + s = db_dir_for_fsync->Fsync(); + } + ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str()); return s; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index f537d134c..2b7c5c8a3 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -642,9 +642,22 @@ class BackupEngineTest : public testing::Test { CreateLoggerFromOptions(dbname_, logger_options, &logger_) .PermitUncheckedError(); + // The sync option is not easily testable in unit tests, but should be + // smoke tested across all the other backup tests. However, it is + // certainly not worth doubling the runtime of backup tests for it. + // Thus, we can enable sync for one of our alternate testing + // configurations. + constexpr bool kUseSync = +#ifdef ROCKSDB_MODIFY_NPHASH + true; +#else + false; +#endif // ROCKSDB_MODIFY_NPHASH + // set up backup db options backupable_options_.reset(new BackupableDBOptions( - backupdir_, test_backup_env_.get(), true, logger_.get(), true)); + backupdir_, test_backup_env_.get(), /*share_table_files*/ true, + logger_.get(), kUseSync)); // most tests will use multi-threaded backups backupable_options_->max_background_operations = 7; @@ -3122,82 +3135,108 @@ TEST_F(BackupEngineTest, Concurrency) { Options db_opts = options_; db_opts.wal_dir = ""; + db_opts.create_if_missing = false; BackupableDBOptions be_opts = *backupable_options_; be_opts.destroy_old_data = false; std::mt19937 rng{std::random_device()()}; std::array read_threads; + std::array restore_verify_threads; for (uint32_t i = 0; i < read_threads.size(); ++i) { uint32_t sleep_micros = rng() % 100000; - read_threads[i] = std::thread([this, i, sleep_micros, &db_opts, &be_opts] { - test_db_env_->SleepForMicroseconds(sleep_micros); + read_threads[i] = std::thread( + [this, i, sleep_micros, &db_opts, &be_opts, &restore_verify_threads] { + test_db_env_->SleepForMicroseconds(sleep_micros); - // Whether to also re-open the BackupEngine, potentially seeing - // additional backups - bool reopen = i == 3; - // Whether we are going to restore "latest" - bool latest = i > 1; + // Whether to also re-open the BackupEngine, potentially seeing + // additional backups + bool reopen = i == 3; + // Whether we are going to restore "latest" + bool latest = i > 1; - BackupEngine* my_be; - if (reopen) { - ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be)); - } else { - my_be = backup_engine_.get(); - } + BackupEngine* my_be; + if (reopen) { + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be)); + } else { + my_be = backup_engine_.get(); + } - // Verify metadata (we don't receive updates from concurrently - // creating a new backup) - std::vector infos; - my_be->GetBackupInfo(&infos); - const uint32_t count = static_cast(infos.size()); - infos.clear(); - if (reopen) { - ASSERT_GE(count, 2U); - ASSERT_LE(count, 4U); - fprintf(stderr, "Reopen saw %u backups\n", count); - } else { - ASSERT_EQ(count, 2U); - } - std::vector ids; - my_be->GetCorruptedBackups(&ids); - ASSERT_EQ(ids.size(), 0U); + // Verify metadata (we don't receive updates from concurrently + // creating a new backup) + std::vector infos; + my_be->GetBackupInfo(&infos); + const uint32_t count = static_cast(infos.size()); + infos.clear(); + if (reopen) { + ASSERT_GE(count, 2U); + ASSERT_LE(count, 4U); + fprintf(stderr, "Reopen saw %u backups\n", count); + } else { + ASSERT_EQ(count, 2U); + } + std::vector ids; + my_be->GetCorruptedBackups(&ids); + ASSERT_EQ(ids.size(), 0U); - // Restore one of the backups, or "latest" - std::string restore_db_dir = dbname_ + "/restore" + ToString(i); - BackupID to_restore; - if (latest) { - to_restore = count; - ASSERT_OK( - my_be->RestoreDBFromLatestBackup(restore_db_dir, restore_db_dir)); - } else { - to_restore = i + 1; - ASSERT_OK(my_be->VerifyBackup(to_restore, true)); - ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir, - restore_db_dir)); - } + // (Eventually, see below) Restore one of the backups, or "latest" + std::string restore_db_dir = dbname_ + "/restore" + ToString(i); + DestroyDir(test_db_env_.get(), restore_db_dir).PermitUncheckedError(); + BackupID to_restore; + if (latest) { + to_restore = count; + } else { + to_restore = i + 1; + } - // Open restored DB to verify its contents - DB* restored; - ASSERT_OK(DB::Open(db_opts, restore_db_dir, &restored)); - int factor = std::min(static_cast(to_restore), max_factor); - AssertExists(restored, 0, factor * keys_iteration); - AssertEmpty(restored, factor * keys_iteration, - (factor + 1) * keys_iteration); - delete restored; + // Open restored DB to verify its contents, but test atomic restore + // by doing it async and ensuring we either get OK or InvalidArgument + restore_verify_threads[i] = + std::thread([this, &db_opts, restore_db_dir, to_restore] { + DB* restored; + Status s; + for (;;) { + s = DB::Open(db_opts, restore_db_dir, &restored); + if (s.IsInvalidArgument()) { + // Restore hasn't finished + test_db_env_->SleepForMicroseconds(1000); + continue; + } else { + // We should only get InvalidArgument if restore is + // incomplete, or OK if complete + ASSERT_OK(s); + break; + } + } + int factor = std::min(static_cast(to_restore), max_factor); + AssertExists(restored, 0, factor * keys_iteration); + AssertEmpty(restored, factor * keys_iteration, + (factor + 1) * keys_iteration); + delete restored; + }); - // Re-verify metadata (we don't receive updates from concurrently - // creating a new backup) - my_be->GetBackupInfo(&infos); - ASSERT_EQ(infos.size(), count); - my_be->GetCorruptedBackups(&ids); - ASSERT_EQ(ids.size(), 0); - // fprintf(stderr, "Finished read thread\n"); + // (Ok now) Restore one of the backups, or "latest" + if (latest) { + ASSERT_OK(my_be->RestoreDBFromLatestBackup(restore_db_dir, + restore_db_dir)); + } else { + ASSERT_OK(my_be->VerifyBackup(to_restore, true)); + ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir, + restore_db_dir)); + } - if (reopen) { - delete my_be; - } - }); + // Re-verify metadata (we don't receive updates from concurrently + // creating a new backup) + my_be->GetBackupInfo(&infos); + ASSERT_EQ(infos.size(), count); + my_be->GetCorruptedBackups(&ids); + ASSERT_EQ(ids.size(), 0); + // fprintf(stderr, "Finished read thread\n"); + + if (reopen) { + delete my_be; + } + }); } BackupEngine* alt_be; @@ -3229,6 +3268,11 @@ TEST_F(BackupEngineTest, Concurrency) { } delete alt_be; + + for (auto& t : restore_verify_threads) { + t.join(); + } + CloseDBAndBackupEngine(); }