Bytes read/written stats for CreateNewBackup*() (#8819)

Summary:
Gets `Statistics` from the options associated with the `DB` undergoing backup, and populates new ticker stats with the thread-local `IOContext` read/write counters for the threads doing backup work.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8819

Reviewed By: pdillinger

Differential Revision: D30779238

Pulled By: ajkr

fbshipit-source-id: 75ccafc355f90906df5cf80367f7245b985772d8
This commit is contained in:
Andrew Kryczka 2021-09-07 18:23:58 -07:00 committed by Facebook GitHub Bot
parent 6cca9fab7c
commit 9308ff366c
7 changed files with 167 additions and 35 deletions

View File

@ -11,6 +11,7 @@
### New Features
* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
* Added a ticker statistic, "rocksdb.verify_checksum.read.bytes", reporting how many bytes were read from file to serve `VerifyChecksum()` and `VerifyFileChecksums()` queries.
* Added ticker statistics, "rocksdb.backup.read.bytes" and "rocksdb.backup.write.bytes", reporting how many bytes were read and written during backup.
### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API

View File

@ -7,6 +7,8 @@
#include <cstring>
#include "monitoring/iostats_context_imp.h"
namespace ROCKSDB_NAMESPACE {
Status LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
@ -49,6 +51,7 @@ bool LineFileReader::ReadLine(std::string* out) {
out->append(buf_begin_, buf_end_ - buf_begin_);
Slice result;
status_ = sfr_.Read(buf_.size(), &result, buf_.data());
IOSTATS_ADD(bytes_read, result.size());
if (!status_.ok()) {
status_.MustCheck();
return false;

View File

@ -408,6 +408,10 @@ enum Tickers : uint32_t {
// Bytes read by `VerifyChecksum()` and `VerifyFileChecksums()` APIs.
VERIFY_CHECKSUM_READ_BYTES,
// Bytes read/written while creating backups
BACKUP_READ_BYTES,
BACKUP_WRITE_BYTES,
TICKER_ENUM_MAX
};

View File

@ -5004,6 +5004,10 @@ class TickerTypeJni {
return -0x1E;
case ROCKSDB_NAMESPACE::Tickers::VERIFY_CHECKSUM_READ_BYTES:
return -0x1F;
case ROCKSDB_NAMESPACE::Tickers::BACKUP_READ_BYTES:
return -0x20;
case ROCKSDB_NAMESPACE::Tickers::BACKUP_WRITE_BYTES:
return -0x21;
case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX:
// 0x5F was the max value in the initial copy of tickers to Java.
// Since these values are exposed directly to Java clients, we keep
@ -5345,6 +5349,10 @@ class TickerTypeJni {
return ROCKSDB_NAMESPACE::Tickers::SECONDARY_CACHE_HITS;
case -0x1F:
return ROCKSDB_NAMESPACE::Tickers::VERIFY_CHECKSUM_READ_BYTES;
case -0x20:
return ROCKSDB_NAMESPACE::Tickers::BACKUP_READ_BYTES;
case -0x21:
return ROCKSDB_NAMESPACE::Tickers::BACKUP_WRITE_BYTES;
case 0x5F:
// 0x5F was the max value in the initial copy of tickers to Java.
// Since these values are exposed directly to Java clients, we keep

View File

@ -207,6 +207,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
"rocksdb.memtable.garbage.bytes.at.flush"},
{SECONDARY_CACHE_HITS, "rocksdb.secondary.cache.hits"},
{VERIFY_CHECKSUM_READ_BYTES, "rocksdb.verify_checksum.read.bytes"},
{BACKUP_READ_BYTES, "rocksdb.backup.read.bytes"},
{BACKUP_WRITE_BYTES, "rocksdb.backup.write.bytes"},
};
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {

View File

@ -33,6 +33,7 @@
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/transaction_log.h"
@ -583,6 +584,7 @@ class BackupEngineImpl {
bool sync;
RateLimiter* rate_limiter;
uint64_t size_limit;
Statistics* stats;
std::promise<CopyOrCreateResult> result;
std::function<void()> progress_callback;
std::string src_checksum_func_name;
@ -600,6 +602,7 @@ class BackupEngineImpl {
sync(false),
rate_limiter(nullptr),
size_limit(0),
stats(nullptr),
src_checksum_func_name(kUnknownFileChecksumFuncName),
src_checksum_hex(""),
db_id(""),
@ -622,6 +625,7 @@ class BackupEngineImpl {
sync = o.sync;
rate_limiter = o.rate_limiter;
size_limit = o.size_limit;
stats = o.stats;
result = std::move(o.result);
progress_callback = std::move(o.progress_callback);
src_checksum_func_name = std::move(o.src_checksum_func_name);
@ -634,7 +638,7 @@ class BackupEngineImpl {
CopyOrCreateWorkItem(
std::string _src_path, std::string _dst_path, std::string _contents,
Env* _src_env, Env* _dst_env, EnvOptions _src_env_options, bool _sync,
RateLimiter* _rate_limiter, uint64_t _size_limit,
RateLimiter* _rate_limiter, uint64_t _size_limit, Statistics* _stats,
std::function<void()> _progress_callback = []() {},
const std::string& _src_checksum_func_name =
kUnknownFileChecksumFuncName,
@ -649,6 +653,7 @@ class BackupEngineImpl {
sync(_sync),
rate_limiter(_rate_limiter),
size_limit(_size_limit),
stats(_stats),
progress_callback(_progress_callback),
src_checksum_func_name(_src_checksum_func_name),
src_checksum_hex(_src_checksum_hex),
@ -756,8 +761,8 @@ class BackupEngineImpl {
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& fname, // starts with "/"
const EnvOptions& src_env_options, RateLimiter* rate_limiter,
FileType file_type, uint64_t size_bytes, uint64_t size_limit = 0,
bool shared_checksum = false,
FileType file_type, uint64_t size_bytes, Statistics* stats,
uint64_t size_limit = 0, bool shared_checksum = false,
std::function<void()> progress_callback = []() {},
const std::string& contents = std::string(),
const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName,
@ -1153,6 +1158,13 @@ Status BackupEngineImpl::Initialize() {
port::SetCpuPriority(0, priority);
current_priority = priority;
}
// `bytes_read` and `bytes_written` stats are enabled based on
// compile-time support and cannot be dynamically toggled. So we do not
// need to worry about `PerfLevel` here, unlike many other
// `IOStatsContext` / `PerfContext` stats.
uint64_t prev_bytes_read = IOSTATS(bytes_read);
uint64_t prev_bytes_written = IOSTATS(bytes_written);
CopyOrCreateResult result;
result.status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
@ -1160,6 +1172,12 @@ Status BackupEngineImpl::Initialize() {
work_item.sync, work_item.rate_limiter, &result.size,
&result.checksum_hex, work_item.size_limit,
work_item.progress_callback);
RecordTick(work_item.stats, BACKUP_READ_BYTES,
IOSTATS(bytes_read) - prev_bytes_read);
RecordTick(work_item.stats, BACKUP_WRITE_BYTES,
IOSTATS(bytes_written) - prev_bytes_written);
result.db_id = work_item.db_id;
result.db_session_id = work_item.db_session_id;
if (result.status.ok() && !work_item.src_checksum_hex.empty()) {
@ -1217,6 +1235,12 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
BackupID new_backup_id = latest_backup_id_ + 1;
// `bytes_read` and `bytes_written` stats are enabled based on compile-time
// support and cannot be dynamically toggled. So we do not need to worry about
// `PerfLevel` here, unlike many other `IOStatsContext` / `PerfContext` stats.
uint64_t prev_bytes_read = IOSTATS(bytes_read);
uint64_t prev_bytes_written = IOSTATS(bytes_written);
assert(backups_.find(new_backup_id) == backups_.end());
auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
@ -1270,10 +1294,11 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
// Add a CopyOrCreateWorkItem to the channel for each live file
Status disabled = db->DisableFileDeletions();
DBOptions db_options = db->GetDBOptions();
Statistics* stats = db_options.statistics.get();
if (s.ok()) {
CheckpointImpl checkpoint(db);
uint64_t sequence_number = 0;
DBOptions db_options = db->GetDBOptions();
FileChecksumGenFactory* db_checksum_factory =
db_options.file_checksum_gen_factory.get();
const std::string kFileChecksumGenFactoryName =
@ -1337,7 +1362,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
options_.share_table_files &&
(type == kTableFile || type == kBlobFile),
src_dirname, fname, src_env_options, rate_limiter, type,
size_bytes, size_limit_bytes,
size_bytes, db_options.statistics.get(), size_limit_bytes,
options_.share_files_with_checksum &&
(type == kTableFile || type == kBlobFile),
options.progress_callback, "" /* contents */,
@ -1352,8 +1377,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
live_dst_paths, backup_items_to_finish, new_backup_id,
false /* shared */, "" /* src_dir */, fname,
EnvOptions() /* src_env_options */, rate_limiter, type,
contents.size(), 0 /* size_limit */, false /* shared_checksum */,
options.progress_callback, contents);
contents.size(), db_options.statistics.get(), 0 /* size_limit */,
false /* shared_checksum */, options.progress_callback, contents);
} /* create_file_cb */,
&sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64,
compare_checksum);
@ -1415,8 +1440,27 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
if (s.ok()) {
backup_statistics_.IncrementNumberSuccessBackup();
}
if (!s.ok()) {
// here we know that we succeeded and installed the new backup
latest_backup_id_ = new_backup_id;
latest_valid_backup_id_ = new_backup_id;
if (new_backup_id_ptr) {
*new_backup_id_ptr = new_backup_id;
}
ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
// backup_speed is in byte/second
double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
new_backup->GetNumberFiles());
char human_size[16];
AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
backup_time);
ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
backup_statistics_.ToString().c_str());
} else {
backup_statistics_.IncrementNumberFailBackup();
// clean all the files we might have created
ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
@ -1426,30 +1470,11 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
// delete files that we might have already written
might_need_garbage_collect_ = true;
DeleteBackup(new_backup_id).PermitUncheckedError();
return s;
}
// here we know that we succeeded and installed the new backup
// in the LATEST_BACKUP file
latest_backup_id_ = new_backup_id;
latest_valid_backup_id_ = new_backup_id;
if (new_backup_id_ptr) {
*new_backup_id_ptr = new_backup_id;
}
ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
// backup_speed is in byte/second
double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
new_backup->GetNumberFiles());
char human_size[16];
AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
backup_time);
ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
backup_statistics_.ToString().c_str());
RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read);
RecordTick(stats, BACKUP_WRITE_BYTES,
IOSTATS(bytes_written) - prev_bytes_written);
return s;
}
@ -1746,7 +1771,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
CopyOrCreateWorkItem copy_or_create_work_item(
GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
EnvOptions() /* src_env_options */, options_.sync,
options_.restore_rate_limiter.get(), 0 /* size_limit */);
options_.restore_rate_limiter.get(), 0 /* size_limit */,
nullptr /* stats */);
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), file, dst,
file_info->checksum_hex);
@ -1981,7 +2007,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& fname, const EnvOptions& src_env_options,
RateLimiter* rate_limiter, FileType file_type, uint64_t size_bytes,
uint64_t size_limit, bool shared_checksum,
Statistics* stats, uint64_t size_limit, bool shared_checksum,
std::function<void()> progress_callback, const std::string& contents,
const std::string& src_checksum_func_name,
const std::string& src_checksum_str) {
@ -2176,8 +2202,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
CopyOrCreateWorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
size_limit, progress_callback, src_checksum_func_name, checksum_hex,
db_id, db_session_id);
size_limit, stats, progress_callback, src_checksum_func_name,
checksum_hex, db_id, db_session_id);
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
backup_env_, temp_dest_path, final_dest_path, dst_relative);
@ -2870,6 +2896,7 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(
}
s = backup_meta_file->Append(Slice(buf.str()));
IOSTATS_ADD(bytes_written, buf.str().size());
if (s.ok() && sync) {
s = backup_meta_file->Sync();
}

View File

@ -3568,6 +3568,93 @@ TEST_F(BackupEngineTest, BackgroundThreadCpuPriority) {
DestroyDB(dbname_, options_);
}
// Populates `*total_size` with the size of all files under `backup_dir`.
// We don't go through `BackupEngine` currently because it's hard to figure out
// the metadata file size.
Status GetSizeOfBackupFiles(FileSystem* backup_fs,
const std::string& backup_dir, size_t* total_size) {
*total_size = 0;
std::vector<std::string> dir_stack = {backup_dir};
Status s;
while (s.ok() && !dir_stack.empty()) {
std::string dir = std::move(dir_stack.back());
dir_stack.pop_back();
std::vector<std::string> children;
s = backup_fs->GetChildren(dir, IOOptions(), &children, nullptr /* dbg */);
for (size_t i = 0; s.ok() && i < children.size(); ++i) {
std::string path = dir + "/" + children[i];
bool is_dir;
s = backup_fs->IsDirectory(path, IOOptions(), &is_dir, nullptr /* dbg */);
uint64_t file_size = 0;
if (s.ok()) {
if (is_dir) {
dir_stack.emplace_back(std::move(path));
} else {
s = backup_fs->GetFileSize(path, IOOptions(), &file_size,
nullptr /* dbg */);
}
}
if (s.ok()) {
*total_size += file_size;
}
}
}
return s;
}
TEST_F(BackupEngineTest, IOStats) {
// Tests the `BACKUP_READ_BYTES` and `BACKUP_WRITE_BYTES` ticker stats have
// the expected values according to the files in the backups.
// These ticker stats are expected to be populated regardless of `PerfLevel`
// in user thread
SetPerfLevel(kDisable);
options_.statistics = CreateDBStatistics();
OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
kShareWithChecksum);
FillDB(db_.get(), 0 /* from */, 100 /* to */, kFlushMost);
ASSERT_EQ(0, options_.statistics->getTickerCount(BACKUP_READ_BYTES));
ASSERT_EQ(0, options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
false /* flush_before_backup */));
size_t orig_backup_files_size;
ASSERT_OK(GetSizeOfBackupFiles(test_backup_env_->GetFileSystem().get(),
backupdir_, &orig_backup_files_size));
size_t expected_bytes_written = orig_backup_files_size;
ASSERT_EQ(expected_bytes_written,
options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
// Bytes read is more difficult to pin down since there are reads for many
// purposes other than creating file, like `GetSortedWalFiles()` to find first
// sequence number, or `CreateNewBackup()` thread to find SST file session ID.
// So we loosely require there are at least as many reads as needed for
// copying, but not as many as twice that.
ASSERT_GE(options_.statistics->getTickerCount(BACKUP_READ_BYTES),
expected_bytes_written);
ASSERT_LT(expected_bytes_written,
2 * options_.statistics->getTickerCount(BACKUP_READ_BYTES));
FillDB(db_.get(), 100 /* from */, 200 /* to */, kFlushMost);
ASSERT_OK(options_.statistics->Reset());
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
false /* flush_before_backup */));
size_t final_backup_files_size;
ASSERT_OK(GetSizeOfBackupFiles(test_backup_env_->GetFileSystem().get(),
backupdir_, &final_backup_files_size));
expected_bytes_written = final_backup_files_size - orig_backup_files_size;
ASSERT_EQ(expected_bytes_written,
options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
// See above for why these bounds were chosen.
ASSERT_GE(options_.statistics->getTickerCount(BACKUP_READ_BYTES),
expected_bytes_written);
ASSERT_LT(expected_bytes_written,
2 * options_.statistics->getTickerCount(BACKUP_READ_BYTES));
}
} // anon namespace
} // namespace ROCKSDB_NAMESPACE