diff --git a/db/db_impl.cc b/db/db_impl.cc
index 004661bc0..d963212df 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -3540,8 +3540,8 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
     // Pardon the long line but I think it is easier to read this way.
     snprintf(buf, sizeof(buf),
              " Compactions\n"
-             "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n"
-             "--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
+             "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
+             "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
     );
     value->append(buf);
     for (int level = 0; level < current->NumberLevels(); level++) {
@@ -3561,9 +3561,21 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
         total_bytes_read += bytes_read;
         total_bytes_written += stats_[level].bytes_written;
 
+        uint64_t stalls = level == 0 ?
+            (stall_level0_slowdown_count_ +
+             stall_level0_num_files_count_ +
+             stall_memtable_compaction_count_) :
+            stall_leveln_slowdown_count_[level];
+
+        double stall_us = level == 0 ?
+            (stall_level0_slowdown_ +
+             stall_level0_num_files_ +
+             stall_memtable_compaction_) :
+            stall_leveln_slowdown_[level];
+
         snprintf(
             buf, sizeof(buf),
-            "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n",
+            "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n",
             level,
             files,
             current->NumLevelBytes(level) / 1048576.0,
@@ -3585,8 +3597,13 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
             stats_[level].files_out_levelnp1,
             stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
             stats_[level].count,
-            stall_leveln_slowdown_[level] / 1000000.0,
-            (unsigned long) stall_leveln_slowdown_count_[level]);
+            (int) ((double) stats_[level].micros /
+                   1000.0 /
+                   (stats_[level].count + 1)),
+            (double) stall_us / 1000.0 / (stalls + 1),
+            stall_us / 1000000.0,
+            (unsigned long) stalls);
+
         total_slowdown += stall_leveln_slowdown_[level];
         total_slowdown_count += stall_leveln_slowdown_count_[level];
         value->append(buf);
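[Note -- not part of the patch] The two new columns, msComp and msStall, are derived averages; the divisor is count + 1 rather than count, presumably so idle levels never divide by zero. A minimal sketch of the arithmetic with hypothetical input values:

    // Sketch of the msComp / msStall derivation above (values hypothetical).
    #include <cstdint>
    #include <cstdio>

    int main() {
      uint64_t compaction_micros = 2500000;  // total micros spent compacting
      int compaction_count = 4;              // compactions at this level
      uint64_t stall_micros = 90000;         // total micros spent stalled
      uint64_t stall_count = 2;              // number of stall events

      // The +1 in the divisor guards against division by zero on idle levels.
      int ms_per_compaction =
          (int)((double)compaction_micros / 1000.0 / (compaction_count + 1));
      double ms_per_stall = (double)stall_micros / 1000.0 / (stall_count + 1);

      printf("msComp=%d msStall=%.1f\n", ms_per_compaction, ms_per_stall);
      return 0;
    }
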
diff --git a/db/log_reader.cc b/db/log_reader.cc
index 6596cd84f..1dc567413 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
       backing_store_(new char[kBlockSize]),
       buffer_(),
       eof_(false),
+      read_error_(false),
+      eof_offset_(0),
       last_record_offset_(0),
       end_of_buffer_offset_(0),
       initial_offset_(initial_offset) {
@@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() {
   return last_record_offset_;
 }
 
+void Reader::UnmarkEOF() {
+  if (read_error_) {
+    return;
+  }
+
+  eof_ = false;
+
+  if (eof_offset_ == 0) {
+    return;
+  }
+
+  // If the EOF was in the middle of a block (a partial block was read), we
+  // have to read the rest of the block, as ReadPhysicalRecord can only read
+  // full blocks and expects the file position indicator to be aligned to the
+  // start of a block.
+  //
+  //      consumed_bytes + buffer_size() + remaining == kBlockSize
+
+  size_t consumed_bytes = eof_offset_ - buffer_.size();
+  size_t remaining = kBlockSize - eof_offset_;
+
+  // backing_store_ is used to concatenate what is left in buffer_ and
+  // the remainder of the block. If buffer_ already uses backing_store_,
+  // we just append the new data.
+  if (buffer_.data() != backing_store_ + consumed_bytes) {
+    // buffer_ does not use backing_store_ for storage.
+    // Copy what is left in buffer_ to backing_store_.
+    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
+  }
+
+  Slice read_buffer;
+  Status status = file_->Read(remaining, &read_buffer,
+                              backing_store_ + eof_offset_);
+
+  size_t added = read_buffer.size();
+  end_of_buffer_offset_ += added;
+
+  if (!status.ok()) {
+    if (added > 0) {
+      ReportDrop(added, status);
+    }
+
+    read_error_ = true;
+    return;
+  }
+
+  if (read_buffer.data() != backing_store_ + eof_offset_) {
+    // Read did not write to backing_store_
+    memmove(backing_store_ + eof_offset_, read_buffer.data(),
+            read_buffer.size());
+  }
+
+  buffer_ = Slice(backing_store_ + consumed_bytes,
+                  eof_offset_ + added - consumed_bytes);
+
+  if (added < remaining) {
+    eof_ = true;
+    eof_offset_ += added;
+  } else {
+    eof_offset_ = 0;
+  }
+}
+
 void Reader::ReportCorruption(size_t bytes, const char* reason) {
   ReportDrop(bytes, Status::Corruption(reason));
 }
@@ -184,7 +249,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
 unsigned int Reader::ReadPhysicalRecord(Slice* result) {
   while (true) {
     if (buffer_.size() < (size_t)kHeaderSize) {
-      if (!eof_) {
+      if (!eof_ && !read_error_) {
         // Last read was a full read, so this is a trailer to skip
         buffer_.clear();
         Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
@@ -192,10 +257,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
         if (!status.ok()) {
           buffer_.clear();
           ReportDrop(kBlockSize, status);
-          eof_ = true;
+          read_error_ = true;
           return kEof;
         } else if (buffer_.size() < (size_t)kBlockSize) {
           eof_ = true;
+          eof_offset_ = buffer_.size();
         }
         continue;
       } else if (buffer_.size() == 0) {
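[Note -- not part of the patch] A hedged sketch of the intended call pattern; everything other than Reader::ReadRecord() and Reader::UnmarkEOF() is a hypothetical stand-in for the caller's own logic. After this change, resuming also works when the last Read() ended in the middle of a block:

    #include <string>
    #include "db/log_reader.h"

    void Process(const rocksdb::Slice& record);  // hypothetical consumer
    void WaitForMoreData();                      // hypothetical notification hook

    // Tail a live log file: drain it, wait for the writer, then resume.
    void TailLog(rocksdb::log::Reader* reader) {
      std::string scratch;
      rocksdb::Slice record;
      for (;;) {
        while (reader->ReadRecord(&record, &scratch)) {
          Process(record);
        }
        // ReadRecord() returned false: we hit EOF, possibly mid-block.
        WaitForMoreData();
        // Clears eof_ and, when the EOF fell inside a block, reads the rest
        // of that block so ReadPhysicalRecord() stays block-aligned.
        reader->UnmarkEOF();
      }
    }
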
diff --git a/db/log_reader.h b/db/log_reader.h
index 8e277c821..81d334da2 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -69,9 +69,10 @@ class Reader {
   // when we know more data has been written to the file. we can use this
   // function to force the reader to look again in the file.
-  void UnmarkEOF() {
-    eof_ = false;
-  }
+  // Also aligns the file position indicator to the start of the next block
+  // by reading the rest of the data from the EOF position to the end of the
+  // block that was partially read.
+  void UnmarkEOF();
 
   SequentialFile* file() { return file_.get(); }
 
@@ -82,6 +83,11 @@ class Reader {
   char* const backing_store_;
   Slice buffer_;
   bool eof_;   // Last Read() indicated EOF by returning < kBlockSize
+  bool read_error_;   // Error occurred while reading from file
+
+  // Offset of the file position indicator within the last block when an
+  // EOF was detected.
+  size_t eof_offset_;
 
   // Offset of the last record returned by ReadRecord.
   uint64_t last_record_offset_;
diff --git a/db/log_test.cc b/db/log_test.cc
index dedbff0aa..636518835 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -47,36 +47,93 @@ class LogTest {
  public:
   std::string contents_;
 
+  explicit StringDest(Slice& reader_contents) :
+    WritableFile(),
+    contents_(""),
+    reader_contents_(reader_contents),
+    last_flush_(0) {
+    reader_contents_ = Slice(contents_.data(), 0);
+  };
+
   virtual Status Close() { return Status::OK(); }
-  virtual Status Flush() { return Status::OK(); }
+  virtual Status Flush() {
+    ASSERT_TRUE(reader_contents_.size() <= last_flush_);
+    size_t offset = last_flush_ - reader_contents_.size();
+    reader_contents_ = Slice(
+      contents_.data() + offset,
+      contents_.size() - offset);
+    last_flush_ = contents_.size();
+
+    return Status::OK();
+  }
   virtual Status Sync() { return Status::OK(); }
   virtual Status Append(const Slice& slice) {
     contents_.append(slice.data(), slice.size());
     return Status::OK();
   }
+  void Drop(size_t bytes) {
+    contents_.resize(contents_.size() - bytes);
+    reader_contents_ = Slice(
+      reader_contents_.data(), reader_contents_.size() - bytes);
+    last_flush_ = contents_.size();
+  }
+
+ private:
+  Slice& reader_contents_;
+  size_t last_flush_;
 };
 
 class StringSource : public SequentialFile {
  public:
-  Slice contents_;
+  Slice& contents_;
   bool force_error_;
+  size_t force_error_position_;
+  bool force_eof_;
+  size_t force_eof_position_;
   bool returned_partial_;
-  StringSource() : force_error_(false), returned_partial_(false) { }
+  explicit StringSource(Slice& contents) :
+    contents_(contents),
+    force_error_(false),
+    force_error_position_(0),
+    force_eof_(false),
+    force_eof_position_(0),
+    returned_partial_(false) { }
 
   virtual Status Read(size_t n, Slice* result, char* scratch) {
     ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
 
     if (force_error_) {
-      force_error_ = false;
-      returned_partial_ = true;
-      return Status::Corruption("read error");
+      if (force_error_position_ >= n) {
+        force_error_position_ -= n;
+      } else {
+        *result = Slice(contents_.data(), force_error_position_);
+        contents_.remove_prefix(force_error_position_);
+        force_error_ = false;
+        returned_partial_ = true;
+        return Status::Corruption("read error");
+      }
     }
 
     if (contents_.size() < n) {
       n = contents_.size();
       returned_partial_ = true;
     }
-    *result = Slice(contents_.data(), n);
+
+    if (force_eof_) {
+      if (force_eof_position_ >= n) {
+        force_eof_position_ -= n;
+      } else {
+        force_eof_ = false;
+        n = force_eof_position_;
+        returned_partial_ = true;
+      }
+    }
+
+    // By using scratch we ensure that caller has control over the
+    // lifetime of result.data()
+    memcpy(scratch, contents_.data(), n);
+    *result = Slice(scratch, n);
+    contents_.remove_prefix(n);
     return Status::OK();
   }
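[Note -- not part of the patch] The memcpy into scratch matters because a Slice only points at memory. Now that contents_ is a reference the tests shrink and regrow, a previously returned Slice must live in caller-owned scratch rather than in the shared buffer. A minimal sketch of the contract, assuming `file` is some SequentialFile*:

    char scratch[1024];
    rocksdb::Slice result;
    rocksdb::Status s = file->Read(sizeof(scratch), &result, scratch);
    // On success, result.data() now lies inside scratch (or result is
    // empty), so later changes to the underlying source cannot invalidate
    // a record the test already read.
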
@@ -123,10 +180,10 @@ class LogTest {
     src->contents_ = dest_contents();
   }
 
+  Slice reader_contents_;
   unique_ptr<StringDest> dest_holder_;
   unique_ptr<StringSource> source_holder_;
   ReportCollector report_;
-  bool reading_;
   Writer writer_;
   Reader reader_;
 
@@ -135,16 +192,15 @@ class LogTest {
   static uint64_t initial_offset_last_record_offsets_[];
 
  public:
-  LogTest() : dest_holder_(new StringDest),
-              source_holder_(new StringSource),
-              reading_(false),
+  LogTest() : reader_contents_(),
+              dest_holder_(new StringDest(reader_contents_)),
+              source_holder_(new StringSource(reader_contents_)),
               writer_(std::move(dest_holder_)),
               reader_(std::move(source_holder_), &report_, true/*checksum*/,
                       0/*initial_offset*/) {
   }
 
   void Write(const std::string& msg) {
-    ASSERT_TRUE(!reading_) << "Write() after starting to read";
     writer_.AddRecord(Slice(msg));
   }
 
@@ -153,10 +209,6 @@ class LogTest {
   }
 
   std::string Read() {
-    if (!reading_) {
-      reading_ = true;
-      reset_source_contents();
-    }
     std::string scratch;
     Slice record;
     if (reader_.ReadRecord(&record, &scratch)) {
@@ -175,7 +227,9 @@ class LogTest {
   }
 
   void ShrinkSize(int bytes) {
-    dest_contents().resize(dest_contents().size() - bytes);
+    auto dest = dynamic_cast<StringDest*>(writer_.file());
+    assert(dest);
+    dest->Drop(bytes);
   }
 
   void FixChecksum(int header_offset, int len) {
@@ -185,9 +239,10 @@ class LogTest {
     EncodeFixed32(&dest_contents()[header_offset], crc);
   }
 
-  void ForceError() {
+  void ForceError(size_t position = 0) {
     auto src = dynamic_cast<StringSource*>(reader_.file());
     src->force_error_ = true;
+    src->force_error_position_ = position;
   }
 
   size_t DroppedBytes() const {
@@ -198,6 +253,22 @@ class LogTest {
     return report_.message_;
   }
 
+  void ForceEOF(size_t position = 0) {
+    auto src = dynamic_cast<StringSource*>(reader_.file());
+    src->force_eof_ = true;
+    src->force_eof_position_ = position;
+  }
+
+  void UnmarkEOF() {
+    auto src = dynamic_cast<StringSource*>(reader_.file());
+    src->returned_partial_ = false;
+    reader_.UnmarkEOF();
+  }
+
+  bool IsEOF() {
+    return reader_.IsEOF();
+  }
+
   // Returns OK iff recorded error message contains "msg"
   std::string MatchError(const std::string& msg) const {
     if (report_.message_.find(msg) == std::string::npos) {
@@ -217,9 +288,7 @@ class LogTest {
 
   void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
     WriteInitialOffsetLog();
-    reading_ = true;
-    unique_ptr<StringSource> source(new StringSource);
-    source->contents_ = dest_contents();
+    unique_ptr<StringSource> source(new StringSource(reader_contents_));
     unique_ptr<Reader> offset_reader(
         new Reader(std::move(source), &report_, true/*checksum*/,
                    WrittenBytes() + offset_past_end));
@@ -231,9 +300,7 @@ class LogTest {
   void CheckInitialOffsetRecord(uint64_t initial_offset,
                                 int expected_record_offset) {
     WriteInitialOffsetLog();
-    reading_ = true;
-    unique_ptr<StringSource> source(new StringSource);
-    source->contents_ = dest_contents();
+    unique_ptr<StringSource> source(new StringSource(reader_contents_));
     unique_ptr<Reader> offset_reader(
         new Reader(std::move(source), &report_, true/*checksum*/,
                    initial_offset));
@@ -520,6 +587,70 @@ TEST(LogTest, ReadPastEnd) {
   CheckOffsetPastEndReturnsNoRecords(5);
 }
 
+TEST(LogTest, ClearEofSingleBlock) {
+  Write("foo");
+  Write("bar");
+  ForceEOF(3 + kHeaderSize + 2);
+  ASSERT_EQ("foo", Read());
+  UnmarkEOF();
+  ASSERT_EQ("bar", Read());
+  ASSERT_TRUE(IsEOF());
+  ASSERT_EQ("EOF", Read());
+  Write("xxx");
+  UnmarkEOF();
+  ASSERT_EQ("xxx", Read());
+  ASSERT_TRUE(IsEOF());
+}
+
+TEST(LogTest, ClearEofMultiBlock) {
+  size_t num_full_blocks = 5;
+  size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25;
+  Write(BigString("foo", n));
+  Write(BigString("bar", n));
+  ForceEOF(n + num_full_blocks * kHeaderSize + 10);
+  ASSERT_EQ(BigString("foo", n), Read());
+  ASSERT_TRUE(IsEOF());
+  UnmarkEOF();
+  ASSERT_EQ(BigString("bar", n), Read());
+  ASSERT_TRUE(IsEOF());
+  Write(BigString("xxx", n));
+  UnmarkEOF();
+  ASSERT_EQ(BigString("xxx", n), Read());
+  ASSERT_TRUE(IsEOF());
+}
+
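[Note -- not part of the patch] The magic offsets in these tests come from the physical record layout: each record is kHeaderSize bytes of header plus its payload. A hypothetical sanity check of the single-block case:

    // "foo" occupies [0, kHeaderSize + 3); "bar" follows immediately.
    size_t foo_end = kHeaderSize + 3;            // end of record "foo"
    size_t eof_at = 3 + kHeaderSize + 2;         // position passed to ForceEOF
    assert(eof_at > foo_end);                    // "foo" is fully readable
    assert(eof_at < foo_end + kHeaderSize + 3);  // "bar" is cut mid-record

So "foo" is readable before UnmarkEOF(), and "bar" only becomes readable after UnmarkEOF() fetches the rest of the block.
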
+TEST(LogTest, ClearEofError) {
+  // If an error occurs during Read() in UnmarkEOF(), the records contained
+  // in the buffer should be returned on subsequent calls of ReadRecord()
+  // until no more full records are left, after which ReadRecord() should
+  // return false to indicate that it cannot read any further.
+
+  Write("foo");
+  Write("bar");
+  UnmarkEOF();
+  ASSERT_EQ("foo", Read());
+  ASSERT_TRUE(IsEOF());
+  Write("xxx");
+  ForceError(0);
+  UnmarkEOF();
+  ASSERT_EQ("bar", Read());
+  ASSERT_EQ("EOF", Read());
+}
+
+TEST(LogTest, ClearEofError2) {
+  Write("foo");
+  Write("bar");
+  UnmarkEOF();
+  ASSERT_EQ("foo", Read());
+  Write("xxx");
+  ForceError(3);
+  UnmarkEOF();
+  ASSERT_EQ("bar", Read());
+  ASSERT_EQ("EOF", Read());
+  ASSERT_EQ(3U, DroppedBytes());
+  ASSERT_EQ("OK", MatchError("read error"));
+}
+
 }  // namespace log
 }  // namespace rocksdb
diff --git a/db/version_set.cc b/db/version_set.cc
index f6e9e6d5a..0a717de3a 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1553,8 +1553,10 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
       }
     }
 
-    // find offset in manifest file where this version is stored.
-    new_manifest_file_size = descriptor_log_->file()->GetFileSize();
+    if (s.ok()) {
+      // find offset in manifest file where this version is stored.
+      new_manifest_file_size = descriptor_log_->file()->GetFileSize();
+    }
 
     LogFlush(options_->info_log);
     mu->Lock();
diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h
index fbe2ae8a3..ab3a1ed80 100644
--- a/include/utilities/backupable_db.h
+++ b/include/utilities/backupable_db.h
@@ -68,8 +68,6 @@ struct BackupableDBOptions {
         destroy_old_data(_destroy_old_data) { }
 };
 
-class BackupEngine;
-
 typedef uint32_t BackupID;
 
 struct BackupInfo {
@@ -82,6 +80,29 @@ struct BackupInfo {
       : backup_id(_backup_id), timestamp(_timestamp), size(_size) {}
 };
 
+// Please see the documentation in BackupableDB and RestoreBackupableDB
+class BackupEngine {
+ public:
+  virtual ~BackupEngine() {}
+
+  static BackupEngine* NewBackupEngine(Env* db_env,
+                                       const BackupableDBOptions& options);
+
+  virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false) = 0;
+  virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
+  virtual Status DeleteBackup(BackupID backup_id) = 0;
+  virtual void StopBackup() = 0;
+
+  virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) = 0;
+  virtual Status RestoreDBFromBackup(BackupID backup_id,
+                                     const std::string& db_dir,
+                                     const std::string& wal_dir) = 0;
+  virtual Status RestoreDBFromLatestBackup(const std::string& db_dir,
+                                           const std::string& wal_dir) = 0;
+
+  virtual void DeleteBackupsNewerThan(uint64_t sequence_number) = 0;
+};
+
 // Stack your DB with BackupableDB to be able to backup the DB
 class BackupableDB : public StackableDB {
  public:
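[Note -- not part of the patch] A hedged usage sketch of the newly exposed interface; the paths and the DB handle are hypothetical, and error handling is elided:

    #include <cassert>
    #include <memory>
    #include "utilities/backupable_db.h"

    void BackupAndRestore(rocksdb::DB* db, rocksdb::Env* env) {
      // Hypothetical backup directory.
      rocksdb::BackupableDBOptions options("/tmp/rocksdb_backup");
      std::unique_ptr<rocksdb::BackupEngine> engine(
          rocksdb::BackupEngine::NewBackupEngine(env, options));

      // Take a backup; with this patch, every copied file gets a crc32
      // recorded in the backup's meta file.
      rocksdb::Status s = engine->CreateNewBackup(db, true /* flush first */);
      assert(s.ok());

      // Restore recomputes the checksum while copying files back and fails
      // with Status::Corruption on a mismatch.
      s = engine->RestoreDBFromLatestBackup("/tmp/db", "/tmp/db");
      assert(s.ok());
    }
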
diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc
index 8e5ef5220..da225e22b 100644
--- a/utilities/backupable/backupable_db.cc
+++ b/utilities/backupable/backupable_db.cc
@@ -10,6 +10,7 @@
 #include "utilities/backupable_db.h"
 #include "db/filename.h"
 #include "util/coding.h"
+#include "util/crc32c.h"
 #include "rocksdb/transaction_log.h"
 
 #define __STDC_FORMAT_MACROS
@@ -25,11 +26,11 @@
 
 namespace rocksdb {
 
-// -------- BackupEngine class ---------
-class BackupEngine {
+// -------- BackupEngineImpl class ---------
+class BackupEngineImpl : public BackupEngine {
  public:
-  BackupEngine(Env* db_env, const BackupableDBOptions& options);
-  ~BackupEngine();
+  BackupEngineImpl(Env* db_env, const BackupableDBOptions& options);
+  ~BackupEngineImpl();
   Status CreateNewBackup(DB* db, bool flush_before_backup = false);
   Status PurgeOldBackups(uint32_t num_backups_to_keep);
   Status DeleteBackup(BackupID backup_id);
@@ -48,12 +49,22 @@ class BackupEngine {
   void DeleteBackupsNewerThan(uint64_t sequence_number);
 
  private:
+  struct FileInfo {
+    FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
+      : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
+
+    int refs;
+    const std::string filename;
+    const uint64_t size;
+    uint32_t checksum_value;
+  };
+
   class BackupMeta {
    public:
     BackupMeta(const std::string& meta_filename,
-               std::unordered_map<std::string, int>* file_refs, Env* env)
+               std::unordered_map<std::string, FileInfo>* file_infos, Env* env)
       : timestamp_(0), size_(0), meta_filename_(meta_filename),
-        file_refs_(file_refs), env_(env) {}
+        file_infos_(file_infos), env_(env) {}
 
     ~BackupMeta() {}
 
@@ -73,7 +84,8 @@ class BackupEngine {
       return sequence_number_;
     }
 
-    void AddFile(const std::string& filename, uint64_t size);
+    Status AddFile(const FileInfo& file_info);
+
     void Delete();
 
     bool Empty() {
@@ -96,7 +108,7 @@ class BackupEngine {
     std::string const meta_filename_;
     // files with relative paths (without "/" prefix!!)
     std::vector<std::string> files_;
-    std::unordered_map<std::string, int>* file_refs_;
+    std::unordered_map<std::string, FileInfo>* file_infos_;
     Env* env_;
 
     static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
@@ -141,6 +153,7 @@ class BackupEngine {
                   Env* dst_env,
                   bool sync,
                   uint64_t* size = nullptr,
+                  uint32_t* checksum_value = nullptr,
                   uint64_t size_limit = 0);
 
   // if size_limit == 0, there is no size limit, copy everything
   Status BackupFile(BackupID backup_id,
@@ -149,15 +162,21 @@ class BackupEngine {
                     const std::string& src_dir,
                     const std::string& src_fname,  // starts with "/"
                     uint64_t size_limit = 0);
+
+  Status CalculateChecksum(const std::string& src,
+                           Env* src_env,
+                           uint64_t size_limit,
+                           uint32_t* checksum_value);
+
   // Will delete all the files we don't need anymore
   // If full_scan == true, it will do the full scan of files/ directory
-  // and delete all the files that are not referenced from backuped_file_refs_
+  // and delete all the files that are not referenced from backuped_file_infos_
   void GarbageCollection(bool full_scan);
 
   // backup state data
   BackupID latest_backup_id_;
   std::map<BackupID, BackupMeta> backups_;
-  std::unordered_map<std::string, int> backuped_file_refs_;
+  std::unordered_map<std::string, FileInfo> backuped_file_infos_;
   std::vector<BackupID> obsolete_backups_;
   std::atomic<bool> stop_backup_;
 
@@ -169,7 +188,13 @@ class BackupEngine {
   static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL;  // 5MB
 };
 
-BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
+BackupEngine* BackupEngine::NewBackupEngine(
+    Env* db_env, const BackupableDBOptions& options) {
+  return new BackupEngineImpl(db_env, options);
+}
+
+BackupEngineImpl::BackupEngineImpl(Env* db_env,
+                                   const BackupableDBOptions& options)
     : stop_backup_(false),
       options_(options),
       db_env_(db_env),
@@ -198,7 +223,7 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
     assert(backups_.find(backup_id) == backups_.end());
     backups_.insert(std::make_pair(
         backup_id, BackupMeta(GetBackupMetaFile(backup_id),
-                              &backuped_file_refs_, backup_env_)));
+                              &backuped_file_infos_, backup_env_)));
   }
 
   if (options_.destroy_old_data) {  // Destory old data
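[Note -- not part of the patch] FileInfo replaces the bare refcount so the checksum travels with each shared file. A simplified, hypothetical model of the sharing rule this introduces -- a second backup may only reuse an existing shared file if its checksum matches the recorded one:

    #include <cstdint>
    #include <string>
    #include <unordered_map>

    struct FileInfoModel {   // simplified stand-in for BackupEngineImpl::FileInfo
      int refs = 0;
      uint32_t checksum_value = 0;
    };
    std::unordered_map<std::string, FileInfoModel> files;

    // Mirrors AddFile(): insert with refs == 1, or bump the refcount only
    // when the checksums agree; the real AddFile() returns
    // Status::Corruption on a mismatch.
    bool AddShared(const std::string& name, uint32_t checksum) {
      auto it = files.find(name);
      if (it == files.end()) {
        files[name] = FileInfoModel{1, checksum};
        return true;
      }
      if (it->second.checksum_value != checksum) return false;
      ++it->second.refs;
      return true;
    }
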
@@ -252,11 +277,9 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
                       latest_backup_id_);
 }
 
-BackupEngine::~BackupEngine() {
-  LogFlush(options_.info_log);
-}
+BackupEngineImpl::~BackupEngineImpl() { LogFlush(options_.info_log); }
 
-void BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) {
+void BackupEngineImpl::DeleteBackupsNewerThan(uint64_t sequence_number) {
   for (auto backup : backups_) {
     if (backup.second.GetSequenceNumber() > sequence_number) {
       Log(options_.info_log,
@@ -276,7 +299,7 @@ void BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) {
   GarbageCollection(false);
 }
 
-Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
+Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
   Status s;
   std::vector<std::string> live_files;
   VectorLogPtr live_wal_files;
@@ -302,7 +325,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
   assert(backups_.find(new_backup_id) == backups_.end());
   auto ret = backups_.insert(std::make_pair(
       new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id),
-                                &backuped_file_refs_, backup_env_)));
+                                &backuped_file_infos_, backup_env_)));
   assert(ret.second == true);
   auto& new_backup = ret.first->second;
   new_backup.RecordTimestamp();
@@ -386,7 +409,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
   return s;
 }
 
-Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) {
+Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
   Log(options_.info_log, "Purging old backups, keeping %u",
       num_backups_to_keep);
   while (num_backups_to_keep < backups_.size()) {
@@ -399,7 +422,7 @@ Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) {
   return Status::OK();
 }
 
-Status BackupEngine::DeleteBackup(BackupID backup_id) {
+Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
   Log(options_.info_log, "Deleting backup %u", backup_id);
   auto backup = backups_.find(backup_id);
   if (backup == backups_.end()) {
@@ -412,7 +435,7 @@ Status BackupEngine::DeleteBackup(BackupID backup_id) {
   return Status::OK();
 }
 
-void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
+void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
   backup_info->reserve(backups_.size());
   for (auto& backup : backups_) {
     if (!backup.second.Empty()) {
@@ -422,9 +445,9 @@ void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
   }
 }
 
-Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
-                                         const std::string &db_dir,
-                                         const std::string &wal_dir) {
+Status BackupEngineImpl::RestoreDBFromBackup(BackupID backup_id,
+                                             const std::string& db_dir,
+                                             const std::string& wal_dir) {
   auto backup_itr = backups_.find(backup_id);
   if (backup_itr == backups_.end()) {
     return Status::NotFound("Backup not found");
@@ -478,10 +501,19 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
                       "/" + dst;
     Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
-    s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false);
+    uint32_t checksum_value;
+    s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
+                 nullptr /* size */, &checksum_value);
     if (!s.ok()) {
       break;
     }
+
+    const auto iter = backuped_file_infos_.find(file);
+    assert(iter != backuped_file_infos_.end());
+    if (iter->second.checksum_value != checksum_value) {
+      s = Status::Corruption("Checksum check failed");
+      break;
+    }
   }
 
   Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
@@ -489,7 +521,7 @@
 }
 
 // latest backup id is an ASCII representation of latest backup id
-Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) {
+Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) {
   Status s;
   unique_ptr<SequentialFile> file;
   s = backup_env_->NewSequentialFile(GetLatestBackupFile(),
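[Note -- not part of the patch] With the checksum recomputed during restore, a flipped bit in the backup directory now surfaces before the restored DB is ever opened. A hedged sketch of the caller-side failure path, assuming `engine`, `backup_id`, and the directories are the caller's own:

    rocksdb::Status s =
        engine->RestoreDBFromBackup(backup_id, db_dir, wal_dir);
    if (s.IsCorruption()) {
      // The on-disk backup no longer matches the checksum recorded in its
      // meta file; drop it and fall back to an older backup.
      engine->DeleteBackup(backup_id);
    }
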
@@ -519,7 +551,7 @@ Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) {
 // writing 4 bytes to the file is atomic alright, but we should *never*
 // do something like 1. delete file, 2. write new file
 // We write to a tmp file and then atomically rename
-Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) {
+Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
   Status s;
   unique_ptr<WritableFile> file;
   EnvOptions env_options;
@@ -549,13 +581,11 @@ Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) {
   return s;
 }
 
-Status BackupEngine::CopyFile(const std::string& src,
-                              const std::string& dst,
-                              Env* src_env,
-                              Env* dst_env,
-                              bool sync,
-                              uint64_t* size,
-                              uint64_t size_limit) {
+Status BackupEngineImpl::CopyFile(const std::string& src,
+                                  const std::string& dst, Env* src_env,
+                                  Env* dst_env, bool sync, uint64_t* size,
+                                  uint32_t* checksum_value,
+                                  uint64_t size_limit) {
   Status s;
   unique_ptr<WritableFile> dst_file;
   unique_ptr<SequentialFile> src_file;
@@ -564,6 +594,9 @@ Status BackupEngine::CopyFile(const std::string& src,
   if (size != nullptr) {
     *size = 0;
   }
+  if (checksum_value != nullptr) {
+    *checksum_value = 0;
+  }
 
   // Check if size limit is set. if not, set it to very big number
   if (size_limit == 0) {
@@ -589,12 +622,19 @@ Status BackupEngine::CopyFile(const std::string& src,
         copy_file_buffer_size_ : size_limit;
     s = src_file->Read(buffer_to_read, &data, buf.get());
     size_limit -= data.size();
+
+    if (!s.ok()) {
+      return s;
+    }
+
     if (size != nullptr) {
       *size += data.size();
     }
-    if (s.ok()) {
-      s = dst_file->Append(data);
+    if (checksum_value != nullptr) {
+      *checksum_value = crc32c::Extend(*checksum_value, data.data(),
+                                       data.size());
     }
+    s = dst_file->Append(data);
   } while (s.ok() && data.size() > 0 && size_limit > 0);
 
   if (s.ok() && sync) {
@@ -605,12 +645,10 @@ Status BackupEngine::CopyFile(const std::string& src,
 }
 
 // src_fname will always start with "/"
-Status BackupEngine::BackupFile(BackupID backup_id,
-                                BackupMeta* backup,
-                                bool shared,
-                                const std::string& src_dir,
-                                const std::string& src_fname,
-                                uint64_t size_limit) {
+Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
+                                    bool shared, const std::string& src_dir,
+                                    const std::string& src_fname,
+                                    uint64_t size_limit) {
   assert(src_fname.size() > 0 && src_fname[0] == '/');
   std::string dst_relative = src_fname.substr(1);
@@ -629,9 +667,15 @@ Status BackupEngine::BackupFile(BackupID backup_id,
 
   // if it's shared, we also need to check if it exists -- if it does,
   // no need to copy it again
+  uint32_t checksum_value = 0;
   if (shared && backup_env_->FileExists(dst_path)) {
     backup_env_->GetFileSize(dst_path, &size);  // Ignore error
-    Log(options_.info_log, "%s already present", src_fname.c_str());
+    Log(options_.info_log, "%s already present, calculate checksum",
+        src_fname.c_str());
+    s = CalculateChecksum(src_dir + src_fname,
+                          db_env_,
+                          size_limit,
+                          &checksum_value);
   } else {
     Log(options_.info_log, "Copying %s", src_fname.c_str());
     s = CopyFile(src_dir + src_fname,
@@ -640,22 +684,62 @@ Status BackupEngine::BackupFile(BackupID backup_id,
                  backup_env_,
                  options_.sync,
                  &size,
+                 &checksum_value,
                  size_limit);
     if (s.ok() && shared) {
       s = backup_env_->RenameFile(dst_path_tmp, dst_path);
     }
   }
   if (s.ok()) {
-    backup->AddFile(dst_relative, size);
+    s = backup->AddFile(FileInfo(dst_relative, size, checksum_value));
   }
   return s;
 }
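[Note -- not part of the patch] Both the copy path and the checksum path fold each read chunk into a running crc32c. A minimal standalone sketch of that pattern (buffer contents hypothetical):

    #include <cstdint>
    #include "util/crc32c.h"

    // Incrementally checksum data that arrives in chunks: starting from 0
    // and calling Extend() per chunk is equivalent to a single pass over
    // the concatenated bytes.
    uint32_t crc = 0;
    const char chunk1[] = "hello ";
    const char chunk2[] = "world";
    crc = rocksdb::crc32c::Extend(crc, chunk1, sizeof(chunk1) - 1);
    crc = rocksdb::crc32c::Extend(crc, chunk2, sizeof(chunk2) - 1);
    // crc now equals the crc32c of the 11 bytes "hello world".

This is why CopyFile can compute the checksum for free while streaming 5MB buffers, and why CalculateChecksum below can reuse the same loop shape without writing anything.
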
+Status BackupEngineImpl::CalculateChecksum(const std::string& src,
+                                           Env* src_env,
+                                           uint64_t size_limit,
+                                           uint32_t* checksum_value) {
+  *checksum_value = 0;
+  if (size_limit == 0) {
+    size_limit = std::numeric_limits<uint64_t>::max();
+  }
+
+  EnvOptions env_options;
+  env_options.use_mmap_writes = false;
+
+  std::unique_ptr<SequentialFile> src_file;
+  Status s = src_env->NewSequentialFile(src, &src_file, env_options);
+  if (!s.ok()) {
+    return s;
+  }
+
+  std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
+  Slice data;
+
+  do {
+    if (stop_backup_.load(std::memory_order_acquire)) {
+      return Status::Incomplete("Backup stopped");
+    }
+    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
+        copy_file_buffer_size_ : size_limit;
+    s = src_file->Read(buffer_to_read, &data, buf.get());
+
+    if (!s.ok()) {
+      return s;
+    }
+
+    size_limit -= data.size();
+    *checksum_value = crc32c::Extend(*checksum_value, data.data(),
+                                     data.size());
+  } while (data.size() > 0 && size_limit > 0);
+
+  return s;
+}
+
-void BackupEngine::GarbageCollection(bool full_scan) {
+void BackupEngineImpl::GarbageCollection(bool full_scan) {
   Log(options_.info_log, "Starting garbage collection");
   std::vector<std::string> to_delete;
-  for (auto& itr : backuped_file_refs_) {
-    if (itr.second == 0) {
+  for (auto& itr : backuped_file_infos_) {
+    if (itr.second.refs == 0) {
       Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
       Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
           s.ToString().c_str());
@@ -663,7 +747,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
     }
   }
   for (auto& td : to_delete) {
-    backuped_file_refs_.erase(td);
+    backuped_file_infos_.erase(td);
   }
   if (!full_scan) {
     // take care of private dirs -- if full_scan == true, then full_scan will
@@ -686,7 +770,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
     for (auto& child : shared_children) {
       std::string rel_fname = GetSharedFileRel(child);
       // if it's not refcounted, delete it
-      if (backuped_file_refs_.find(rel_fname) == backuped_file_refs_.end()) {
+      if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) {
         // this might be a directory, but DeleteFile will just fail in that
         // case, so we're good
         Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
@@ -731,23 +815,34 @@ void BackupEngine::GarbageCollection(bool full_scan) {
 
 // ------- BackupMeta class --------
 
-void BackupEngine::BackupMeta::AddFile(const std::string& filename,
-                                       uint64_t size) {
-  size_ += size;
-  files_.push_back(filename);
-  auto itr = file_refs_->find(filename);
-  if (itr == file_refs_->end()) {
-    file_refs_->insert(std::make_pair(filename, 1));
+Status BackupEngineImpl::BackupMeta::AddFile(const FileInfo& file_info) {
+  size_ += file_info.size;
+  files_.push_back(file_info.filename);
+
+  auto itr = file_infos_->find(file_info.filename);
+  if (itr == file_infos_->end()) {
+    auto ret = file_infos_->insert({file_info.filename, file_info});
+    if (ret.second) {
+      ret.first->second.refs = 1;
+    } else {
+      // if this happens, something is seriously wrong
+      return Status::Corruption("In memory metadata insertion error");
+    }
   } else {
-    ++itr->second;  // increase refcount if already present
+    if (itr->second.checksum_value != file_info.checksum_value) {
+      return Status::Corruption("Checksum mismatch for existing backup file");
+    }
+    ++itr->second.refs;  // increase refcount if already present
   }
+
+  return Status::OK();
 }
 
-void BackupEngine::BackupMeta::Delete() {
-  for (auto& file : files_) {
-    auto itr = file_refs_->find(file);
-    assert(itr != file_refs_->end());
-    --(itr->second);  // decrease refcount
+void BackupEngineImpl::BackupMeta::Delete() {
+  for (const auto& file : files_) {
+    auto itr = file_infos_->find(file);
+    assert(itr != file_infos_->end());
+    --(itr->second.refs);  // decrease refcount
   }
   files_.clear();
   // delete meta file
@@ -759,11 +854,12 @@ void BackupEngine::BackupMeta::Delete() {
 
 // each backup meta file is of the format:
 // <timestamp>
 // <seq number>
 // <number of files>
-// <file1>
-// <file2>
+// <file1> <crc32(literal string)> <crc32_value>
+// <file2> <crc32(literal string)> <crc32_value>
 // ...
 // TODO: maybe add checksum?
-Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
+Status BackupEngineImpl::BackupMeta::LoadFromFile(
+    const std::string& backup_dir) {
   assert(Empty());
   Status s;
   unique_ptr<SequentialFile> backup_meta_file;
@@ -790,25 +886,47 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
   sscanf(data.data(), "%u%n", &num_files, &bytes_read);
   data.remove_prefix(bytes_read + 1);  // +1 for '\n'
 
-  std::vector<std::pair<std::string, uint64_t>> files;
+  std::vector<FileInfo> files;
 
   for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
-    std::string filename = GetSliceUntil(&data, '\n').ToString();
+    auto line = GetSliceUntil(&data, '\n');
+    std::string filename = GetSliceUntil(&line, ' ').ToString();
+
     uint64_t size;
     s = env_->GetFileSize(backup_dir + "/" + filename, &size);
-    files.push_back(std::make_pair(filename, size));
+
+    if (line.empty()) {
+      return Status::Corruption("File checksum is missing");
+    }
+
+    uint32_t checksum_value = 0;
+    if (line.starts_with("crc32 ")) {
+      line.remove_prefix(6);
+      sscanf(line.data(), "%u", &checksum_value);
+      if (memcmp(line.data(), std::to_string(checksum_value).c_str(),
+                 line.size() - 1) != 0) {
+        return Status::Corruption("Invalid checksum value");
+      }
+    } else {
+      return Status::Corruption("Unknown checksum type");
+    }
+
+    files.emplace_back(filename, size, checksum_value);
   }
 
   if (s.ok()) {
-    for (auto file : files) {
-      AddFile(file.first, file.second);
+    for (const auto& file_info : files) {
+      s = AddFile(file_info);
+      if (!s.ok()) {
+        break;
+      }
     }
   }
 
   return s;
 }
 
-Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
+Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
   Status s;
   unique_ptr<WritableFile> backup_meta_file;
   EnvOptions env_options;
@@ -825,8 +943,13 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
   len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
                   sequence_number_);
   len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size());
-  for (size_t i = 0; i < files_.size(); ++i) {
-    len += snprintf(buf.get() + len, buf_size - len, "%s\n", files_[i].c_str());
+  for (const auto& file : files_) {
+    const auto& iter = file_infos_->find(file);
+
+    assert(iter != file_infos_->end());
+    // use crc32 for now, switch to something else if needed
+    len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
+                    file.c_str(), iter->second.checksum_value);
  }
 
   s = backup_meta_file->Append(Slice(buf.get(), (size_t)len));
@@ -845,7 +968,8 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
 
 // --- BackupableDB methods --------
 
 BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
-    : StackableDB(db), backup_engine_(new BackupEngine(db->GetEnv(), options)) {
+    : StackableDB(db),
+      backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {
   if (options.share_table_files) {
     backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber());
   }
@@ -879,7 +1003,7 @@ void BackupableDB::StopBackup() {
 
 RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
                                          const BackupableDBOptions& options)
-    : backup_engine_(new BackupEngine(db_env, options)) {}
+    : backup_engine_(new BackupEngineImpl(db_env, options)) {}
 
 RestoreBackupableDB::~RestoreBackupableDB() {
   delete backup_engine_;
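[Note -- not part of the patch] For reference, a backup meta file written by StoreToFile() above would look roughly like this after the change (all values hypothetical): timestamp, sequence number, file count, then one "<file> crc32 <value>" line per file:

    1389746972
    5672
    3
    shared/00010.sst crc32 2912398938
    shared/00011.sst crc32 193821904
    private/1/MANIFEST-01 crc32 4094848874

LoadFromFile() parses these lines back and rejects a line whose checksum field is missing, non-numeric, or not literally "crc32".
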
diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc
index 1b9175ae2..91f66786a 100644
--- a/utilities/backupable/backupable_db_test.cc
+++ b/utilities/backupable/backupable_db_test.cc
@@ -156,7 +156,6 @@ class TestEnv : public EnvWrapper {
   Status NewSequentialFile(const std::string& f,
                            unique_ptr<SequentialFile>* r,
                            const EnvOptions& options) {
-    opened_files_.push_back(f);
     if (dummy_sequential_file_) {
       r->reset(new TestEnv::DummySequentialFile());
       return Status::OK();
@@ -167,6 +166,7 @@ class TestEnv : public EnvWrapper {
   Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
                          const EnvOptions& options) {
+    written_files_.push_back(f);
     if (limit_written_files_ <= 0) {
       return Status::IOError("Sorry, can't do this");
     }
@@ -174,14 +174,14 @@ class TestEnv : public EnvWrapper {
     return EnvWrapper::NewWritableFile(f, r, options);
   }
 
-  void AssertOpenedFiles(std::vector<std::string>& should_have_opened) {
-    sort(should_have_opened.begin(), should_have_opened.end());
-    sort(opened_files_.begin(), opened_files_.end());
-    ASSERT_TRUE(opened_files_ == should_have_opened);
+  void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
+    sort(should_have_written.begin(), should_have_written.end());
+    sort(written_files_.begin(), written_files_.end());
+    ASSERT_TRUE(written_files_ == should_have_written);
   }
 
-  void ClearOpenedFiles() {
-    opened_files_.clear();
+  void ClearWrittenFiles() {
+    written_files_.clear();
   }
 
   void SetLimitWrittenFiles(uint64_t limit) {
@@ -194,7 +194,7 @@ class TestEnv : public EnvWrapper {
 
  private:
   bool dummy_sequential_file_ = false;
-  std::vector<std::string> opened_files_;
+  std::vector<std::string> written_files_;
   uint64_t limit_written_files_ = 1000000;
 };  // TestEnv
 
@@ -241,6 +241,46 @@ class FileManager : public EnvWrapper {
     return s;
   }
 
+  Status CorruptChecksum(const std::string& fname, bool appear_valid) {
+    std::string metadata;
+    Status s = ReadFileToString(this, fname, &metadata);
+    if (!s.ok()) {
+      return s;
+    }
+    s = DeleteFile(fname);
+    if (!s.ok()) {
+      return s;
+    }
+
+    std::vector<size_t> positions;
+    auto pos = metadata.find(" crc32 ");
+    if (pos == std::string::npos) {
+      return Status::Corruption("checksum not found");
+    }
+    do {
+      positions.push_back(pos);
+      pos = metadata.find(" crc32 ", pos + 6);
+    } while (pos != std::string::npos);
+
+    pos = positions[rnd_.Next() % positions.size()];
+    if (metadata.size() < pos + 7) {
+      return Status::Corruption("bad CRC32 checksum value");
+    }
+
+    if (appear_valid) {
+      if (metadata[pos + 8] == '\n') {
+        // single digit value, safe to insert one more digit
+        metadata.insert(pos + 8, 1, '0');
+      } else {
+        metadata.erase(pos + 8, 1);
+      }
+    } else {
+      metadata[pos + 7] = 'a';
+    }
+
+    return WriteToFile(fname, metadata);
+  }
+
   Status WriteToFile(const std::string& fname, const std::string& data) {
     unique_ptr<WritableFile> file;
     EnvOptions env_options;
@@ -251,6 +291,7 @@ class FileManager : public EnvWrapper {
     }
     return file->Append(Slice(data));
   }
+
  private:
   Random rnd_;
 };  // FileManager
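[Note -- not part of the patch] CorruptChecksum produces two distinct failure modes. With appear_valid == false it overwrites the first checksum digit with 'a', so the value no longer parses; with appear_valid == true it drops (or, for a one-digit value, appends) a digit, so the value still parses but no longer matches the file. Schematically, on a hypothetical meta line:

    before:                 00010.sst crc32 2912398938
    appear_valid == false:  00010.sst crc32 a912398938  -> "Invalid checksum value" at load
    appear_valid == true:   00010.sst crc32 212398938   -> parses, fails at restore
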
"/private/1.tmp/MANIFEST-01", + "/private/1.tmp/00011.log", + "/meta/1.tmp", + "/LATEST_BACKUP.tmp" + }; + AppendPath(dbname_ + "_backup", should_have_written); + test_backup_env_->AssertWrittenFiles(should_have_written); // should write 4 new DB files + LATEST_BACKUP + one meta file // should not write/copy 00010.sst, since it's already there! test_backup_env_->SetLimitWrittenFiles(6); - test_db_env_->ClearOpenedFiles(); + test_backup_env_->ClearWrittenFiles(); dummy_db_->live_files_ = { "/00010.sst", "/00015.sst", "/CURRENT", "/MANIFEST-01" }; dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}}; ASSERT_OK(db_->CreateNewBackup(false)); // should not open 00010.sst - it's already there - should_have_openened = { "/00015.sst", "/CURRENT", - "/MANIFEST-01", "/00011.log" }; - AppendPath(dbname_, should_have_openened); - test_db_env_->AssertOpenedFiles(should_have_openened); + should_have_written = { + "/shared/00015.sst.tmp", + "/private/2.tmp/CURRENT", + "/private/2.tmp/MANIFEST-01", + "/private/2.tmp/00011.log", + "/meta/2.tmp", + "/LATEST_BACKUP.tmp" + }; + AppendPath(dbname_ + "_backup", should_have_written); + test_backup_env_->AssertWrittenFiles(should_have_written); ASSERT_OK(db_->DeleteBackup(1)); ASSERT_EQ(true, @@ -465,6 +519,8 @@ TEST(BackupableDBTest, NoDoubleCopy) { // 3. Corrupted backup meta file or missing backuped file - we should // not be able to open that backup, but all other backups should be // fine +// 4. Corrupted checksum value - if the checksum is not a valid uint32_t, +// db open should fail, otherwise, it aborts during the restore process. TEST(BackupableDBTest, CorruptionsTest) { const int keys_iteration = 5000; Random rnd(6); @@ -521,12 +577,29 @@ TEST(BackupableDBTest, CorruptionsTest) { CloseRestoreDB(); ASSERT_TRUE(!s.ok()); - // new backup should be 4! + // --------- case 4. corrupted checksum value ---- + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false)); + // checksum of backup 3 is an invalid value, this can be detected at + // db open time, and it reverts to the previous backup automatically + AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5); + // checksum of the backup 2 appears to be valid, this can cause checksum + // mismatch and abort restore process + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true)); + ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2")); + OpenRestoreDB(); + ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2")); + s = restore_db_->RestoreDBFromBackup(2, dbname_, dbname_); + ASSERT_TRUE(!s.ok()); + ASSERT_OK(restore_db_->DeleteBackup(2)); + CloseRestoreDB(); + AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5); + + // new backup should be 2! OpenBackupableDB(); - FillDB(db_.get(), keys_iteration * 3, keys_iteration * 4); + FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2); ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2))); CloseBackupableDB(); - AssertBackupConsistency(4, 0, keys_iteration * 4, keys_iteration * 5); + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5); } // open DB, write, close DB, backup, restore, repeat