Fix a buffer size race condition in BackupEngine (#8732)
Summary: If RateLimiter burst bytes changes during concurrent Restore operations Pull Request resolved: https://github.com/facebook/rocksdb/pull/8732 Test Plan: updated unit test fails with TSAN before change, passes after Reviewed By: ajkr Differential Revision: D30683879 Pulled By: pdillinger fbshipit-source-id: d0ddb3587ade91ee2a4d926b475acf7781b03086
This commit is contained in:
parent
f9ffeaed3f
commit
32752551b9
@ -6,6 +6,7 @@
|
||||
* Fixed a bug that could lead to duplicate DB ID or DB session ID in POSIX environments without /proc/sys/kernel/random/uuid.
|
||||
* Fix a race in DumpStats() with column family destruction due to not taking a Ref on each entry while iterating the ColumnFamilySet.
|
||||
* Fix a race in item ref counting in LRUCache when promoting an item from the SecondaryCache.
|
||||
* Fix a race in BackupEngine if RateLimiter is reconfigured during concurrent Restore operations.
|
||||
|
||||
### New Features
|
||||
* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
|
||||
|
@ -785,7 +785,6 @@ class BackupEngineImpl {
|
||||
std::unique_ptr<Directory> private_directory_;
|
||||
|
||||
static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
|
||||
mutable size_t copy_file_buffer_size_;
|
||||
bool read_only_;
|
||||
BackupStatistics backup_statistics_;
|
||||
std::unordered_set<std::string> reported_ignored_fields_;
|
||||
@ -924,7 +923,6 @@ BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options,
|
||||
options_(options),
|
||||
db_env_(db_env),
|
||||
backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
|
||||
copy_file_buffer_size_(kDefaultCopyFileBufferSize),
|
||||
read_only_(read_only) {
|
||||
if (options_.backup_rate_limiter == nullptr &&
|
||||
options_.backup_rate_limit > 0) {
|
||||
@ -1263,11 +1261,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
|
||||
s = backup_env_->CreateDir(private_dir);
|
||||
}
|
||||
|
||||
RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
|
||||
if (rate_limiter) {
|
||||
copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
|
||||
}
|
||||
|
||||
// A set into which we will insert the dst_paths that are calculated for live
|
||||
// files and live WAL files.
|
||||
// This is used to check whether a live files shares a dst_path with another
|
||||
@ -1291,6 +1284,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
|
||||
? true
|
||||
: false;
|
||||
EnvOptions src_raw_env_options(db_options);
|
||||
RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
|
||||
s = checkpoint.CreateCustomCheckpoint(
|
||||
db_options,
|
||||
[&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
|
||||
@ -1700,11 +1694,6 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
|
||||
DeleteChildren(db_dir);
|
||||
}
|
||||
|
||||
RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
|
||||
if (rate_limiter) {
|
||||
copy_file_buffer_size_ =
|
||||
static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
|
||||
}
|
||||
Status s;
|
||||
std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
|
||||
std::string temporary_current_file;
|
||||
@ -1756,8 +1745,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
|
||||
dst.c_str());
|
||||
CopyOrCreateWorkItem copy_or_create_work_item(
|
||||
GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
|
||||
EnvOptions() /* src_env_options */, options_.sync, rate_limiter,
|
||||
0 /* size_limit */);
|
||||
EnvOptions() /* src_env_options */, options_.sync,
|
||||
options_.restore_rate_limiter.get(), 0 /* size_limit */);
|
||||
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
|
||||
copy_or_create_work_item.result.get_future(), file, dst,
|
||||
file_info->checksum_hex);
|
||||
@ -1916,13 +1905,17 @@ Status BackupEngineImpl::CopyOrCreateFile(
|
||||
return s;
|
||||
}
|
||||
|
||||
size_t buf_size =
|
||||
rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
|
||||
: kDefaultCopyFileBufferSize;
|
||||
|
||||
std::unique_ptr<WritableFileWriter> dest_writer(
|
||||
new WritableFileWriter(std::move(dst_file), dst, dst_file_options));
|
||||
std::unique_ptr<SequentialFileReader> src_reader;
|
||||
std::unique_ptr<char[]> buf;
|
||||
if (!src.empty()) {
|
||||
src_reader.reset(new SequentialFileReader(std::move(src_file), src));
|
||||
buf.reset(new char[copy_file_buffer_size_]);
|
||||
buf.reset(new char[buf_size]);
|
||||
}
|
||||
|
||||
Slice data;
|
||||
@ -1932,9 +1925,8 @@ Status BackupEngineImpl::CopyOrCreateFile(
|
||||
return Status::Incomplete("Backup stopped");
|
||||
}
|
||||
if (!src.empty()) {
|
||||
size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
|
||||
? copy_file_buffer_size_
|
||||
: static_cast<size_t>(size_limit);
|
||||
size_t buffer_to_read =
|
||||
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
|
||||
s = src_reader->Read(buffer_to_read, &data, buf.get());
|
||||
processed_buffer_size += buffer_to_read;
|
||||
} else {
|
||||
@ -2227,15 +2219,19 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
|
||||
return s;
|
||||
}
|
||||
|
||||
std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
|
||||
RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
|
||||
size_t buf_size =
|
||||
rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
|
||||
: kDefaultCopyFileBufferSize;
|
||||
std::unique_ptr<char[]> buf(new char[buf_size]);
|
||||
Slice data;
|
||||
|
||||
do {
|
||||
if (stop_backup_.load(std::memory_order_acquire)) {
|
||||
return Status::Incomplete("Backup stopped");
|
||||
}
|
||||
size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
|
||||
copy_file_buffer_size_ : static_cast<size_t>(size_limit);
|
||||
size_t buffer_to_read =
|
||||
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
|
||||
s = src_reader->Read(buffer_to_read, &data, buf.get());
|
||||
|
||||
if (!s.ok()) {
|
||||
|
@ -3120,6 +3120,13 @@ TEST_F(BackupEngineTest, Concurrency) {
|
||||
//
|
||||
// Because of the challenges of integrating this into db_stress,
|
||||
// this is a non-deterministic mini-stress test here instead.
|
||||
|
||||
// To check for a race condition in handling buffer size based on byte
|
||||
// burst limit, we need a (generous) rate limiter
|
||||
std::shared_ptr<RateLimiter> limiter{NewGenericRateLimiter(1000000000)};
|
||||
backupable_options_->backup_rate_limiter = limiter;
|
||||
backupable_options_->restore_rate_limiter = limiter;
|
||||
|
||||
OpenDBAndBackupEngine(true, false, kShareWithChecksum);
|
||||
|
||||
static constexpr int keys_iteration = 5000;
|
||||
@ -3145,8 +3152,9 @@ TEST_F(BackupEngineTest, Concurrency) {
|
||||
std::array<std::thread, 4> restore_verify_threads;
|
||||
for (uint32_t i = 0; i < read_threads.size(); ++i) {
|
||||
uint32_t sleep_micros = rng() % 100000;
|
||||
read_threads[i] = std::thread(
|
||||
[this, i, sleep_micros, &db_opts, &be_opts, &restore_verify_threads] {
|
||||
read_threads[i] =
|
||||
std::thread([this, i, sleep_micros, &db_opts, &be_opts,
|
||||
&restore_verify_threads, &limiter] {
|
||||
test_db_env_->SleepForMicroseconds(sleep_micros);
|
||||
|
||||
// Whether to also re-open the BackupEngine, potentially seeing
|
||||
@ -3225,6 +3233,14 @@ TEST_F(BackupEngineTest, Concurrency) {
|
||||
restore_db_dir));
|
||||
}
|
||||
|
||||
// Test for race condition in reconfiguring limiter
|
||||
// FIXME: this could set to a different value in all threads, except
|
||||
// GenericRateLimiter::SetBytesPerSecond has a write-write race
|
||||
// reported by TSAN
|
||||
if (i == 0) {
|
||||
limiter->SetBytesPerSecond(2000000000);
|
||||
}
|
||||
|
||||
// Re-verify metadata (we don't receive updates from concurrently
|
||||
// creating a new backup)
|
||||
my_be->GetBackupInfo(&infos);
|
||||
|
Loading…
x
Reference in New Issue
Block a user