From ee50b8d499189e936280f9306b2cf84216659203 Mon Sep 17 00:00:00 2001 From: Cheng Chang Date: Sat, 28 Mar 2020 19:05:54 -0700 Subject: [PATCH] Be able to decrease background thread's CPU priority when creating database backup (#6602) Summary: When creating a database backup, the background threads will not only consume IO resources by copying files, but also consuming CPU such as by computing checksums. During peak times, the CPU consumption by the background threads might affect online queries. This PR makes it possible to decrease CPU priority of these threads when creating a new backup. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6602 Test Plan: make check Reviewed By: siying, zhichao-cao Differential Revision: D20683216 Pulled By: cheng-chang fbshipit-source-id: 9978b9ed9488e8ce135e90ca083e5b4b7221fd84 --- HISTORY.md | 1 + db/db_impl/db_impl.cc | 2 +- db/db_impl/db_impl_compaction_flush.cc | 13 +-- db/error_handler.cc | 2 +- db/error_handler_fs_test.cc | 10 +- db/version_set_test.cc | 6 +- file/filename.cc | 4 +- file/filename.h | 4 +- include/rocksdb/options.h | 7 ++ include/rocksdb/utilities/backupable_db.h | 103 ++++++++++++++++--- port/port_posix.cc | 30 ++++++ port/port_posix.h | 5 + port/win/port_win.cc | 23 +++-- port/win/port_win.h | 4 + table/plain/plain_table_builder.cc | 8 +- util/threadpool_imp.cc | 8 +- utilities/backupable/backupable_db.cc | 111 +++++++++++++-------- utilities/backupable/backupable_db_test.cc | 81 +++++++++++++++ 18 files changed, 324 insertions(+), 98 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 5f00f58e4..85dce2cc6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * Fix spelling so that API now has correctly spelled transaction state name `COMMITTED`, while the old misspelled `COMMITED` is still available as an alias. * Updated default format_version in BlockBasedTableOptions from 2 to 4. SST files generated with the new default can be read by RocksDB versions 5.16 and newer, and use more efficient encoding of keys in index blocks. * `Cache::Insert` now expects clients to pass in function objects implementing the `Cache::Deleter` interface as deleters instead of plain function pointers. +* A new parameter `CreateBackupOptions` is added to both `BackupEngine::CreateNewBackup` and `BackupEngine::CreateNewBackupWithMetadata`, you can decrease CPU priority of `BackupEngine`'s background threads by setting `decrease_background_thread_cpu_priority` and `background_thread_cpu_priority` in `CreateBackupOptions`. ### Bug Fixes * Fix a bug where range tombstone blocks in ingested files were cached incorrectly during ingestion. If range tombstones were read from those incorrectly cached blocks, the keys they covered would be exposed. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index ab277cdbf..e65ccff03 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -312,7 +312,7 @@ Status DBImpl::ResumeImpl() { } // Make sure the IO Status stored in version set is set to OK. - if(s.ok()) { + if (s.ok()) { versions_->SetIOStatusOK(); } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 8cb0febe9..8d5d8e268 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -208,7 +208,8 @@ Status DBImpl::FlushMemTableToOutputFile( } if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { - if (!io_s.ok()&& !io_s.IsShutdownInProgress() && !io_s.IsColumnFamilyDropped()) { + if (!io_s.ok() && !io_s.IsShutdownInProgress() && + !io_s.IsColumnFamilyDropped()) { error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); } else { Status new_bg_error = s; @@ -2876,11 +2877,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", status.ToString().c_str()); - if (!io_s.ok()) { - error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction); - } else { - error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); - } + if (!io_s.ok()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction); + } else { + error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); + } if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) { // Put this cfd back in the compaction queue so we can retry after some // time diff --git a/db/error_handler.cc b/db/error_handler.cc index cbee2b811..3c99dce99 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -256,7 +256,7 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, Status bg_err(new_bg_io_err, Status::Severity::kUnrecoverableError); bg_error_ = bg_err; EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, - db_mutex_, &auto_recovery); + db_mutex_, &auto_recovery); return bg_error_; } else if (bg_io_err.GetRetryable()) { // Second, check if the error is a retryable IO error or not. if it is diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc index b32fec84b..623c4c85b 100644 --- a/db/error_handler_fs_test.cc +++ b/db/error_handler_fs_test.cc @@ -183,7 +183,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) { TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) { std::shared_ptr fault_fs( - new FaultInjectionTestFS(FileSystem::Default())); + new FaultInjectionTestFS(FileSystem::Default())); std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); std::shared_ptr listener( new ErrorHandlerFSListener()); @@ -289,7 +289,7 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteError) { TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) { std::shared_ptr fault_fs( - new FaultInjectionTestFS(FileSystem::Default())); + new FaultInjectionTestFS(FileSystem::Default())); std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); std::shared_ptr listener( new ErrorHandlerFSListener()); @@ -457,7 +457,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) { TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) { std::shared_ptr fault_fs( - new FaultInjectionTestFS(FileSystem::Default())); + new FaultInjectionTestFS(FileSystem::Default())); std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); std::shared_ptr listener( new ErrorHandlerFSListener()); @@ -556,7 +556,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteError) { TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) { std::shared_ptr fault_fs( - new FaultInjectionTestFS(FileSystem::Default())); + new FaultInjectionTestFS(FileSystem::Default())); std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); std::shared_ptr listener( new ErrorHandlerFSListener()); @@ -778,7 +778,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteError) { TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) { std::shared_ptr fault_fs( - new FaultInjectionTestFS(FileSystem::Default())); + new FaultInjectionTestFS(FileSystem::Default())); std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); std::shared_ptr listener( new ErrorHandlerFSListener()); diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 85cd29275..9f1dfc215 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1378,7 +1378,8 @@ class EmptyDefaultCfNewManifest : public VersionSetTestBase, TEST_F(EmptyDefaultCfNewManifest, Recover) { PrepareManifest(nullptr, nullptr, &log_writer_); log_writer_.reset(); - Status s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); + Status s = + SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); ASSERT_OK(s); std::string manifest_path; VerifyManifest(&manifest_path); @@ -1440,7 +1441,8 @@ TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) { db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); PrepareManifest(nullptr, nullptr, &log_writer_); log_writer_.reset(); - Status s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); + Status s = + SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); ASSERT_OK(s); std::string manifest_path; diff --git a/file/filename.cc b/file/filename.cc index 2ff0e30c6..04783b899 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -369,8 +369,8 @@ bool ParseFileName(const std::string& fname, uint64_t* number, } IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname, - uint64_t descriptor_number, - FSDirectory* directory_to_fsync) { + uint64_t descriptor_number, + FSDirectory* directory_to_fsync) { // Remove leading "dbname/" and add newline to manifest file name std::string manifest = DescriptorFileName(dbname, descriptor_number); Slice contents = manifest; diff --git a/file/filename.h b/file/filename.h index 7dfe81d37..59b55e556 100644 --- a/file/filename.h +++ b/file/filename.h @@ -170,8 +170,8 @@ extern bool ParseFileName(const std::string& filename, uint64_t* number, // Make the CURRENT file point to the descriptor file with the // specified number. extern IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname, - uint64_t descriptor_number, - FSDirectory* directory_to_fsync); + uint64_t descriptor_number, + FSDirectory* directory_to_fsync); // Make the IDENTITY file for the db extern Status SetIdentityFile(Env* env, const std::string& dbname, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 87a0b9442..aa1245b9f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -51,6 +51,13 @@ class InternalKeyComparator; class WalFilter; class FileSystem; +enum class CpuPriority { + kIdle = 0, + kLow = 1, + kNormal = 2, + kHigh = 3, +}; + // DB contents are stored in a set of blocks, each of which holds a // sequence of key,value pairs. Each block may be compressed before // being stored in a file. The following enum describes which diff --git a/include/rocksdb/utilities/backupable_db.h b/include/rocksdb/utilities/backupable_db.h index f281ed133..eea9ea2de 100644 --- a/include/rocksdb/utilities/backupable_db.h +++ b/include/rocksdb/utilities/backupable_db.h @@ -19,6 +19,7 @@ #include "rocksdb/utilities/stackable_db.h" #include "rocksdb/env.h" +#include "rocksdb/options.h" #include "rocksdb/status.h" namespace ROCKSDB_NAMESPACE { @@ -142,6 +143,24 @@ struct BackupableDBOptions { } }; +struct CreateBackupOptions { + // Flush will always trigger if 2PC is enabled. + // If write-ahead logs are disabled, set flush_before_backup=true to + // avoid losing unflushed key/value pairs from the memtable. + bool flush_before_backup = false; + + // Callback for reporting progress. + std::function progress_callback = []() {}; + + // If false, background_thread_cpu_priority is ignored. + // Otherwise, the cpu priority can be decreased, + // if you try to increase the priority, the priority will not change. + // The initial priority of the threads is CpuPriority::kNormal, + // so you can decrease to priorities lower than kNormal. + bool decrease_background_thread_cpu_priority = false; + CpuPriority background_thread_cpu_priority = CpuPriority::kNormal; +}; + struct RestoreOptions { // If true, restore won't overwrite the existing log files in wal_dir. It will // also move all log files from archive directory to wal_dir. Use this option @@ -208,8 +227,13 @@ class BackupEngineReadOnly { public: virtual ~BackupEngineReadOnly() {} - static Status Open(Env* db_env, const BackupableDBOptions& options, + static Status Open(const BackupableDBOptions& options, Env* db_env, BackupEngineReadOnly** backup_engine_ptr); + // keep for backward compatibility. + static Status Open(Env* db_env, const BackupableDBOptions& options, + BackupEngineReadOnly** backup_engine_ptr) { + return BackupEngineReadOnly::Open(options, db_env, backup_engine_ptr); + } // Returns info about backups in backup_info // You can GetBackupInfo safely, even with other BackupEngine performing @@ -225,14 +249,29 @@ class BackupEngineReadOnly { // responsibility to synchronize the operation, i.e. don't delete the backup // when you're restoring from it // See also the corresponding doc in BackupEngine + virtual Status RestoreDBFromBackup(const RestoreOptions& options, + BackupID backup_id, + const std::string& db_dir, + const std::string& wal_dir) = 0; + + // keep for backward compatibility. virtual Status RestoreDBFromBackup( BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options = RestoreOptions()) = 0; + const RestoreOptions& options = RestoreOptions()) { + return RestoreDBFromBackup(options, backup_id, db_dir, wal_dir); + } // See the corresponding doc in BackupEngine + virtual Status RestoreDBFromLatestBackup(const RestoreOptions& options, + const std::string& db_dir, + const std::string& wal_dir) = 0; + + // keep for backward compatibility. virtual Status RestoreDBFromLatestBackup( const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options = RestoreOptions()) = 0; + const RestoreOptions& options = RestoreOptions()) { + return RestoreDBFromLatestBackup(options, db_dir, wal_dir); + } // checks that each file exists and that the size of the file matches our // expectations. it does not check file checksum. @@ -253,27 +292,44 @@ class BackupEngine { // BackupableDBOptions have to be the same as the ones used in previous // BackupEngines for the same backup directory. - static Status Open(Env* db_env, const BackupableDBOptions& options, + static Status Open(const BackupableDBOptions& options, Env* db_env, BackupEngine** backup_engine_ptr); - // same as CreateNewBackup, but stores extra application metadata - // Flush will always trigger if 2PC is enabled. - // If write-ahead logs are disabled, set flush_before_backup=true to - // avoid losing unflushed key/value pairs from the memtable. + // keep for backward compatibility. + static Status Open(Env* db_env, const BackupableDBOptions& options, + BackupEngine** backup_engine_ptr) { + return BackupEngine::Open(options, db_env, backup_engine_ptr); + } + + // same as CreateNewBackup, but stores extra application metadata. + virtual Status CreateNewBackupWithMetadata( + const CreateBackupOptions& options, DB* db, + const std::string& app_metadata) = 0; + + // keep here for backward compatibility. virtual Status CreateNewBackupWithMetadata( DB* db, const std::string& app_metadata, bool flush_before_backup = false, - std::function progress_callback = []() {}) = 0; + std::function progress_callback = []() {}) { + CreateBackupOptions options; + options.flush_before_backup = flush_before_backup; + options.progress_callback = progress_callback; + return CreateNewBackupWithMetadata(options, db, app_metadata); + } // Captures the state of the database in the latest backup // NOT a thread safe call - // Flush will always trigger if 2PC is enabled. - // If write-ahead logs are disabled, set flush_before_backup=true to - // avoid losing unflushed key/value pairs from the memtable. + virtual Status CreateNewBackup(const CreateBackupOptions& options, DB* db) { + return CreateNewBackupWithMetadata(options, db, ""); + } + + // keep here for backward compatibility. virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false, std::function progress_callback = []() {}) { - return CreateNewBackupWithMetadata(db, "", flush_before_backup, - progress_callback); + CreateBackupOptions options; + options.flush_before_backup = flush_before_backup; + options.progress_callback = progress_callback; + return CreateNewBackup(options, db); } // Deletes old backups, keeping latest num_backups_to_keep alive. @@ -313,14 +369,29 @@ class BackupEngine { // database will diverge from backups 4 and 5 and the new backup will fail. // If you want to create new backup, you will first have to delete backups 4 // and 5. + virtual Status RestoreDBFromBackup(const RestoreOptions& options, + BackupID backup_id, + const std::string& db_dir, + const std::string& wal_dir) = 0; + + // keep for backward compatibility. virtual Status RestoreDBFromBackup( BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options = RestoreOptions()) = 0; + const RestoreOptions& options = RestoreOptions()) { + return RestoreDBFromBackup(options, backup_id, db_dir, wal_dir); + } // restore from the latest backup + virtual Status RestoreDBFromLatestBackup(const RestoreOptions& options, + const std::string& db_dir, + const std::string& wal_dir) = 0; + + // keep for backward compatibility. virtual Status RestoreDBFromLatestBackup( const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options = RestoreOptions()) = 0; + const RestoreOptions& options = RestoreOptions()) { + return RestoreDBFromLatestBackup(options, db_dir, wal_dir); + } // checks that each file exists and that the size of the file matches our // expectations. it does not check file checksum. diff --git a/port/port_posix.cc b/port/port_posix.cc index e3ea5ca69..4154b4fa9 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -230,5 +231,34 @@ static size_t GetPageSize() { const size_t kPageSize = GetPageSize(); +void SetCpuPriority(ThreadId id, CpuPriority priority) { +#ifdef OS_LINUX + sched_param param; + param.sched_priority = 0; + switch (priority) { + case CpuPriority::kHigh: + sched_setscheduler(id, SCHED_OTHER, ¶m); + setpriority(PRIO_PROCESS, id, -20); + break; + case CpuPriority::kNormal: + sched_setscheduler(id, SCHED_OTHER, ¶m); + setpriority(PRIO_PROCESS, id, 0); + break; + case CpuPriority::kLow: + sched_setscheduler(id, SCHED_OTHER, ¶m); + setpriority(PRIO_PROCESS, id, 19); + break; + case CpuPriority::kIdle: + sched_setscheduler(id, SCHED_IDLE, ¶m); + break; + default: + assert(false); + } +#else + (void)id; + (void)priority; +#endif +} + } // namespace port } // namespace ROCKSDB_NAMESPACE diff --git a/port/port_posix.h b/port/port_posix.h index 0c9c69833..065572a8a 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -13,6 +13,7 @@ #include +#include "rocksdb/options.h" #include "rocksdb/rocksdb_namespace.h" // size_t printf formatting named in the manner of C99 standard formatting @@ -214,5 +215,9 @@ extern int GetMaxOpenFiles(); extern const size_t kPageSize; +using ThreadId = pid_t; + +extern void SetCpuPriority(ThreadId id, CpuPriority priority); + } // namespace port } // namespace ROCKSDB_NAMESPACE diff --git a/port/win/port_win.cc b/port/win/port_win.cc index 6e43b1b58..2d99a7a9b 100644 --- a/port/win/port_win.cc +++ b/port/win/port_win.cc @@ -159,20 +159,19 @@ DIR* opendir(const char* name) { std::unique_ptr dir(new DIR); - dir->handle_ = RX_FindFirstFileEx(RX_FN(pattern).c_str(), - FindExInfoBasic, // Do not want alternative name - &dir->data_, - FindExSearchNameMatch, - NULL, // lpSearchFilter - 0); + dir->handle_ = + RX_FindFirstFileEx(RX_FN(pattern).c_str(), + FindExInfoBasic, // Do not want alternative name + &dir->data_, FindExSearchNameMatch, + NULL, // lpSearchFilter + 0); if (dir->handle_ == INVALID_HANDLE_VALUE) { return nullptr; } RX_FILESTRING x(dir->data_.cFileName, RX_FNLEN(dir->data_.cFileName)); - strcpy_s(dir->entry_.d_name, sizeof(dir->entry_.d_name), - FN_TO_RX(x).c_str()); + strcpy_s(dir->entry_.d_name, sizeof(dir->entry_.d_name), FN_TO_RX(x).c_str()); return dir.release(); } @@ -195,7 +194,7 @@ struct dirent* readdir(DIR* dirp) { } RX_FILESTRING x(dirp->data_.cFileName, RX_FNLEN(dirp->data_.cFileName)); - strcpy_s(dirp->entry_.d_name, sizeof(dirp->entry_.d_name), + strcpy_s(dirp->entry_.d_name, sizeof(dirp->entry_.d_name), FN_TO_RX(x).c_str()); return &dirp->entry_; @@ -265,5 +264,11 @@ int GetMaxOpenFiles() { return -1; } // Assume 4KB page size const size_t kPageSize = 4U * 1024U; +void SetCpuPriority(ThreadId id, CpuPriority priority) { + // Not supported + (void)id; + (void)priority; +} + } // namespace port } // namespace ROCKSDB_NAMESPACE diff --git a/port/win/port_win.h b/port/win/port_win.h index 76fc8cf39..abe669be6 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -336,6 +336,10 @@ extern int GetMaxOpenFiles(); std::string utf16_to_utf8(const std::wstring& utf16); std::wstring utf8_to_utf16(const std::string& utf8); +using ThreadId = int; + +extern void SetCpuPriority(ThreadId id, CpuPriority priority); + } // namespace port diff --git a/table/plain/plain_table_builder.cc b/table/plain/plain_table_builder.cc index d2d6babf7..bc4a04d74 100644 --- a/table/plain/plain_table_builder.cc +++ b/table/plain/plain_table_builder.cc @@ -258,12 +258,8 @@ Status PlainTableBuilder::Finish() { // -- Write property block BlockHandle property_block_handle; - IOStatus s = WriteBlock( - property_block_builder.Finish(), - file_, - &offset_, - &property_block_handle - ); + IOStatus s = WriteBlock(property_block_builder.Finish(), file_, &offset_, + &property_block_handle); if (!s.ok()) { return std::move(s); } diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index 713af02e2..4c5d8594f 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -232,12 +232,8 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { #ifdef OS_LINUX if (decrease_cpu_priority) { - setpriority( - PRIO_PROCESS, - // Current thread. - 0, - // Lowest priority possible. - 19); + // 0 means current thread. + port::SetCpuPriority(0, CpuPriority::kLow); low_cpu_priority = true; } diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 0ca67670b..a2058b75c 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -88,32 +88,41 @@ void BackupableDBOptions::Dump(Logger* logger) const { // -------- BackupEngineImpl class --------- class BackupEngineImpl : public BackupEngine { public: - BackupEngineImpl(Env* db_env, const BackupableDBOptions& options, + BackupEngineImpl(const BackupableDBOptions& options, Env* db_env, bool read_only = false); ~BackupEngineImpl() override; - Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata, - bool flush_before_backup = false, - std::function progress_callback = - []() {}) override; + + using BackupEngine::CreateNewBackupWithMetadata; + Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db, + const std::string& app_metadata) override; + Status PurgeOldBackups(uint32_t num_backups_to_keep) override; + Status DeleteBackup(BackupID backup_id) override; + void StopBackup() override { stop_backup_.store(true, std::memory_order_release); } + Status GarbageCollect() override; // The returned BackupInfos are in chronological order, which means the // latest backup comes last. void GetBackupInfo(std::vector* backup_info) override; + void GetCorruptedBackups(std::vector* corrupt_backup_ids) override; - Status RestoreDBFromBackup( - BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options = RestoreOptions()) override; - Status RestoreDBFromLatestBackup( - const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options = RestoreOptions()) override { - return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir, - restore_options); + + using BackupEngine::RestoreDBFromBackup; + Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, + const std::string& db_dir, + const std::string& wal_dir) override; + + using BackupEngine::RestoreDBFromLatestBackup; + Status RestoreDBFromLatestBackup(const RestoreOptions& options, + const std::string& db_dir, + const std::string& wal_dir) override { + return RestoreDBFromBackup(options, latest_valid_backup_id_, db_dir, + wal_dir); } Status VerifyBackup(BackupID backup_id) override; @@ -459,6 +468,7 @@ class BackupEngineImpl : public BackupEngine { std::mutex byte_report_mutex_; channel files_to_copy_or_create_; std::vector threads_; + std::atomic threads_cpu_priority_; // Certain operations like PurgeOldBackups and DeleteBackup will trigger // automatic GarbageCollect (true) unless we've already done one in this // session and have not failed to delete backup files since then (false). @@ -512,10 +522,10 @@ class BackupEngineImpl : public BackupEngine { static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB }; -Status BackupEngine::Open(Env* env, const BackupableDBOptions& options, +Status BackupEngine::Open(const BackupableDBOptions& options, Env* env, BackupEngine** backup_engine_ptr) { std::unique_ptr backup_engine( - new BackupEngineImpl(env, options)); + new BackupEngineImpl(options, env)); auto s = backup_engine->Initialize(); if (!s.ok()) { *backup_engine_ptr = nullptr; @@ -525,9 +535,8 @@ Status BackupEngine::Open(Env* env, const BackupableDBOptions& options, return Status::OK(); } -BackupEngineImpl::BackupEngineImpl(Env* db_env, - const BackupableDBOptions& options, - bool read_only) +BackupEngineImpl::BackupEngineImpl(const BackupableDBOptions& options, + Env* db_env, bool read_only) : initialized_(false), latest_backup_id_(0), latest_valid_backup_id_(0), @@ -730,6 +739,8 @@ Status BackupEngineImpl::Initialize() { // set up threads perform copies from files_to_copy_or_create_ in the // background + threads_cpu_priority_ = CpuPriority::kNormal; + threads_.reserve(options_.max_background_operations); for (int t = 0; t < options_.max_background_operations; t++) { threads_.emplace_back([this]() { #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) @@ -737,8 +748,16 @@ Status BackupEngineImpl::Initialize() { pthread_setname_np(pthread_self(), "backup_engine"); #endif #endif + CpuPriority current_priority = CpuPriority::kNormal; CopyOrCreateWorkItem work_item; while (files_to_copy_or_create_.read(work_item)) { + CpuPriority priority = threads_cpu_priority_; + if (current_priority != priority) { + TEST_SYNC_POINT_CALLBACK( + "BackupEngineImpl::Initialize:SetCpuPriority", &priority); + port::SetCpuPriority(0, priority); + current_priority = priority; + } CopyOrCreateResult result; result.status = CopyOrCreateFile( work_item.src_path, work_item.dst_path, work_item.contents, @@ -756,14 +775,20 @@ Status BackupEngineImpl::Initialize() { } Status BackupEngineImpl::CreateNewBackupWithMetadata( - DB* db, const std::string& app_metadata, bool flush_before_backup, - std::function progress_callback) { + const CreateBackupOptions& options, DB* db, + const std::string& app_metadata) { assert(initialized_); assert(!read_only_); if (app_metadata.size() > kMaxAppMetaSize) { return Status::InvalidArgument("App metadata too large"); } + if (options.decrease_background_thread_cpu_priority) { + if (options.background_thread_cpu_priority < threads_cpu_priority_) { + threads_cpu_priority_.store(options.background_thread_cpu_priority); + } + } + BackupID new_backup_id = latest_backup_id_ + 1; assert(backups_.find(new_backup_id) == backups_.end()); @@ -869,7 +894,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( fname, src_env_options, rate_limiter, size_bytes, size_limit_bytes, options_.share_files_with_checksum && type == kTableFile, - progress_callback); + options.progress_callback); } return st; } /* copy_file_cb */, @@ -880,9 +905,9 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( false /* shared */, "" /* src_dir */, fname, EnvOptions() /* src_env_options */, rate_limiter, contents.size(), 0 /* size_limit */, false /* shared_checksum */, - progress_callback, contents); + options.progress_callback, contents); } /* create_file_cb */, - &sequence_number, flush_before_backup ? 0 : port::kMaxUint64); + &sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64); if (s.ok()) { new_backup->SetSequenceNumber(sequence_number); } @@ -1105,9 +1130,10 @@ BackupEngineImpl::GetCorruptedBackups( } } -Status BackupEngineImpl::RestoreDBFromBackup( - BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options) { +Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, + BackupID backup_id, + const std::string& db_dir, + const std::string& wal_dir) { assert(initialized_); auto corrupt_itr = corrupt_backups_.find(backup_id); if (corrupt_itr != corrupt_backups_.end()) { @@ -1124,13 +1150,13 @@ Status BackupEngineImpl::RestoreDBFromBackup( ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id); ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n", - static_cast(restore_options.keep_log_files)); + static_cast(options.keep_log_files)); // just in case. Ignore errors db_env_->CreateDirIfMissing(db_dir); db_env_->CreateDirIfMissing(wal_dir); - if (restore_options.keep_log_files) { + if (options.keep_log_files) { // delete files in db_dir, but keep all the log files DeleteChildren(db_dir, 1 << kLogFile); // move all the files from archive dir to wal_dir @@ -1928,8 +1954,8 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) { // -------- BackupEngineReadOnlyImpl --------- class BackupEngineReadOnlyImpl : public BackupEngineReadOnly { public: - BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options) - : backup_engine_(new BackupEngineImpl(db_env, options, true)) {} + BackupEngineReadOnlyImpl(const BackupableDBOptions& options, Env* db_env) + : backup_engine_(new BackupEngineImpl(options, db_env, true)) {} ~BackupEngineReadOnlyImpl() override {} @@ -1943,18 +1969,19 @@ class BackupEngineReadOnlyImpl : public BackupEngineReadOnly { backup_engine_->GetCorruptedBackups(corrupt_backup_ids); } - Status RestoreDBFromBackup( - BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options = RestoreOptions()) override { - return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir, - restore_options); + using BackupEngineReadOnly::RestoreDBFromBackup; + Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, + const std::string& db_dir, + const std::string& wal_dir) override { + return backup_engine_->RestoreDBFromBackup(options, backup_id, db_dir, + wal_dir); } - Status RestoreDBFromLatestBackup( - const std::string& db_dir, const std::string& wal_dir, - const RestoreOptions& restore_options = RestoreOptions()) override { - return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir, - restore_options); + using BackupEngineReadOnly::RestoreDBFromLatestBackup; + Status RestoreDBFromLatestBackup(const RestoreOptions& options, + const std::string& db_dir, + const std::string& wal_dir) override { + return backup_engine_->RestoreDBFromLatestBackup(options, db_dir, wal_dir); } Status VerifyBackup(BackupID backup_id) override { @@ -1967,14 +1994,14 @@ class BackupEngineReadOnlyImpl : public BackupEngineReadOnly { std::unique_ptr backup_engine_; }; -Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options, +Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env, BackupEngineReadOnly** backup_engine_ptr) { if (options.destroy_old_data) { return Status::InvalidArgument( "Can't destroy old data with ReadOnly BackupEngine"); } std::unique_ptr backup_engine( - new BackupEngineReadOnlyImpl(env, options)); + new BackupEngineReadOnlyImpl(options, env)); auto s = backup_engine->Initialize(); if (!s.ok()) { *backup_engine_ptr = nullptr; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index efdb34b30..1f60fe79f 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -1842,6 +1842,87 @@ TEST_P(BackupableDBTestWithParam, BackupUsingDirectIO) { } } +TEST_F(BackupableDBTest, BackgroundThreadCpuPriority) { + std::atomic priority(CpuPriority::kNormal); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackupEngineImpl::Initialize:SetCpuPriority", [&](void* new_priority) { + priority.store(*reinterpret_cast(new_priority)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // 1 thread is easier to test, otherwise, we may not be sure which thread + // actually does the work during CreateNewBackup. + backupable_options_->max_background_operations = 1; + OpenDBAndBackupEngine(true); + + { + FillDB(db_.get(), 0, 100); + + // by default, cpu priority is not changed. + CreateBackupOptions options; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kNormal); + } + + { + FillDB(db_.get(), 101, 200); + + // decrease cpu priority from normal to low. + CreateBackupOptions options; + options.decrease_background_thread_cpu_priority = true; + options.background_thread_cpu_priority = CpuPriority::kLow; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kLow); + } + + { + FillDB(db_.get(), 201, 300); + + // try to upgrade cpu priority back to normal, + // the priority should still low. + CreateBackupOptions options; + options.decrease_background_thread_cpu_priority = true; + options.background_thread_cpu_priority = CpuPriority::kNormal; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kLow); + } + + { + FillDB(db_.get(), 301, 400); + + // decrease cpu priority from low to idle. + CreateBackupOptions options; + options.decrease_background_thread_cpu_priority = true; + options.background_thread_cpu_priority = CpuPriority::kIdle; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kIdle); + } + + { + FillDB(db_.get(), 301, 400); + + // reset priority to later verify that it's not updated by SetCpuPriority. + priority = CpuPriority::kNormal; + + // setting the same cpu priority won't call SetCpuPriority. + CreateBackupOptions options; + options.decrease_background_thread_cpu_priority = true; + options.background_thread_cpu_priority = CpuPriority::kIdle; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kNormal); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); +} + } // anon namespace } // namespace ROCKSDB_NAMESPACE