diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc
index 8e5ef5220..6048082d8 100644
--- a/utilities/backupable/backupable_db.cc
+++ b/utilities/backupable/backupable_db.cc
@@ -10,6 +10,7 @@
 #include "utilities/backupable_db.h"
 #include "db/filename.h"
 #include "util/coding.h"
+#include "util/crc32c.h"
 #include "rocksdb/transaction_log.h"
 
 #define __STDC_FORMAT_MACROS
@@ -48,12 +49,22 @@ class BackupEngine {
   void DeleteBackupsNewerThan(uint64_t sequence_number);
 
  private:
+  struct FileInfo {
+    FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
+      : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
+
+    int refs;
+    const std::string filename;
+    const uint64_t size;
+    uint32_t checksum_value;
+  };
+
   class BackupMeta {
    public:
     BackupMeta(const std::string& meta_filename,
-               std::unordered_map<std::string, int>* file_refs, Env* env)
+               std::unordered_map<std::string, FileInfo>* file_infos, Env* env)
       : timestamp_(0), size_(0), meta_filename_(meta_filename),
-        file_refs_(file_refs), env_(env) {}
+        file_infos_(file_infos), env_(env) {}
 
     ~BackupMeta() {}
@@ -73,7 +84,8 @@ class BackupEngine {
       return sequence_number_;
     }
 
-    void AddFile(const std::string& filename, uint64_t size);
+    Status AddFile(const FileInfo& file_info);
+
     void Delete();
 
     bool Empty() {
@@ -96,7 +108,7 @@ class BackupEngine {
     std::string const meta_filename_;
     // files with relative paths (without "/" prefix!!)
     std::vector<std::string> files_;
-    std::unordered_map<std::string, int>* file_refs_;
+    std::unordered_map<std::string, FileInfo>* file_infos_;
     Env* env_;
 
     static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
@@ -141,6 +153,7 @@ class BackupEngine {
                   Env* dst_env,
                   bool sync,
                   uint64_t* size = nullptr,
+                  uint32_t* checksum_value = nullptr,
                   uint64_t size_limit = 0);
   // if size_limit == 0, there is no size limit, copy everything
   Status BackupFile(BackupID backup_id,
@@ -149,15 +162,21 @@ class BackupEngine {
                     const std::string& src_dir,
                     const std::string& src_fname,  // starts with "/"
                     uint64_t size_limit = 0);
 
+  Status CalculateChecksum(const std::string& src,
+                           Env* src_env,
+                           uint64_t size_limit,
+                           uint32_t* checksum_value);
+
   // Will delete all the files we don't need anymore
   // If full_scan == true, it will do the full scan of files/ directory
-  // and delete all the files that are not referenced from backuped_file_refs_
+  // and delete all the files that are not referenced from backuped_file_infos_
   void GarbageCollection(bool full_scan);
 
   // backup state data
   BackupID latest_backup_id_;
   std::map<BackupID, BackupMeta> backups_;
-  std::unordered_map<std::string, int> backuped_file_refs_;
+  std::unordered_map<std::string, FileInfo> backuped_file_infos_;
   std::vector<BackupID> obsolete_backups_;
   std::atomic<bool> stop_backup_;
@@ -198,7 +217,7 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
     assert(backups_.find(backup_id) == backups_.end());
     backups_.insert(std::make_pair(
         backup_id, BackupMeta(GetBackupMetaFile(backup_id),
-                              &backuped_file_refs_, backup_env_)));
+                              &backuped_file_infos_, backup_env_)));
   }
 
   if (options_.destroy_old_data) {  // Destroy old data
@@ -302,7 +321,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
   assert(backups_.find(new_backup_id) == backups_.end());
   auto ret = backups_.insert(std::make_pair(
       new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id),
-                                &backuped_file_refs_, backup_env_)));
+                                &backuped_file_infos_, backup_env_)));
   assert(ret.second == true);
   auto& new_backup = ret.first->second;
   new_backup.RecordTimestamp();
@@ -478,10 +497,19 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
                                              "/" + dst;
 
     Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
-    s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false);
+    uint32_t checksum_value;
+    s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
+                 nullptr /* size */, &checksum_value);
     if (!s.ok()) {
       break;
     }
+
+    const auto iter = backuped_file_infos_.find(file);
+    assert(iter != backuped_file_infos_.end());
+    if (iter->second.checksum_value != checksum_value) {
+      s = Status::Corruption("Checksum check failed");
+      break;
+    }
   }
 
   Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
@@ -555,6 +583,7 @@ Status BackupEngine::CopyFile(const std::string& src,
                               Env* dst_env,
                               bool sync,
                               uint64_t* size,
+                              uint32_t* checksum_value,
                               uint64_t size_limit) {
   Status s;
   unique_ptr<WritableFile> dst_file;
@@ -564,6 +593,9 @@ Status BackupEngine::CopyFile(const std::string& src,
   if (size != nullptr) {
     *size = 0;
   }
+  if (checksum_value != nullptr) {
+    *checksum_value = 0;
+  }
 
   // Check if size limit is set. if not, set it to very big number
   if (size_limit == 0) {
@@ -589,12 +621,19 @@ Status BackupEngine::CopyFile(const std::string& src,
     size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
       copy_file_buffer_size_ : size_limit;
     s = src_file->Read(buffer_to_read, &data, buf.get());
     size_limit -= data.size();
+
+    if (!s.ok()) {
+      return s;
+    }
+
     if (size != nullptr) {
       *size += data.size();
     }
-    if (s.ok()) {
-      s = dst_file->Append(data);
+    if (checksum_value != nullptr) {
+      *checksum_value = crc32c::Extend(*checksum_value, data.data(),
+                                       data.size());
     }
+    s = dst_file->Append(data);
   } while (s.ok() && data.size() > 0 && size_limit > 0);
 
   if (s.ok() && sync) {
@@ -629,9 +668,15 @@ Status BackupEngine::BackupFile(BackupID backup_id,
 
   // if it's shared, we also need to check if it exists -- if it does,
   // no need to copy it again
+  uint32_t checksum_value = 0;
   if (shared && backup_env_->FileExists(dst_path)) {
     backup_env_->GetFileSize(dst_path, &size);  // Ignore error
-    Log(options_.info_log, "%s already present", src_fname.c_str());
+    Log(options_.info_log, "%s already present, calculate checksum",
+        src_fname.c_str());
+    s = CalculateChecksum(src_dir + src_fname,
+                          db_env_,
+                          size_limit,
+                          &checksum_value);
   } else {
     Log(options_.info_log, "Copying %s", src_fname.c_str());
     s = CopyFile(src_dir + src_fname,
@@ -640,22 +685,63 @@ Status BackupEngine::BackupFile(BackupID backup_id,
                  backup_env_,
                  options_.sync,
                  &size,
+                 &checksum_value,
                  size_limit);
     if (s.ok() && shared) {
       s = backup_env_->RenameFile(dst_path_tmp, dst_path);
     }
   }
   if (s.ok()) {
-    backup->AddFile(dst_relative, size);
+    s = backup->AddFile(FileInfo(dst_relative, size, checksum_value));
   }
   return s;
 }
 
+Status BackupEngine::CalculateChecksum(const std::string& src,
+                                       Env* src_env,
+                                       uint64_t size_limit,
+                                       uint32_t* checksum_value) {
+  *checksum_value = 0;
+  if (size_limit == 0) {
+    size_limit = std::numeric_limits<uint64_t>::max();
+  }
+
+  EnvOptions env_options;
+  env_options.use_mmap_writes = false;
+
+  std::unique_ptr<SequentialFile> src_file;
+  Status s = src_env->NewSequentialFile(src, &src_file, env_options);
+  if (!s.ok()) {
+    return s;
+  }
+
+  std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
+  Slice data;
+
+  do {
+    if (stop_backup_.load(std::memory_order_acquire)) {
+      return Status::Incomplete("Backup stopped");
+    }
+    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
+      copy_file_buffer_size_ : size_limit;
+    s = src_file->Read(buffer_to_read, &data, buf.get());
+
+    if (!s.ok()) {
+      return s;
+    }
+
+    size_limit -= data.size();
+    *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
+  } while (data.size() > 0 && size_limit > 0);
+
+  return s;
+}
+
 void BackupEngine::GarbageCollection(bool full_scan) {
   Log(options_.info_log, "Starting garbage collection");
   std::vector<std::string> to_delete;
-  for (auto& itr : backuped_file_refs_) {
-    if (itr.second == 0) {
+  for (auto& itr : backuped_file_infos_) {
+    if (itr.second.refs == 0) {
       Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
       Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
           s.ToString().c_str());
@@ -663,7 +749,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
     }
   }
   for (auto& td : to_delete) {
-    backuped_file_refs_.erase(td);
+    backuped_file_infos_.erase(td);
   }
 
   if (!full_scan) {
     // take care of private dirs -- if full_scan == true, then full_scan will
@@ -686,7 +772,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
   for (auto& child : shared_children) {
     std::string rel_fname = GetSharedFileRel(child);
     // if it's not refcounted, delete it
-    if (backuped_file_refs_.find(rel_fname) == backuped_file_refs_.end()) {
+    if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) {
       // this might be a directory, but DeleteFile will just fail in that
       // case, so we're good
       Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
@@ -731,23 +817,34 @@ void BackupEngine::GarbageCollection(bool full_scan) {
 
 // ------- BackupMeta class --------
 
-void BackupEngine::BackupMeta::AddFile(const std::string& filename,
-                                       uint64_t size) {
-  size_ += size;
-  files_.push_back(filename);
-  auto itr = file_refs_->find(filename);
-  if (itr == file_refs_->end()) {
-    file_refs_->insert(std::make_pair(filename, 1));
+Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) {
+  size_ += file_info.size;
+  files_.push_back(file_info.filename);
+
+  auto itr = file_infos_->find(file_info.filename);
+  if (itr == file_infos_->end()) {
+    auto ret = file_infos_->insert({file_info.filename, file_info});
+    if (ret.second) {
+      ret.first->second.refs = 1;
+    } else {
+      // if this happens, something is seriously wrong
+      return Status::Corruption("In memory metadata insertion error");
+    }
   } else {
-    ++itr->second;  // increase refcount if already present
+    if (itr->second.checksum_value != file_info.checksum_value) {
+      return Status::Corruption("Checksum mismatch for existing backup file");
+    }
+    ++itr->second.refs;  // increase refcount if already present
   }
+
+  return Status::OK();
 }
 
 void BackupEngine::BackupMeta::Delete() {
-  for (auto& file : files_) {
-    auto itr = file_refs_->find(file);
-    assert(itr != file_refs_->end());
-    --(itr->second);  // decrease refcount
+  for (const auto& file : files_) {
+    auto itr = file_infos_->find(file);
+    assert(itr != file_infos_->end());
+    --(itr->second.refs);  // decrease refcount
   }
   files_.clear();
   // delete meta file
@@ -759,8 +856,8 @@ void BackupEngine::BackupMeta::Delete() {
 // <timestamp>
 // <seq number>
 // <number of files>
-// <file1>
-// <file2>
+// <file1> <crc32(literal string)> <crc32_value>
+// <file2> <crc32(literal string)> <crc32_value>
 // ...
 // TODO: maybe add checksum?
 Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
@@ -790,18 +887,40 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
   sscanf(data.data(), "%u%n", &num_files, &bytes_read);
   data.remove_prefix(bytes_read + 1);  // +1 for '\n'
 
-  std::vector<std::pair<std::string, uint64_t>> files;
+  std::vector<FileInfo> files;
 
   for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
-    std::string filename = GetSliceUntil(&data, '\n').ToString();
+    auto line = GetSliceUntil(&data, '\n');
+    std::string filename = GetSliceUntil(&line, ' ').ToString();
+
     uint64_t size;
     s = env_->GetFileSize(backup_dir + "/" + filename, &size);
-    files.push_back(std::make_pair(filename, size));
+
+    if (line.empty()) {
+      return Status::Corruption("File checksum is missing");
+    }
+
+    uint32_t checksum_value = 0;
+    if (line.starts_with("crc32 ")) {
+      line.remove_prefix(6);
+      sscanf(line.data(), "%u", &checksum_value);
+      if (memcmp(line.data(), std::to_string(checksum_value).c_str(),
+                 line.size() - 1) != 0) {
+        return Status::Corruption("Invalid checksum value");
+      }
+    } else {
+      return Status::Corruption("Unknown checksum type");
+    }
+
+    files.emplace_back(filename, size, checksum_value);
   }
 
   if (s.ok()) {
-    for (auto file : files) {
-      AddFile(file.first, file.second);
+    for (const auto& file_info : files) {
+      s = AddFile(file_info);
+      if (!s.ok()) {
+        break;
+      }
     }
   }
@@ -825,8 +944,13 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
   len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
                   sequence_number_);
   len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size());
-  for (size_t i = 0; i < files_.size(); ++i) {
-    len += snprintf(buf.get() + len, buf_size - len, "%s\n", files_[i].c_str());
+  for (const auto& file : files_) {
+    const auto& iter = file_infos_->find(file);
+
+    assert(iter != file_infos_->end());
+    // use crc32 for now, switch to something else if needed
+    len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
+                    file.c_str(), iter->second.checksum_value);
   }
 
   s = backup_meta_file->Append(Slice(buf.get(), (size_t)len));
diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc
index de240558f..c5909f8e7 100644
--- a/utilities/backupable/backupable_db_test.cc
+++ b/utilities/backupable/backupable_db_test.cc
@@ -154,7 +154,6 @@ class TestEnv : public EnvWrapper {
 
   Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
                            const EnvOptions& options) {
-    opened_files_.push_back(f);
     if (dummy_sequential_file_) {
       r->reset(new TestEnv::DummySequentialFile());
       return Status::OK();
@@ -165,6 +164,7 @@ class TestEnv : public EnvWrapper {
 
   Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
                          const EnvOptions& options) {
+    written_files_.push_back(f);
    if (limit_written_files_ <= 0) {
      return Status::IOError("Sorry, can't do this");
    }
@@ -172,14 +172,14 @@ class TestEnv : public EnvWrapper {
     return EnvWrapper::NewWritableFile(f, r, options);
   }
 
-  void AssertOpenedFiles(std::vector<std::string>& should_have_opened) {
-    sort(should_have_opened.begin(), should_have_opened.end());
-    sort(opened_files_.begin(), opened_files_.end());
-    ASSERT_TRUE(opened_files_ == should_have_opened);
+  void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
+    sort(should_have_written.begin(), should_have_written.end());
+    sort(written_files_.begin(), written_files_.end());
+    ASSERT_TRUE(written_files_ == should_have_written);
   }
 
-  void ClearOpenedFiles() {
-    opened_files_.clear();
+  void ClearWrittenFiles() {
+    written_files_.clear();
   }
 
   void SetLimitWrittenFiles(uint64_t limit) {
@@ -192,7 +192,7 @@ class TestEnv : public EnvWrapper {
 
  private:
   bool dummy_sequential_file_ = false;
-  std::vector<std::string> opened_files_;
+  std::vector<std::string> written_files_;
   uint64_t limit_written_files_ = 1000000;
 };  // TestEnv
@@ -239,6 +239,46 @@ class FileManager : public EnvWrapper {
     return s;
   }
 
+  Status CorruptChecksum(const std::string& fname, bool appear_valid) {
+    std::string metadata;
+    Status s = ReadFileToString(this, fname, &metadata);
+    if (!s.ok()) {
+      return s;
+    }
+    s = DeleteFile(fname);
+    if (!s.ok()) {
+      return s;
+    }
+
+    std::vector<size_t> positions;
+    auto pos = metadata.find(" crc32 ");
+    if (pos == std::string::npos) {
+      return Status::Corruption("checksum not found");
+    }
+    do {
+      positions.push_back(pos);
+      pos = metadata.find(" crc32 ", pos + 6);
+    } while (pos != std::string::npos);
+
+    pos = positions[rnd_.Next() % positions.size()];
+    if (metadata.size() < pos + 7) {
+      return Status::Corruption("bad CRC32 checksum value");
+    }
+
+    if (appear_valid) {
+      if (metadata[pos + 8] == '\n') {
+        // single digit value, safe to insert one more digit
+        metadata.insert(pos + 8, 1, '0');
+      } else {
+        metadata.erase(pos + 8, 1);
+      }
+    } else {
+      metadata[pos + 7] = 'a';
+    }
+
+    return WriteToFile(fname, metadata);
+  }
+
   Status WriteToFile(const std::string& fname, const std::string& data) {
     unique_ptr<WritableFile> file;
     EnvOptions env_options;
@@ -249,6 +289,7 @@ class FileManager : public EnvWrapper {
     }
     return file->Append(Slice(data));
   }
+
  private:
  Random rnd_;
 };  // FileManager
@@ -412,30 +453,43 @@ TEST(BackupableDBTest, NoDoubleCopy) {
 
   // should write 5 DB files + LATEST_BACKUP + one meta file
   test_backup_env_->SetLimitWrittenFiles(7);
-  test_db_env_->ClearOpenedFiles();
+  test_backup_env_->ClearWrittenFiles();
   test_db_env_->SetLimitWrittenFiles(0);
   dummy_db_->live_files_ = { "/00010.sst", "/00011.sst",
                              "/CURRENT",   "/MANIFEST-01" };
   dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
   ASSERT_OK(db_->CreateNewBackup(false));
-  std::vector<std::string> should_have_openened = dummy_db_->live_files_;
-  should_have_openened.push_back("/00011.log");
-  AppendPath(dbname_, should_have_openened);
-  test_db_env_->AssertOpenedFiles(should_have_openened);
+  std::vector<std::string> should_have_written = {
+    "/shared/00010.sst.tmp",
+    "/shared/00011.sst.tmp",
+    "/private/1.tmp/CURRENT",
+    "/private/1.tmp/MANIFEST-01",
+    "/private/1.tmp/00011.log",
+    "/meta/1.tmp",
+    "/LATEST_BACKUP.tmp"
+  };
+  AppendPath(dbname_ + "_backup", should_have_written);
+  test_backup_env_->AssertWrittenFiles(should_have_written);
 
   // should write 4 new DB files + LATEST_BACKUP + one meta file
   // should not write/copy 00010.sst, since it's already there!
   test_backup_env_->SetLimitWrittenFiles(6);
-  test_db_env_->ClearOpenedFiles();
+  test_backup_env_->ClearWrittenFiles();
   dummy_db_->live_files_ = { "/00010.sst", "/00015.sst",
                              "/CURRENT",   "/MANIFEST-01" };
   dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
   ASSERT_OK(db_->CreateNewBackup(false));
   // should not open 00010.sst - it's already there
-  should_have_openened = { "/00015.sst", "/CURRENT",
-                           "/MANIFEST-01", "/00011.log" };
-  AppendPath(dbname_, should_have_openened);
-  test_db_env_->AssertOpenedFiles(should_have_openened);
+  should_have_written = {
+    "/shared/00015.sst.tmp",
+    "/private/2.tmp/CURRENT",
+    "/private/2.tmp/MANIFEST-01",
+    "/private/2.tmp/00011.log",
+    "/meta/2.tmp",
+    "/LATEST_BACKUP.tmp"
+  };
+  AppendPath(dbname_ + "_backup", should_have_written);
+  test_backup_env_->AssertWrittenFiles(should_have_written);
 
   ASSERT_OK(db_->DeleteBackup(1));
   ASSERT_EQ(true,
@@ -463,6 +517,8 @@ TEST(BackupableDBTest, NoDoubleCopy) {
 // 3. Corrupted backup meta file or missing backuped file - we should
 //    not be able to open that backup, but all other backups should be
 //    fine
+// 4. Corrupted checksum value - if the checksum is not a valid uint32_t,
+//    db open should fail; otherwise, it aborts during the restore process.
 TEST(BackupableDBTest, CorruptionsTest) {
   const int keys_iteration = 5000;
   Random rnd(6);
@@ -519,12 +575,29 @@ TEST(BackupableDBTest, CorruptionsTest) {
   CloseRestoreDB();
   ASSERT_TRUE(!s.ok());
 
-  // new backup should be 4!
+  // --------- case 4. corrupted checksum value ----
+  ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false));
+  // checksum of backup 3 is an invalid value; this can be detected at
+  // db open time, and it reverts to the previous backup automatically
+  AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5);
+  // checksum of backup 2 appears to be valid; this causes a checksum
+  // mismatch and aborts the restore process
+  ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true));
+  ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2"));
+  OpenRestoreDB();
+  ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2"));
+  s = restore_db_->RestoreDBFromBackup(2, dbname_, dbname_);
+  ASSERT_TRUE(!s.ok());
+  ASSERT_OK(restore_db_->DeleteBackup(2));
+  CloseRestoreDB();
+  AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5);
+
+  // new backup should be 2!
   OpenBackupableDB();
-  FillDB(db_.get(), keys_iteration * 3, keys_iteration * 4);
+  FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2);
   ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2)));
   CloseBackupableDB();
-  AssertBackupConsistency(4, 0, keys_iteration * 4, keys_iteration * 5);
+  AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5);
 }
 
 // open DB, write, close DB, backup, restore, repeat
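
Note on the checksum scheme: both CopyFile and CalculateChecksum seed the crc32c with 0 and call crc32c::Extend once per chunk read, so the accumulated value is independent of chunk boundaries and equals the crc32c of the whole file. That is why RestoreDBFromBackup can recompute and verify the value recorded in the meta file ("<file> crc32 <value>") while copying with its own buffer size. Below is a minimal standalone sketch of the same accumulation pattern; it is an illustration, not part of the patch. It assumes compilation inside the RocksDB tree (for util/crc32c.h) and uses plain fstream I/O in place of the Env abstraction; ChecksumFile is a hypothetical helper.

#include <cstdint>
#include <fstream>
#include <string>

#include "util/crc32c.h"

// Computes the same rolling crc32c that the patched CopyFile /
// CalculateChecksum accumulate: seed with 0, Extend once per chunk.
uint32_t ChecksumFile(const std::string& path) {
  const size_t kChunk = 64 * 1024;  // stand-in for copy_file_buffer_size_
  std::ifstream in(path, std::ios::binary);
  std::string buf(kChunk, '\0');
  uint32_t crc = 0;  // same seed the patch uses
  while (in.read(&buf[0], buf.size()) || in.gcount() > 0) {
    // gcount() covers the final partial chunk at end of file
    crc = rocksdb::crc32c::Extend(crc, buf.data(),
                                  static_cast<size_t>(in.gcount()));
  }
  return crc;
}

Because extending a CRC chunk by chunk yields the same result as computing it over the concatenated data, any reader that streams the file, with any buffer size, reproduces the stored checksum exactly.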