Merge pull request #452 from robertabcd/backupable-mem

Reduce memory footprint in backupable db.
This commit is contained in:
Igor Canadi 2015-01-28 10:53:46 -08:00
commit 5257c9c42c

View File

@ -145,19 +145,26 @@ class BackupEngineImpl : public BackupEngine {
FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum) FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
: refs(0), filename(fname), size(sz), checksum_value(checksum) {} : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
FileInfo(const FileInfo&) = delete;
FileInfo& operator=(const FileInfo&) = delete;
int refs; int refs;
const std::string filename; const std::string filename;
const uint64_t size; const uint64_t size;
uint32_t checksum_value; const uint32_t checksum_value;
}; };
class BackupMeta { class BackupMeta {
public: public:
BackupMeta(const std::string& meta_filename, BackupMeta(const std::string& meta_filename,
std::unordered_map<std::string, FileInfo>* file_infos, Env* env) std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
Env* env)
: timestamp_(0), size_(0), meta_filename_(meta_filename), : timestamp_(0), size_(0), meta_filename_(meta_filename),
file_infos_(file_infos), env_(env) {} file_infos_(file_infos), env_(env) {}
BackupMeta(const BackupMeta&) = delete;
BackupMeta& operator=(const BackupMeta&) = delete;
~BackupMeta() {} ~BackupMeta() {}
void RecordTimestamp() { void RecordTimestamp() {
@ -177,7 +184,7 @@ class BackupEngineImpl : public BackupEngine {
return sequence_number_; return sequence_number_;
} }
Status AddFile(const FileInfo& file_info); Status AddFile(std::shared_ptr<FileInfo> file_info);
void Delete(bool delete_meta = true); void Delete(bool delete_meta = true);
@ -185,14 +192,14 @@ class BackupEngineImpl : public BackupEngine {
return files_.empty(); return files_.empty();
} }
const FileInfo* GetFile(const std::string& filename) const { std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
auto it = file_infos_->find(filename); auto it = file_infos_->find(filename);
if (it == file_infos_->end()) if (it == file_infos_->end())
return nullptr; return nullptr;
return &it->second; return it->second;
} }
const std::vector<std::string>& GetFiles() { const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
return files_; return files_;
} }
@ -207,8 +214,8 @@ class BackupEngineImpl : public BackupEngine {
uint64_t size_; uint64_t size_;
std::string const meta_filename_; std::string const meta_filename_;
// files with relative paths (without "/" prefix!!) // files with relative paths (without "/" prefix!!)
std::vector<std::string> files_; std::vector<std::shared_ptr<FileInfo>> files_;
std::unordered_map<std::string, FileInfo>* file_infos_; std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
Env* env_; Env* env_;
static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
@ -297,9 +304,11 @@ class BackupEngineImpl : public BackupEngine {
// backup state data // backup state data
BackupID latest_backup_id_; BackupID latest_backup_id_;
std::map<BackupID, BackupMeta> backups_; std::map<BackupID, unique_ptr<BackupMeta>> backups_;
std::map<BackupID, std::pair<Status, BackupMeta> > corrupt_backups_; std::map<BackupID,
std::unordered_map<std::string, FileInfo> backuped_file_infos_; std::pair<Status, unique_ptr<BackupMeta>>> corrupt_backups_;
std::unordered_map<std::string,
std::shared_ptr<FileInfo>> backuped_file_infos_;
std::atomic<bool> stop_backup_; std::atomic<bool> stop_backup_;
// options data // options data
@ -382,9 +391,10 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
continue; continue;
} }
assert(backups_.find(backup_id) == backups_.end()); assert(backups_.find(backup_id) == backups_.end());
backups_.insert(std::make_pair( backups_.emplace(backup_id,
backup_id, BackupMeta(GetBackupMetaFile(backup_id), unique_ptr<BackupMeta>(new BackupMeta(
&backuped_file_infos_, backup_env_))); GetBackupMetaFile(backup_id),
&backuped_file_infos_, backup_env_)));
} }
if (options_.destroy_old_data) { // Destory old data if (options_.destroy_old_data) { // Destory old data
@ -396,16 +406,16 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
} else { // Load data from storage } else { // Load data from storage
// load the backups if any // load the backups if any
for (auto& backup : backups_) { for (auto& backup : backups_) {
Status s = backup.second.LoadFromFile(options_.backup_dir); Status s = backup.second->LoadFromFile(options_.backup_dir);
if (!s.ok()) { if (!s.ok()) {
Log(options_.info_log, "Backup %u corrupted -- %s", backup.first, Log(options_.info_log, "Backup %u corrupted -- %s", backup.first,
s.ToString().c_str()); s.ToString().c_str());
corrupt_backups_.insert(std::make_pair( corrupt_backups_.insert(std::make_pair(
backup.first, std::make_pair(s, backup.second))); backup.first, std::make_pair(s, std::move(backup.second))));
} }
} }
for (auto corrupt : corrupt_backups_) { for (const auto& corrupt : corrupt_backups_) {
backups_.erase(backups_.find(corrupt.first)); backups_.erase(backups_.find(corrupt.first));
} }
@ -465,13 +475,14 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
BackupID new_backup_id = latest_backup_id_ + 1; BackupID new_backup_id = latest_backup_id_ + 1;
assert(backups_.find(new_backup_id) == backups_.end()); assert(backups_.find(new_backup_id) == backups_.end());
auto ret = backups_.insert(std::make_pair( auto ret = backups_.emplace(new_backup_id,
new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id), unique_ptr<BackupMeta>(new BackupMeta(
&backuped_file_infos_, backup_env_))); GetBackupMetaFile(new_backup_id),
&backuped_file_infos_, backup_env_)));
assert(ret.second == true); assert(ret.second == true);
auto& new_backup = ret.first->second; auto& new_backup = ret.first->second;
new_backup.RecordTimestamp(); new_backup->RecordTimestamp();
new_backup.SetSequenceNumber(sequence_number); new_backup->SetSequenceNumber(sequence_number);
auto start_backup = backup_env_-> NowMicros(); auto start_backup = backup_env_-> NowMicros();
@ -506,7 +517,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
// * if it's kTableFile, then it's shared // * if it's kTableFile, then it's shared
// * if it's kDescriptorFile, limit the size to manifest_file_size // * if it's kDescriptorFile, limit the size to manifest_file_size
s = BackupFile(new_backup_id, s = BackupFile(new_backup_id,
&new_backup, new_backup.get(),
options_.share_table_files && type == kTableFile, options_.share_table_files && type == kTableFile,
db->GetName(), /* src_dir */ db->GetName(), /* src_dir */
live_files[i], /* src_fname */ live_files[i], /* src_fname */
@ -521,7 +532,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
// we only care about live log files // we only care about live log files
// copy the file into backup_dir/files/<new backup>/ // copy the file into backup_dir/files/<new backup>/
s = BackupFile(new_backup_id, s = BackupFile(new_backup_id,
&new_backup, new_backup.get(),
false, /* not shared */ false, /* not shared */
db->GetOptions().wal_dir, db->GetOptions().wal_dir,
live_wal_files[i]->PathName(), live_wal_files[i]->PathName(),
@ -543,7 +554,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
if (s.ok()) { if (s.ok()) {
// persist the backup metadata on the disk // persist the backup metadata on the disk
s = new_backup.StoreToFile(options_.sync); s = new_backup->StoreToFile(options_.sync);
} }
if (s.ok()) { if (s.ok()) {
// install the newly created backup meta! (atomic) // install the newly created backup meta! (atomic)
@ -591,11 +602,11 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
Log(options_.info_log, "Backup DONE. All is good"); Log(options_.info_log, "Backup DONE. All is good");
// backup_speed is in byte/second // backup_speed is in byte/second
double backup_speed = new_backup.GetSize() / (1.048576 * backup_time); double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
Log(options_.info_log, "Backup number of files: %u", Log(options_.info_log, "Backup number of files: %u",
new_backup.GetNumberFiles()); new_backup->GetNumberFiles());
Log(options_.info_log, "Backup size: %" PRIu64 " bytes", Log(options_.info_log, "Backup size: %" PRIu64 " bytes",
new_backup.GetSize()); new_backup->GetSize());
Log(options_.info_log, "Backup time: %" PRIu64 " microseconds", backup_time); Log(options_.info_log, "Backup time: %" PRIu64 " microseconds", backup_time);
Log(options_.info_log, "Backup speed: %.3f MB/s", backup_speed); Log(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
Log(options_.info_log, "Backup Statistics %s", Log(options_.info_log, "Backup Statistics %s",
@ -624,20 +635,20 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
Log(options_.info_log, "Deleting backup %u", backup_id); Log(options_.info_log, "Deleting backup %u", backup_id);
auto backup = backups_.find(backup_id); auto backup = backups_.find(backup_id);
if (backup != backups_.end()) { if (backup != backups_.end()) {
backup->second.Delete(); backup->second->Delete();
backups_.erase(backup); backups_.erase(backup);
} else { } else {
auto corrupt = corrupt_backups_.find(backup_id); auto corrupt = corrupt_backups_.find(backup_id);
if (corrupt == corrupt_backups_.end()) { if (corrupt == corrupt_backups_.end()) {
return Status::NotFound("Backup not found"); return Status::NotFound("Backup not found");
} }
corrupt->second.second.Delete(); corrupt->second.second->Delete();
corrupt_backups_.erase(corrupt); corrupt_backups_.erase(corrupt);
} }
std::vector<std::string> to_delete; std::vector<std::string> to_delete;
for (auto& itr : backuped_file_infos_) { for (auto& itr : backuped_file_infos_) {
if (itr.second.refs == 0) { if (itr.second->refs == 0) {
Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first)); Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(), Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
s.ToString().c_str()); s.ToString().c_str());
@ -660,10 +671,11 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) { void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
backup_info->reserve(backups_.size()); backup_info->reserve(backups_.size());
for (auto& backup : backups_) { for (auto& backup : backups_) {
if (!backup.second.Empty()) { if (!backup.second->Empty()) {
backup_info->push_back(BackupInfo( backup_info->push_back(BackupInfo(
backup.first, backup.second.GetTimestamp(), backup.second.GetSize(), backup.first, backup.second->GetTimestamp(),
backup.second.GetNumberFiles())); backup.second->GetSize(),
backup.second->GetNumberFiles()));
} }
} }
} }
@ -689,7 +701,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(
return Status::NotFound("Backup not found"); return Status::NotFound("Backup not found");
} }
auto& backup = backup_itr->second; auto& backup = backup_itr->second;
if (backup.Empty()) { if (backup->Empty()) {
return Status::NotFound("Backup not found"); return Status::NotFound("Backup not found");
} }
@ -737,7 +749,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(
options_.restore_rate_limit, copy_file_buffer_size_)); options_.restore_rate_limit, copy_file_buffer_size_));
} }
Status s; Status s;
for (auto& file : backup.GetFiles()) { for (const auto& file_info : backup->GetFiles()) {
const std::string &file = file_info->filename;
std::string dst; std::string dst;
// 1. extract the filename // 1. extract the filename
size_t slash = file.find_last_of('/'); size_t slash = file.find_last_of('/');
@ -772,9 +785,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(
break; break;
} }
const auto iter = backuped_file_infos_.find(file); if (file_info->checksum_value != checksum_value) {
assert(iter != backuped_file_infos_.end());
if (iter->second.checksum_value != checksum_value) {
s = Status::Corruption("Checksum check failed"); s = Status::Corruption("Checksum check failed");
break; break;
} }
@ -988,7 +999,8 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
} }
} }
if (s.ok()) { if (s.ok()) {
s = backup->AddFile(FileInfo(dst_relative, size, checksum_value)); s = backup->AddFile(std::make_shared<FileInfo>(
dst_relative, size, checksum_value));
} }
return s; return s;
} }
@ -1107,34 +1119,34 @@ Status BackupEngineImpl::GarbageCollect() {
// ------- BackupMeta class -------- // ------- BackupMeta class --------
Status BackupEngineImpl::BackupMeta::AddFile(const FileInfo& file_info) { Status BackupEngineImpl::BackupMeta::AddFile(
size_ += file_info.size; std::shared_ptr<FileInfo> file_info) {
files_.push_back(file_info.filename); auto itr = file_infos_->find(file_info->filename);
auto itr = file_infos_->find(file_info.filename);
if (itr == file_infos_->end()) { if (itr == file_infos_->end()) {
auto ret = file_infos_->insert({file_info.filename, file_info}); auto ret = file_infos_->emplace(file_info->filename, file_info);
if (ret.second) { if (ret.second) {
ret.first->second.refs = 1; itr = ret.first;
itr->second->refs = 1;
} else { } else {
// if this happens, something is seriously wrong // if this happens, something is seriously wrong
return Status::Corruption("In memory metadata insertion error"); return Status::Corruption("In memory metadata insertion error");
} }
} else { } else {
if (itr->second.checksum_value != file_info.checksum_value) { if (itr->second->checksum_value != file_info->checksum_value) {
return Status::Corruption("Checksum mismatch for existing backup file"); return Status::Corruption("Checksum mismatch for existing backup file");
} }
++itr->second.refs; // increase refcount if already present ++itr->second->refs; // increase refcount if already present
} }
size_ += file_info->size;
files_.push_back(itr->second);
return Status::OK(); return Status::OK();
} }
void BackupEngineImpl::BackupMeta::Delete(bool delete_meta) { void BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
for (const auto& file : files_) { for (const auto& file : files_) {
auto itr = file_infos_->find(file); --file->refs; // decrease refcount
assert(itr != file_infos_->end());
--(itr->second.refs); // decrease refcount
} }
files_.clear(); files_.clear();
// delete meta file // delete meta file
@ -1179,7 +1191,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10)); num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
data.remove_prefix(next - data.data() + 1); // +1 for '\n' data.remove_prefix(next - data.data() + 1); // +1 for '\n'
std::vector<FileInfo> files; std::vector<std::shared_ptr<FileInfo>> files;
Slice checksum_prefix("crc32 "); Slice checksum_prefix("crc32 ");
@ -1188,8 +1200,8 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
std::string filename = GetSliceUntil(&line, ' ').ToString(); std::string filename = GetSliceUntil(&line, ' ').ToString();
uint64_t size; uint64_t size;
const FileInfo* file_info = GetFile(filename); const std::shared_ptr<FileInfo> file_info = GetFile(filename);
if (file_info != nullptr) { if (file_info) {
size = file_info->size; size = file_info->size;
} else { } else {
s = env_->GetFileSize(backup_dir + "/" + filename, &size); s = env_->GetFileSize(backup_dir + "/" + filename, &size);
@ -1214,7 +1226,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
return Status::Corruption("Unknown checksum type"); return Status::Corruption("Unknown checksum type");
} }
files.emplace_back(filename, size, checksum_value); files.emplace_back(new FileInfo(filename, size, checksum_value));
} }
if (s.ok() && data.size() > 0) { if (s.ok() && data.size() > 0) {
@ -1223,6 +1235,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
} }
if (s.ok()) { if (s.ok()) {
files_.reserve(files.size());
for (const auto& file_info : files) { for (const auto& file_info : files) {
s = AddFile(file_info); s = AddFile(file_info);
if (!s.ok()) { if (!s.ok()) {
@ -1252,12 +1265,9 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
sequence_number_); sequence_number_);
len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size()); len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size());
for (const auto& file : files_) { for (const auto& file : files_) {
const auto& iter = file_infos_->find(file);
assert(iter != file_infos_->end());
// use crc32 for now, switch to something else if needed // use crc32 for now, switch to something else if needed
len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n", len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
file.c_str(), iter->second.checksum_value); file->filename.c_str(), file->checksum_value);
} }
s = backup_meta_file->Append(Slice(buf.get(), (size_t)len)); s = backup_meta_file->Append(Slice(buf.get(), (size_t)len));