Get file attributes in bulk for VerifyBackup and CreateNewBackup
Summary: For VerifyBackup(), backup files can be spread across "shared/", "shared_checksum/", and "private/" subdirectories, so we have to bulk get all three. For CreateNewBackup(), we make two separate bulk calls: one for the data files and one for WAL files. There is also a new helper function, ExtendPathnameToSizeBytes(), that translates the file attributes vector to a map. I decided to leave GetChildrenFileAttributes()'s (from D53781) return type as vector to keep it consistent with GetChildren(). Depends on D53781. Test Plan: verified relevant unit tests $ ./backupable_db_test Reviewers: IslamAbdelRahman, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D53919
This commit is contained in:
parent
12fd9b1868
commit
f8e90e8753
@ -117,6 +117,12 @@ class BackupEngineImpl : public BackupEngine {
|
||||
private:
|
||||
void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
|
||||
|
||||
// Extends the "result" map with pathname->size mappings for the contents of
|
||||
// "dir". Pathnames are prefixed with "dir".
|
||||
Status InsertPathnameToSizeBytes(
|
||||
const std::string& dir,
|
||||
std::unordered_map<std::string, uint64_t>* result);
|
||||
|
||||
struct FileInfo {
|
||||
FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
|
||||
: refs(0), filename(fname), size(sz), checksum_value(checksum) {}
|
||||
@ -179,8 +185,10 @@ class BackupEngineImpl : public BackupEngine {
|
||||
return files_;
|
||||
}
|
||||
|
||||
Status LoadFromFile(const std::string& backup_dir,
|
||||
bool use_size_in_file_name);
|
||||
// @param abs_path_to_size Pre-fetched file sizes (bytes).
|
||||
Status LoadFromFile(
|
||||
const std::string& backup_dir, bool use_size_in_file_name,
|
||||
const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
|
||||
Status StoreToFile(bool sync);
|
||||
|
||||
std::string GetInfoString() {
|
||||
@ -427,7 +435,7 @@ class BackupEngineImpl : public BackupEngine {
|
||||
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
|
||||
BackupID backup_id, bool shared, const std::string& src_dir,
|
||||
const std::string& fname, // starts with "/"
|
||||
RateLimiter* rate_limiter, uint64_t size_limit = 0,
|
||||
RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit = 0,
|
||||
bool shared_checksum = false,
|
||||
std::function<void()> progress_callback = []() {},
|
||||
const std::string& contents = std::string());
|
||||
@ -573,10 +581,19 @@ Status BackupEngineImpl::Initialize() {
|
||||
return s;
|
||||
}
|
||||
} else { // Load data from storage
|
||||
std::unordered_map<std::string, uint64_t> abs_path_to_size;
|
||||
for (const auto& rel_dir :
|
||||
{GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
|
||||
const auto abs_dir = GetAbsolutePath(rel_dir);
|
||||
InsertPathnameToSizeBytes(abs_dir, &abs_path_to_size);
|
||||
}
|
||||
// load the backups if any
|
||||
for (auto& backup : backups_) {
|
||||
InsertPathnameToSizeBytes(
|
||||
GetAbsolutePath(GetPrivateFileRel(backup.first)), &abs_path_to_size);
|
||||
Status s = backup.second->LoadFromFile(
|
||||
options_.backup_dir, options_.use_file_size_in_file_name);
|
||||
options_.backup_dir, options_.use_file_size_in_file_name,
|
||||
abs_path_to_size);
|
||||
if (!s.ok()) {
|
||||
Log(options_.info_log, "Backup %u corrupted -- %s", backup.first,
|
||||
s.ToString().c_str());
|
||||
@ -685,6 +702,12 @@ Status BackupEngineImpl::CreateNewBackup(
|
||||
std::unordered_set<std::string> live_dst_paths;
|
||||
live_dst_paths.reserve(live_files.size() + live_wal_files.size());
|
||||
|
||||
// Pre-fetch sizes for data files
|
||||
std::unordered_map<std::string, uint64_t> data_path_to_size;
|
||||
if (s.ok()) {
|
||||
s = InsertPathnameToSizeBytes(db->GetName(), &data_path_to_size);
|
||||
}
|
||||
|
||||
std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
|
||||
// Add a CopyOrCreateWorkItem to the channel for each live file
|
||||
std::string manifest_fname, current_fname;
|
||||
@ -709,13 +732,19 @@ Status BackupEngineImpl::CreateNewBackup(
|
||||
manifest_fname = live_files[i];
|
||||
}
|
||||
|
||||
auto data_path_to_size_iter =
|
||||
data_path_to_size.find(db->GetName() + live_files[i]);
|
||||
uint64_t size_bytes = data_path_to_size_iter == data_path_to_size.end()
|
||||
? port::kMaxUint64
|
||||
: data_path_to_size_iter->second;
|
||||
|
||||
// rules:
|
||||
// * if it's kTableFile, then it's shared
|
||||
// * if it's kDescriptorFile, limit the size to manifest_file_size
|
||||
s = AddBackupFileWorkItem(
|
||||
live_dst_paths, backup_items_to_finish, new_backup_id,
|
||||
options_.share_table_files && type == kTableFile, db->GetName(),
|
||||
live_files[i], rate_limiter.get(),
|
||||
live_files[i], rate_limiter.get(), size_bytes,
|
||||
(type == kDescriptorFile) ? manifest_file_size : 0,
|
||||
options_.share_files_with_checksum && type == kTableFile,
|
||||
progress_callback);
|
||||
@ -725,21 +754,37 @@ Status BackupEngineImpl::CreateNewBackup(
|
||||
s = AddBackupFileWorkItem(
|
||||
live_dst_paths, backup_items_to_finish, new_backup_id,
|
||||
false /* shared */, "" /* src_dir */, CurrentFileName(""),
|
||||
rate_limiter.get(), 0 /* size_limit */, false /* shared_checksum */,
|
||||
progress_callback, manifest_fname.substr(1) + "\n");
|
||||
rate_limiter.get(), manifest_fname.size(), 0 /* size_limit */,
|
||||
false /* shared_checksum */, progress_callback,
|
||||
manifest_fname.substr(1) + "\n");
|
||||
}
|
||||
|
||||
// Pre-fetch sizes for WAL files
|
||||
std::unordered_map<std::string, uint64_t> wal_path_to_size;
|
||||
if (s.ok()) {
|
||||
if (db->GetOptions().wal_dir != "") {
|
||||
s = InsertPathnameToSizeBytes(db->GetOptions().wal_dir,
|
||||
&wal_path_to_size);
|
||||
} else {
|
||||
wal_path_to_size = std::move(data_path_to_size);
|
||||
}
|
||||
}
|
||||
|
||||
// Add a CopyOrCreateWorkItem to the channel for each WAL file
|
||||
for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
|
||||
auto wal_path_to_size_iter =
|
||||
wal_path_to_size.find(live_wal_files[i]->PathName());
|
||||
uint64_t size_bytes = wal_path_to_size_iter == wal_path_to_size.end()
|
||||
? port::kMaxUint64
|
||||
: wal_path_to_size_iter->second;
|
||||
if (live_wal_files[i]->Type() == kAliveLogFile) {
|
||||
// we only care about live log files
|
||||
// copy the file into backup_dir/files/<new backup>/
|
||||
s = AddBackupFileWorkItem(live_dst_paths,
|
||||
backup_items_to_finish,
|
||||
new_backup_id,
|
||||
false, /* not shared */
|
||||
s = AddBackupFileWorkItem(live_dst_paths, backup_items_to_finish,
|
||||
new_backup_id, false, /* not shared */
|
||||
db->GetOptions().wal_dir,
|
||||
live_wal_files[i]->PathName(),
|
||||
rate_limiter.get());
|
||||
rate_limiter.get(), size_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1070,21 +1115,20 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
|
||||
|
||||
Log(options_.info_log, "Verifying backup id %u\n", backup_id);
|
||||
|
||||
uint64_t size;
|
||||
Status result;
|
||||
std::string file_path;
|
||||
for (const auto& file_info : backup->GetFiles()) {
|
||||
const std::string& file = file_info->filename;
|
||||
file_path = GetAbsolutePath(file);
|
||||
result = backup_env_->FileExists(file_path);
|
||||
if (!result.ok()) {
|
||||
return result;
|
||||
std::unordered_map<std::string, uint64_t> curr_abs_path_to_size;
|
||||
for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
|
||||
GetSharedFileWithChecksumRel()}) {
|
||||
const auto abs_dir = GetAbsolutePath(rel_dir);
|
||||
InsertPathnameToSizeBytes(abs_dir, &curr_abs_path_to_size);
|
||||
}
|
||||
result = backup_env_->GetFileSize(file_path, &size);
|
||||
if (!result.ok()) {
|
||||
return result;
|
||||
} else if (size != file_info->size) {
|
||||
return Status::Corruption("File corrupted: " + file);
|
||||
|
||||
for (const auto& file_info : backup->GetFiles()) {
|
||||
const auto abs_path = GetAbsolutePath(file_info->filename);
|
||||
if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
|
||||
return Status::NotFound("File missing: " + abs_path);
|
||||
}
|
||||
if (file_info->size != curr_abs_path_to_size[abs_path]) {
|
||||
return Status::Corruption("File corrupted: " + abs_path);
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
@ -1219,30 +1263,29 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
|
||||
std::unordered_set<std::string>& live_dst_paths,
|
||||
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
|
||||
BackupID backup_id, bool shared, const std::string& src_dir,
|
||||
const std::string& fname, RateLimiter* rate_limiter, uint64_t size_limit,
|
||||
bool shared_checksum, std::function<void()> progress_callback,
|
||||
const std::string& contents) {
|
||||
const std::string& fname, RateLimiter* rate_limiter, uint64_t size_bytes,
|
||||
uint64_t size_limit, bool shared_checksum,
|
||||
std::function<void()> progress_callback, const std::string& contents) {
|
||||
assert(!fname.empty() && fname[0] == '/');
|
||||
assert(contents.empty() != src_dir.empty());
|
||||
|
||||
std::string dst_relative = fname.substr(1);
|
||||
std::string dst_relative_tmp;
|
||||
Status s;
|
||||
uint64_t size;
|
||||
uint32_t checksum_value = 0;
|
||||
|
||||
if (shared && shared_checksum) {
|
||||
// add checksum and file length to the file name
|
||||
s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
|
||||
&checksum_value);
|
||||
if (s.ok()) {
|
||||
s = db_env_->GetFileSize(src_dir + fname, &size);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
dst_relative = GetSharedFileWithChecksum(dst_relative, checksum_value,
|
||||
size);
|
||||
if (size_bytes == port::kMaxUint64) {
|
||||
return Status::NotFound("File missing: " + src_dir + fname);
|
||||
}
|
||||
dst_relative =
|
||||
GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes);
|
||||
dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
|
||||
dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
|
||||
} else if (shared) {
|
||||
@ -1282,7 +1325,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
|
||||
if (shared_checksum) {
|
||||
Log(options_.info_log,
|
||||
"%s already present, with checksum %u and size %" PRIu64,
|
||||
fname.c_str(), checksum_value, size);
|
||||
fname.c_str(), checksum_value, size_bytes);
|
||||
} else if (backuped_file_infos_.find(dst_relative) ==
|
||||
backuped_file_infos_.end() && !same_path) {
|
||||
// file already exists, but it's not referenced by any backup. overwrite
|
||||
@ -1295,7 +1338,6 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
|
||||
backup_env_->DeleteFile(dst_path);
|
||||
} else {
|
||||
// the file is present and referenced by a backup
|
||||
db_env_->GetFileSize(src_dir + fname, &size); // Ignore error
|
||||
Log(options_.info_log, "%s already present, calculate checksum",
|
||||
fname.c_str());
|
||||
s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
|
||||
@ -1324,7 +1366,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
|
||||
backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
|
||||
CopyOrCreateResult result;
|
||||
result.status = s;
|
||||
result.size = size;
|
||||
result.size = size_bytes;
|
||||
result.checksum_value = checksum_value;
|
||||
promise_result.set_value(std::move(result));
|
||||
}
|
||||
@ -1390,6 +1432,22 @@ void BackupEngineImpl::DeleteChildren(const std::string& dir,
|
||||
}
|
||||
}
|
||||
|
||||
Status BackupEngineImpl::InsertPathnameToSizeBytes(
|
||||
const std::string& dir, std::unordered_map<std::string, uint64_t>* result) {
|
||||
assert(result != nullptr);
|
||||
std::vector<Env::FileAttributes> files_attrs;
|
||||
Status status = backup_env_->GetChildrenFileAttributes(dir, &files_attrs);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
const bool slash_needed = dir.empty() || dir.back() != '/';
|
||||
for (const auto& file_attrs : files_attrs) {
|
||||
result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
|
||||
file_attrs.size_bytes);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BackupEngineImpl::GarbageCollect() {
|
||||
assert(!read_only_);
|
||||
Log(options_.info_log, "Starting garbage collection");
|
||||
@ -1561,8 +1619,9 @@ bool TEST_GetFileSizeFromBackupFileName(const std::string full_name,
|
||||
// <file1> <crc32(literal string)> <crc32_value>
|
||||
// <file2> <crc32(literal string)> <crc32_value>
|
||||
// ...
|
||||
Status BackupEngineImpl::BackupMeta::LoadFromFile(const std::string& backup_dir,
|
||||
bool use_size_in_file_name) {
|
||||
Status BackupEngineImpl::BackupMeta::LoadFromFile(
|
||||
const std::string& backup_dir, bool use_size_in_file_name,
|
||||
const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
|
||||
assert(Empty());
|
||||
Status s;
|
||||
unique_ptr<SequentialFile> backup_meta_file;
|
||||
@ -1606,9 +1665,11 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(const std::string& backup_dir,
|
||||
} else {
|
||||
if (!use_size_in_file_name ||
|
||||
!GetFileSizeFromBackupFileName(filename, &size)) {
|
||||
s = env_->GetFileSize(backup_dir + "/" + filename, &size);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
std::string abs_path = backup_dir + "/" + filename;
|
||||
try {
|
||||
size = abs_path_to_size.at(abs_path);
|
||||
} catch (std::out_of_range& e) {
|
||||
return Status::NotFound("Size missing for pathname: " + abs_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -232,6 +232,23 @@ class TestEnv : public EnvWrapper {
|
||||
return EnvWrapper::GetChildren(dir, r);
|
||||
}
|
||||
|
||||
// Some test cases do not actually create the test files (e.g., see
|
||||
// DummyDB::live_files_) - for those cases, we mock those files' attributes
|
||||
// so CreateNewBackup() can get their attributes.
|
||||
void SetFilenamesForMockedAttrs(const std::vector<std::string>& filenames) {
|
||||
filenames_for_mocked_attrs_ = filenames;
|
||||
}
|
||||
Status GetChildrenFileAttributes(
|
||||
const std::string& dir, std::vector<Env::FileAttributes>* r) override {
|
||||
if (filenames_for_mocked_attrs_.size() > 0) {
|
||||
for (const auto& filename : filenames_for_mocked_attrs_) {
|
||||
r->push_back({dir + filename, 10 /* size_bytes */});
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
return EnvWrapper::GetChildrenFileAttributes(dir, r);
|
||||
}
|
||||
|
||||
void SetCreateDirIfMissingFailure(bool fail) {
|
||||
create_dir_if_missing_failure_ = fail;
|
||||
}
|
||||
@ -255,6 +272,7 @@ class TestEnv : public EnvWrapper {
|
||||
port::Mutex mutex_;
|
||||
bool dummy_sequential_file_ = false;
|
||||
std::vector<std::string> written_files_;
|
||||
std::vector<std::string> filenames_for_mocked_attrs_;
|
||||
uint64_t limit_written_files_ = 1000000;
|
||||
uint64_t limit_delete_files_ = 1000000;
|
||||
|
||||
@ -780,6 +798,7 @@ TEST_F(BackupableDBTest, NoDoubleCopy) {
|
||||
dummy_db_->live_files_ = { "/00010.sst", "/00011.sst",
|
||||
"/CURRENT", "/MANIFEST-01" };
|
||||
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
|
||||
test_backup_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
|
||||
std::vector<std::string> should_have_written = {
|
||||
"/shared/00010.sst.tmp", "/shared/00011.sst.tmp",
|
||||
@ -796,6 +815,7 @@ TEST_F(BackupableDBTest, NoDoubleCopy) {
|
||||
dummy_db_->live_files_ = { "/00010.sst", "/00015.sst",
|
||||
"/CURRENT", "/MANIFEST-01" };
|
||||
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
|
||||
test_backup_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
|
||||
// should not open 00010.sst - it's already there
|
||||
should_have_written = {
|
||||
@ -846,6 +866,7 @@ TEST_F(BackupableDBTest, DifferentEnvs) {
|
||||
dummy_db_->live_files_ = { "/00010.sst", "/00011.sst",
|
||||
"/CURRENT", "/MANIFEST-01" };
|
||||
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
|
||||
test_backup_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
|
||||
|
||||
CloseDBAndBackupEngine();
|
||||
@ -857,6 +878,7 @@ TEST_F(BackupableDBTest, DifferentEnvs) {
|
||||
CloseDBAndBackupEngine();
|
||||
DestroyDB(dbname_, Options());
|
||||
|
||||
test_backup_env_->SetFilenamesForMockedAttrs({});
|
||||
AssertBackupConsistency(0, 0, 100, 500);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user