diff --git a/HISTORY.md b/HISTORY.md index eec1ad616..76a99ea35 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file. +* Restoring backups with BackupEngine is now a logically atomic operation, so that if a restore operation is interrupted, DB::Open on it will fail. Using BackupEngineOptions::sync (default) ensures atomicity even in case of power loss or OS crash. * Fixed a race related to the destruction of `ColumnFamilyData` objects. The earlier logic unlocked the DB mutex before destroying the thread-local `SuperVersion` pointers, which could result in a process crash if another thread managed to get a reference to the `ColumnFamilyData` object. * Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file. * Fixed an issue where `OnFlushCompleted` was not called for atomic flush. @@ -17,6 +18,7 @@ ### Behavior Changes * `StringAppendOperator` additionally accepts a string as the delimiter. +* BackupEngineOptions::sync (default true) now applies to restoring backups in addition to creating backups. This could slow down restores, but ensures they are fully persisted before returning OK. (Consider increasing max_background_operations to improve performance.) ## 6.23.0 (2021-07-16) ### Behavior Changes @@ -26,7 +28,7 @@ * `GetLiveFilesMetaData()` now populates the `temperature`, `oldest_ancester_time`, and `file_creation_time` fields of its `LiveFileMetaData` results when the information is available. Previously these fields always contained zero indicating unknown. 
* Fix mismatches of OnCompaction{Begin,Completed} in case of DisableManualCompaction(). * Fix continuous logging of an existing background error on every user write -* Fix a bug that `Get()` return Status::OK() and an empty value for non-existent key when `read_options.read_tier = kBlockCacheTier`. +* Fix a bug that `Get()` returns Status::OK() and an empty value for non-existent key when `read_options.read_tier = kBlockCacheTier`. * Fix a bug that stat in `get_context` didn't accumulate to statistics when query is failed. * Fixed handling of DBOptions::wal_dir with LoadLatestOptions() or ldb --try_load_options on a copied or moved DB. Previously, when the WAL directory is same as DB directory (default), a copied or moved DB would reference the old path of the DB as the WAL directory, potentially corrupting both copies. Under this change, the wal_dir from DB::GetOptions() or LoadLatestOptions() may now be empty, indicating that the current DB directory is used for WALs. This is also a subtle API change. diff --git a/include/rocksdb/utilities/backup_engine.h b/include/rocksdb/utilities/backup_engine.h index d6a7764e6..8b9426e83 100644 --- a/include/rocksdb/utilities/backup_engine.h +++ b/include/rocksdb/utilities/backup_engine.h @@ -56,10 +56,11 @@ struct BackupEngineOptions { // Default: nullptr Logger* info_log; - // If sync == true, we can guarantee you'll get consistent backup even - // on a machine crash/reboot. Backup process is slower with sync enabled. - // If sync == false, we don't guarantee anything on machine reboot. However, - // chances are some of the backups are consistent. + // If sync == true, we can guarantee you'll get consistent backup and + // restore even on a machine crash/reboot. Backup and restore processes are + // slower with sync enabled. If sync == false, we can only guarantee that + // other previously synced backups and restores are not modified while + // creating a new one. 
// Default: true bool sync; diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index b98c56bc6..294fde6ca 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -1707,6 +1707,11 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, } Status s; std::vector restore_items_to_finish; + std::string temporary_current_file; + std::string final_current_file; + std::unique_ptr db_dir_for_fsync; + std::unique_ptr wal_dir_for_fsync; + for (const auto& file_info : backup->GetFiles()) { const std::string& file = file_info->filename; // 1. get DB filename @@ -1722,13 +1727,36 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, } // 3. Construct the final path // kWalFile lives in wal_dir and all the rest live in db_dir - dst = ((type == kWalFile) ? wal_dir : db_dir) + "/" + dst; + if (type == kWalFile) { + dst = wal_dir + "/" + dst; + if (options_.sync && !wal_dir_for_fsync) { + s = db_env_->NewDirectory(wal_dir, &wal_dir_for_fsync); + if (!s.ok()) { + return s; + } + } + } else { + dst = db_dir + "/" + dst; + if (options_.sync && !db_dir_for_fsync) { + s = db_env_->NewDirectory(db_dir, &db_dir_for_fsync); + if (!s.ok()) { + return s; + } + } + } + // For atomicity, initially restore CURRENT file to a temporary name. + // This is useful even without options_.sync e.g. in case the restore + // process is interrupted. 
+ if (type == kCurrentFile) { + final_current_file = dst; + dst = temporary_current_file = dst + ".tmp"; + } ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); CopyOrCreateWorkItem copy_or_create_work_item( GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_, - EnvOptions() /* src_env_options */, false, rate_limiter, + EnvOptions() /* src_env_options */, options_.sync, rate_limiter, 0 /* size_limit */); RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( copy_or_create_work_item.result.get_future(), file, dst, @@ -1757,6 +1785,31 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, } } + // When enabled, the first Fsync is to ensure all files are fully persisted + // before renaming CURRENT.tmp + if (s.ok() && db_dir_for_fsync) { + ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n"); + s = db_dir_for_fsync->Fsync(); + } + + if (s.ok() && wal_dir_for_fsync) { + s = wal_dir_for_fsync->Fsync(); + } + + if (s.ok() && !temporary_current_file.empty()) { + ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n"); + assert(!final_current_file.empty()); + s = db_env_->RenameFile(temporary_current_file, final_current_file); + } + + if (s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) { + // Second Fsync is to ensure the final atomic rename of DB restore is + // fully persisted even if power goes out right after restore operation + // returns success + assert(db_dir_for_fsync); + s = db_dir_for_fsync->Fsync(); + } + ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str()); return s; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index f537d134c..2b7c5c8a3 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -642,9 +642,22 @@ class BackupEngineTest : public testing::Test { CreateLoggerFromOptions(dbname_, logger_options, 
&logger_) .PermitUncheckedError(); + // The sync option is not easily testable in unit tests, but should be + // smoke tested across all the other backup tests. However, it is + // certainly not worth doubling the runtime of backup tests for it. + // Thus, we can enable sync for one of our alternate testing + // configurations. + constexpr bool kUseSync = +#ifdef ROCKSDB_MODIFY_NPHASH + true; +#else + false; +#endif // ROCKSDB_MODIFY_NPHASH + // set up backup db options backupable_options_.reset(new BackupableDBOptions( - backupdir_, test_backup_env_.get(), true, logger_.get(), true)); + backupdir_, test_backup_env_.get(), /*share_table_files*/ true, + logger_.get(), kUseSync)); // most tests will use multi-threaded backups backupable_options_->max_background_operations = 7; @@ -3122,82 +3135,108 @@ TEST_F(BackupEngineTest, Concurrency) { Options db_opts = options_; db_opts.wal_dir = ""; + db_opts.create_if_missing = false; BackupableDBOptions be_opts = *backupable_options_; be_opts.destroy_old_data = false; std::mt19937 rng{std::random_device()()}; std::array read_threads; + std::array restore_verify_threads; for (uint32_t i = 0; i < read_threads.size(); ++i) { uint32_t sleep_micros = rng() % 100000; - read_threads[i] = std::thread([this, i, sleep_micros, &db_opts, &be_opts] { - test_db_env_->SleepForMicroseconds(sleep_micros); + read_threads[i] = std::thread( + [this, i, sleep_micros, &db_opts, &be_opts, &restore_verify_threads] { + test_db_env_->SleepForMicroseconds(sleep_micros); - // Whether to also re-open the BackupEngine, potentially seeing - // additional backups - bool reopen = i == 3; - // Whether we are going to restore "latest" - bool latest = i > 1; + // Whether to also re-open the BackupEngine, potentially seeing + // additional backups + bool reopen = i == 3; + // Whether we are going to restore "latest" + bool latest = i > 1; - BackupEngine* my_be; - if (reopen) { - ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be)); - } else { - 
my_be = backup_engine_.get(); - } + BackupEngine* my_be; + if (reopen) { + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be)); + } else { + my_be = backup_engine_.get(); + } - // Verify metadata (we don't receive updates from concurrently - // creating a new backup) - std::vector infos; - my_be->GetBackupInfo(&infos); - const uint32_t count = static_cast(infos.size()); - infos.clear(); - if (reopen) { - ASSERT_GE(count, 2U); - ASSERT_LE(count, 4U); - fprintf(stderr, "Reopen saw %u backups\n", count); - } else { - ASSERT_EQ(count, 2U); - } - std::vector ids; - my_be->GetCorruptedBackups(&ids); - ASSERT_EQ(ids.size(), 0U); + // Verify metadata (we don't receive updates from concurrently + // creating a new backup) + std::vector infos; + my_be->GetBackupInfo(&infos); + const uint32_t count = static_cast(infos.size()); + infos.clear(); + if (reopen) { + ASSERT_GE(count, 2U); + ASSERT_LE(count, 4U); + fprintf(stderr, "Reopen saw %u backups\n", count); + } else { + ASSERT_EQ(count, 2U); + } + std::vector ids; + my_be->GetCorruptedBackups(&ids); + ASSERT_EQ(ids.size(), 0U); - // Restore one of the backups, or "latest" - std::string restore_db_dir = dbname_ + "/restore" + ToString(i); - BackupID to_restore; - if (latest) { - to_restore = count; - ASSERT_OK( - my_be->RestoreDBFromLatestBackup(restore_db_dir, restore_db_dir)); - } else { - to_restore = i + 1; - ASSERT_OK(my_be->VerifyBackup(to_restore, true)); - ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir, - restore_db_dir)); - } + // (Eventually, see below) Restore one of the backups, or "latest" + std::string restore_db_dir = dbname_ + "/restore" + ToString(i); + DestroyDir(test_db_env_.get(), restore_db_dir).PermitUncheckedError(); + BackupID to_restore; + if (latest) { + to_restore = count; + } else { + to_restore = i + 1; + } - // Open restored DB to verify its contents - DB* restored; - ASSERT_OK(DB::Open(db_opts, restore_db_dir, &restored)); - int factor = 
std::min(static_cast(to_restore), max_factor); - AssertExists(restored, 0, factor * keys_iteration); - AssertEmpty(restored, factor * keys_iteration, - (factor + 1) * keys_iteration); - delete restored; + // Open restored DB to verify its contents, but test atomic restore + // by doing it async and ensuring we either get OK or InvalidArgument + restore_verify_threads[i] = + std::thread([this, &db_opts, restore_db_dir, to_restore] { + DB* restored; + Status s; + for (;;) { + s = DB::Open(db_opts, restore_db_dir, &restored); + if (s.IsInvalidArgument()) { + // Restore hasn't finished + test_db_env_->SleepForMicroseconds(1000); + continue; + } else { + // We should only get InvalidArgument if restore is + // incomplete, or OK if complete + ASSERT_OK(s); + break; + } + } + int factor = std::min(static_cast(to_restore), max_factor); + AssertExists(restored, 0, factor * keys_iteration); + AssertEmpty(restored, factor * keys_iteration, + (factor + 1) * keys_iteration); + delete restored; + }); - // Re-verify metadata (we don't receive updates from concurrently - // creating a new backup) - my_be->GetBackupInfo(&infos); - ASSERT_EQ(infos.size(), count); - my_be->GetCorruptedBackups(&ids); - ASSERT_EQ(ids.size(), 0); - // fprintf(stderr, "Finished read thread\n"); + // (Ok now) Restore one of the backups, or "latest" + if (latest) { + ASSERT_OK(my_be->RestoreDBFromLatestBackup(restore_db_dir, + restore_db_dir)); + } else { + ASSERT_OK(my_be->VerifyBackup(to_restore, true)); + ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir, + restore_db_dir)); + } - if (reopen) { - delete my_be; - } - }); + // Re-verify metadata (we don't receive updates from concurrently + // creating a new backup) + my_be->GetBackupInfo(&infos); + ASSERT_EQ(infos.size(), count); + my_be->GetCorruptedBackups(&ids); + ASSERT_EQ(ids.size(), 0); + // fprintf(stderr, "Finished read thread\n"); + + if (reopen) { + delete my_be; + } + }); } BackupEngine* alt_be; @@ -3229,6 +3268,11 @@ 
TEST_F(BackupEngineTest, Concurrency) { } delete alt_be; + + for (auto& t : restore_verify_threads) { + t.join(); + } + CloseDBAndBackupEngine(); }