From 3d33da75efa22332dde32e5f2c583b1b8b899a6a Mon Sep 17 00:00:00 2001 From: Schalk-Willem Kruger Date: Mon, 27 Jan 2014 14:49:10 -0800 Subject: [PATCH 1/6] Fix UnmarkEOF for partial blocks Summary: Blocks in the transaction log are a fixed size, but the last block in the transaction log file is usually a partial block. When a new record is added after the reader has hit the end of the file, a new physical record will be appended to the last block. ReadPhysicalRecord can only read full blocks and assumes that the file position indicator is aligned to the start of a block. If the reader is forced to read further by simply clearing the EOF flag, ReadPhysicalRecord will read a full block starting from somewhere in the middle of a real block, causing it to lose alignment and to have a partial physical record at the end of the read buffer. This will result in length mismatches and checksum failures. When the log file is tailed for replication this will cause the log iterator to become invalid, necessitating the creation of a new iterator which will have to read the log file from scratch. This diff fixes the issue by reading the remaining portion of the last block we read from. This is done when the reader is forced to read further (UnmarkEOF is called). Test Plan: - Added unit tests - Stress test (with replication). Check dbdir/LOG file for corruptions. - Test on test tier Reviewers: emayanke, haobo, dhruba Reviewed By: haobo CC: vamsi, sheki, dhruba, kailiu, igor Differential Revision: https://reviews.facebook.net/D15249 --- db/log_reader.cc | 70 +++++++++++++++++- db/log_reader.h | 12 +++- db/log_test.cc | 179 ++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 232 insertions(+), 29 deletions(-) diff --git a/db/log_reader.cc b/db/log_reader.cc index 6596cd84f..1dc567413 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter, backing_store_(new char[kBlockSize]), buffer_(), eof_(false), + read_error_(false), + eof_offset_(0), last_record_offset_(0), end_of_buffer_offset_(0), initial_offset_(initial_offset) { @@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() { return last_record_offset_; } +void Reader::UnmarkEOF() { + if (read_error_) { + return; + } + + eof_ = false; + + if (eof_offset_ == 0) { + return; + } + + // If the EOF was in the middle of a block (a partial block was read) we have + // to read the rest of the block as ReadPhysicalRecord can only read full + // blocks and expects the file position indicator to be aligned to the start + // of a block. + // + // consumed_bytes + buffer_size() + remaining == kBlockSize + + size_t consumed_bytes = eof_offset_ - buffer_.size(); + size_t remaining = kBlockSize - eof_offset_; + + // backing_store_ is used to concatenate what is left in buffer_ and + // the remainder of the block. If buffer_ already uses backing_store_, + // we just append the new data. + if (buffer_.data() != backing_store_ + consumed_bytes) { + // Buffer_ does not use backing_store_ for storage. + // Copy what is left in buffer_ to backing_store_.
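+ // Block layout at this point (offsets within the block): + // [0, consumed_bytes) -- records already returned to the caller, + // [consumed_bytes, eof_offset_) -- unconsumed tail held in buffer_, + // [eof_offset_, kBlockSize) -- remainder of the block, read below.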
+ memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size()); + } + + Slice read_buffer; + Status status = file_->Read(remaining, &read_buffer, + backing_store_ + eof_offset_); + + size_t added = read_buffer.size(); + end_of_buffer_offset_ += added; + + if (!status.ok()) { + if (added > 0) { + ReportDrop(added, status); + } + + read_error_ = true; + return; + } + + if (read_buffer.data() != backing_store_ + eof_offset_) { + // Read did not write to backing_store_ + memmove(backing_store_ + eof_offset_, read_buffer.data(), + read_buffer.size()); + } + + buffer_ = Slice(backing_store_ + consumed_bytes, + eof_offset_ + added - consumed_bytes); + + if (added < remaining) { + eof_ = true; + eof_offset_ += added; + } else { + eof_offset_ = 0; + } +} + void Reader::ReportCorruption(size_t bytes, const char* reason) { ReportDrop(bytes, Status::Corruption(reason)); } @@ -184,7 +249,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) { unsigned int Reader::ReadPhysicalRecord(Slice* result) { while (true) { if (buffer_.size() < (size_t)kHeaderSize) { - if (!eof_) { + if (!eof_ && !read_error_) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); Status status = file_->Read(kBlockSize, &buffer_, backing_store_); @@ -192,10 +257,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { if (!status.ok()) { buffer_.clear(); ReportDrop(kBlockSize, status); - eof_ = true; + read_error_ = true; return kEof; } else if (buffer_.size() < (size_t)kBlockSize) { eof_ = true; + eof_offset_ = buffer_.size(); } continue; } else if (buffer_.size() == 0) { diff --git a/db/log_reader.h b/db/log_reader.h index 8e277c821..81d334da2 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -69,9 +69,10 @@ class Reader { // when we know more data has been written to the file. we can use this // function to force the reader to look again in the file. - void UnmarkEOF() { - eof_ = false; - } + // Also aligns the file position indicator to the start of the next block + // by reading the rest of the data from the EOF position to the end of the + // block that was partially read. + void UnmarkEOF(); SequentialFile* file() { return file_.get(); } @@ -82,6 +83,11 @@ class Reader { char* const backing_store_; Slice buffer_; bool eof_; // Last Read() indicated EOF by returning < kBlockSize + bool read_error_; // Error occurred while reading from file + + // Offset of the file position indicator within the last block when an + // EOF was detected. + size_t eof_offset_; // Offset of the last record returned by ReadRecord. 
uint64_t last_record_offset_; diff --git a/db/log_test.cc b/db/log_test.cc index dedbff0aa..636518835 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -47,36 +47,93 @@ class LogTest { public: std::string contents_; + explicit StringDest(Slice& reader_contents) : + WritableFile(), + contents_(""), + reader_contents_(reader_contents), + last_flush_(0) { + reader_contents_ = Slice(contents_.data(), 0); + }; + virtual Status Close() { return Status::OK(); } - virtual Status Flush() { return Status::OK(); } + virtual Status Flush() { + ASSERT_TRUE(reader_contents_.size() <= last_flush_); + size_t offset = last_flush_ - reader_contents_.size(); + reader_contents_ = Slice( + contents_.data() + offset, + contents_.size() - offset); + last_flush_ = contents_.size(); + + return Status::OK(); + } virtual Status Sync() { return Status::OK(); } virtual Status Append(const Slice& slice) { contents_.append(slice.data(), slice.size()); return Status::OK(); } + void Drop(size_t bytes) { + contents_.resize(contents_.size() - bytes); + reader_contents_ = Slice( + reader_contents_.data(), reader_contents_.size() - bytes); + last_flush_ = contents_.size(); + } + + private: + Slice& reader_contents_; + size_t last_flush_; }; class StringSource : public SequentialFile { public: - Slice contents_; + Slice& contents_; bool force_error_; + size_t force_error_position_; + bool force_eof_; + size_t force_eof_position_; bool returned_partial_; - StringSource() : force_error_(false), returned_partial_(false) { } + explicit StringSource(Slice& contents) : + contents_(contents), + force_error_(false), + force_error_position_(0), + force_eof_(false), + force_eof_position_(0), + returned_partial_(false) { } virtual Status Read(size_t n, Slice* result, char* scratch) { ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error"; if (force_error_) { - force_error_ = false; - returned_partial_ = true; - return Status::Corruption("read error"); + if (force_error_position_ >= n) { + force_error_position_ -= n; + } else { + *result = Slice(contents_.data(), force_error_position_); + contents_.remove_prefix(force_error_position_); + force_error_ = false; + returned_partial_ = true; + return Status::Corruption("read error"); + } } if (contents_.size() < n) { n = contents_.size(); returned_partial_ = true; } + + if (force_eof_) { + if (force_eof_position_ >= n) { + force_eof_position_ -= n; + } else { + force_eof_ = false; + n = force_eof_position_; + returned_partial_ = true; + } + } + + // By using scratch we ensure that caller has control over the + // lifetime of result.data() + memcpy(scratch, contents_.data(), n); + *result = Slice(scratch, n); + contents_.remove_prefix(n); return Status::OK(); } @@ -123,10 +180,10 @@ class LogTest { src->contents_ = dest_contents(); } + Slice reader_contents_; unique_ptr<StringDest> dest_holder_; unique_ptr<StringSource> source_holder_; ReportCollector report_; - bool reading_; Writer writer_; Reader reader_; @@ -135,16 +192,15 @@ class LogTest { static uint64_t initial_offset_last_record_offsets_[]; public: - LogTest() : dest_holder_(new StringDest), - source_holder_(new StringSource), - reading_(false), + LogTest() : reader_contents_(), + dest_holder_(new StringDest(reader_contents_)), + source_holder_(new StringSource(reader_contents_)), writer_(std::move(dest_holder_)), reader_(std::move(source_holder_), &report_, true/*checksum*/, 0/*initial_offset*/) { } void Write(const std::string& msg) { - ASSERT_TRUE(!reading_) << "Write() after starting to read";
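+ // Writes may now interleave with reads: StringDest::Flush() publishes + // newly appended bytes to reader_contents_, which the StringSource + // reads from directly, so the source no longer needs to be reset.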
writer_.AddRecord(Slice(msg)); } @@ -153,10 +209,6 @@ class LogTest { } std::string Read() { - if (!reading_) { - reading_ = true; - reset_source_contents(); - } std::string scratch; Slice record; if (reader_.ReadRecord(&record, &scratch)) { @@ -175,7 +227,9 @@ class LogTest { } void ShrinkSize(int bytes) { - dest_contents().resize(dest_contents().size() - bytes); + auto dest = dynamic_cast<StringDest*>(writer_.file()); + assert(dest); + dest->Drop(bytes); } void FixChecksum(int header_offset, int len) { @@ -185,9 +239,10 @@ class LogTest { EncodeFixed32(&dest_contents()[header_offset], crc); } - void ForceError() { + void ForceError(size_t position = 0) { auto src = dynamic_cast<StringSource*>(reader_.file()); src->force_error_ = true; + src->force_error_position_ = position; } size_t DroppedBytes() const { @@ -198,6 +253,22 @@ class LogTest { return report_.message_; } + void ForceEOF(size_t position = 0) { + auto src = dynamic_cast<StringSource*>(reader_.file()); + src->force_eof_ = true; + src->force_eof_position_ = position; + } + + void UnmarkEOF() { + auto src = dynamic_cast<StringSource*>(reader_.file()); + src->returned_partial_ = false; + reader_.UnmarkEOF(); + } + + bool IsEOF() { + return reader_.IsEOF(); + } + // Returns OK iff recorded error message contains "msg" std::string MatchError(const std::string& msg) const { if (report_.message_.find(msg) == std::string::npos) { @@ -217,9 +288,7 @@ class LogTest { void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { WriteInitialOffsetLog(); - reading_ = true; - unique_ptr<StringSource> source(new StringSource); - source->contents_ = dest_contents(); + unique_ptr<StringSource> source(new StringSource(reader_contents_)); unique_ptr<Reader> offset_reader( new Reader(std::move(source), &report_, true/*checksum*/, WrittenBytes() + offset_past_end)); @@ -231,9 +300,7 @@ void CheckInitialOffsetRecord(uint64_t initial_offset, int expected_record_offset) { WriteInitialOffsetLog(); - reading_ = true; - unique_ptr<StringSource> source(new StringSource); - source->contents_ = dest_contents(); + unique_ptr<StringSource> source(new StringSource(reader_contents_)); unique_ptr<Reader> offset_reader( new Reader(std::move(source), &report_, true/*checksum*/, initial_offset)); @@ -520,6 +587,70 @@ TEST(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); } +TEST(LogTest, ClearEofSingleBlock) { + Write("foo"); + Write("bar"); + ForceEOF(3 + kHeaderSize + 2); + ASSERT_EQ("foo", Read()); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_TRUE(IsEOF()); + ASSERT_EQ("EOF", Read()); + Write("xxx"); + UnmarkEOF(); + ASSERT_EQ("xxx", Read()); + ASSERT_TRUE(IsEOF()); +} + +TEST(LogTest, ClearEofMultiBlock) { + size_t num_full_blocks = 5; + size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25; + Write(BigString("foo", n)); + Write(BigString("bar", n)); + ForceEOF(n + num_full_blocks * kHeaderSize + 10); + ASSERT_EQ(BigString("foo", n), Read()); + ASSERT_TRUE(IsEOF()); + UnmarkEOF(); + ASSERT_EQ(BigString("bar", n), Read()); + ASSERT_TRUE(IsEOF()); + Write(BigString("xxx", n)); + UnmarkEOF(); + ASSERT_EQ(BigString("xxx", n), Read()); + ASSERT_TRUE(IsEOF()); +} + +TEST(LogTest, ClearEofError) { + // If an error occurs during Read() in UnmarkEOF(), the records contained + // in the buffer should be returned on subsequent calls of ReadRecord() + // until no more full records are left, whereafter ReadRecord() should return + // false to indicate that it cannot read any further.
+ + Write("foo"); + Write("bar"); + UnmarkEOF(); + ASSERT_EQ("foo", Read()); + ASSERT_TRUE(IsEOF()); + Write("xxx"); + ForceError(0); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_EQ("EOF", Read()); +} + +TEST(LogTest, ClearEofError2) { + Write("foo"); + Write("bar"); + UnmarkEOF(); + ASSERT_EQ("foo", Read()); + Write("xxx"); + ForceError(3); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ(3U, DroppedBytes()); + ASSERT_EQ("OK", MatchError("read error")); +} + } // namespace log } // namespace rocksdb From 90f29ccbef4c49b7ecf80e97f50572d996d17fa8 Mon Sep 17 00:00:00 2001 From: Mark Callaghan Date: Wed, 22 Jan 2014 14:37:20 -0800 Subject: [PATCH 2/6] Update monitoring to include average time per compaction and stall Summary: The new columns are msComp and msStall that provide average time per compaction and stall for that level in milliseconds. Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ 0 8 15 1.5 2 0 30 0 0 30 0.0 0.0 15.5 0 0 0 0 16 112 0.2 1.3 7568 1 8 16 1.6 1 26 26 15 11 16 3.5 17.6 18.1 8 6 13 7 3 362 0.0 0.0 0 2 1 2 0.0 0 0 2 0 0 2 0.0 0.0 18.4 0 0 0 0 1 50 0.0 0.0 0 Task ID: # Blame Rev: Test Plan: run db_bench Revert Plan: Database Impact: Memcache Impact: Other Notes: EImportant: - begin *PUBLIC* platform impact section - Bugzilla: # - end platform impact - Reviewers: haobo Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D15345 --- db/db_impl.cc | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 50fb3da8f..a52e02abe 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3545,8 +3545,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { // Pardon the long line but I think it is easier to read this way. snprintf(buf, sizeof(buf), " Compactions\n" - "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n" - "--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" + "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" + "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" ); value->append(buf); for (int level = 0; level < current->NumberLevels(); level++) { @@ -3566,9 +3566,21 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { total_bytes_read += bytes_read; total_bytes_written += stats_[level].bytes_written; + uint64_t stalls = level == 0 ? + (stall_level0_slowdown_count_ + + stall_level0_num_files_count_ + + stall_memtable_compaction_count_) : + stall_leveln_slowdown_count_[level]; + + double stall_us = level == 0 ? 
+ (stall_level0_slowdown_ + + stall_level0_num_files_ + + stall_memtable_compaction_) : + stall_leveln_slowdown_[level]; + snprintf( buf, sizeof(buf), - "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n", + "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n", level, files, current->NumLevelBytes(level) / 1048576.0, @@ -3590,8 +3602,13 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { stats_[level].files_out_levelnp1, stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1, stats_[level].count, - stall_leveln_slowdown_[level] / 1000000.0, - (unsigned long) stall_leveln_slowdown_count_[level]); + (int) ((double) stats_[level].micros / + 1000.0 / + (stats_[level].count + 1)), + (double) stall_us / 1000.0 / (stalls + 1), + stall_us / 1000000.0, + (unsigned long) stalls); + total_slowdown += stall_leveln_slowdown_[level]; total_slowdown_count += stall_leveln_slowdown_count_[level]; value->append(buf); From 9dc29414e3b6e59da4f670e9af3318d8670d6745 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Tue, 28 Jan 2014 09:43:36 -0800 Subject: [PATCH 3/6] add checksum for backup files Summary: Keep the checksum of each backed-up file in the meta file. When restoring these files, compute their checksums on the fly and compare against what is in the meta file. Fail the restore process if a checksum does not match. Test Plan: unit test Reviewers: haobo, igor, sdong, kailiu Reviewed By: igor CC: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D15381 --- utilities/backupable/backupable_db.cc | 200 +++++++++++++++++---- utilities/backupable/backupable_db_test.cc | 115 +++++++++--- 2 files changed, 256 insertions(+), 59 deletions(-) diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 8e5ef5220..6048082d8 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -10,6 +10,7 @@ #include "utilities/backupable_db.h" #include "db/filename.h" #include "util/coding.h" +#include "util/crc32c.h" #include "rocksdb/transaction_log.h" #define __STDC_FORMAT_MACROS @@ -48,12 +49,22 @@ class BackupEngine { void DeleteBackupsNewerThan(uint64_t sequence_number); private: + struct FileInfo { + FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum) + : refs(0), filename(fname), size(sz), checksum_value(checksum) {} + + int refs; + const std::string filename; + const uint64_t size; + uint32_t checksum_value; + }; + class BackupMeta { public: BackupMeta(const std::string& meta_filename, - std::unordered_map<std::string, int>* file_refs, Env* env) + std::unordered_map<std::string, FileInfo>* file_infos, Env* env) : timestamp_(0), size_(0), meta_filename_(meta_filename), - file_refs_(file_refs), env_(env) {} + file_infos_(file_infos), env_(env) {} ~BackupMeta() {} @@ -73,7 +84,8 @@ class BackupEngine { return sequence_number_; } - void AddFile(const std::string& filename, uint64_t size); + Status AddFile(const FileInfo& file_info); + void Delete(); bool Empty() { @@ -96,7 +108,7 @@ class BackupEngine { std::string const meta_filename_; // files with relative paths (without "/" prefix!!)
std::vector<std::string> files_; - std::unordered_map<std::string, int>* file_refs_; + std::unordered_map<std::string, FileInfo>* file_infos_; Env* env_; static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB @@ -141,6 +153,7 @@ class BackupEngine { Env* dst_env, bool sync, uint64_t* size = nullptr, + uint32_t* checksum_value = nullptr, uint64_t size_limit = 0); // if size_limit == 0, there is no size limit, copy everything Status BackupFile(BackupID backup_id, @@ -149,15 +162,21 @@ class BackupEngine { const std::string& src_dir, const std::string& src_fname, // starts with "/" uint64_t size_limit = 0); + + Status CalculateChecksum(const std::string& src, + Env* src_env, + uint64_t size_limit, + uint32_t* checksum_value); + // Will delete all the files we don't need anymore // If full_scan == true, it will do the full scan of files/ directory - // and delete all the files that are not referenced from backuped_file_refs_ + // and delete all the files that are not referenced from backuped_file_infos_ void GarbageCollection(bool full_scan); // backup state data BackupID latest_backup_id_; std::map<BackupID, BackupMeta> backups_; - std::unordered_map<std::string, int> backuped_file_refs_; + std::unordered_map<std::string, FileInfo> backuped_file_infos_; std::vector<BackupID> obsolete_backups_; std::atomic<bool> stop_backup_; @@ -198,7 +217,7 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options) assert(backups_.find(backup_id) == backups_.end()); backups_.insert(std::make_pair( backup_id, BackupMeta(GetBackupMetaFile(backup_id), - &backuped_file_refs_, backup_env_))); + &backuped_file_infos_, backup_env_))); } if (options_.destroy_old_data) { // Destory old data @@ -302,7 +321,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) { assert(backups_.find(new_backup_id) == backups_.end()); auto ret = backups_.insert(std::make_pair( new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id), - &backuped_file_refs_, backup_env_))); + &backuped_file_infos_, backup_env_))); assert(ret.second == true); auto& new_backup = ret.first->second; new_backup.RecordTimestamp(); @@ -478,10 +497,19 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id, "/" + dst; Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); - s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false); + uint32_t checksum_value; + s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false, + nullptr /* size */, &checksum_value); if (!s.ok()) { break; } + + const auto iter = backuped_file_infos_.find(file); + assert(iter != backuped_file_infos_.end()); + if (iter->second.checksum_value != checksum_value) { + s = Status::Corruption("Checksum check failed"); + break; + } } Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str()); @@ -555,6 +583,7 @@ Status BackupEngine::CopyFile(const std::string& src, Env* dst_env, bool sync, uint64_t* size, + uint32_t* checksum_value, uint64_t size_limit) { Status s; unique_ptr<WritableFile> dst_file; unique_ptr<SequentialFile> src_file; if (size != nullptr) { *size = 0; } + if (checksum_value != nullptr) { + *checksum_value = 0; + } // Check if size limit is set.
if not, set it to very big number if (size_limit == 0) { @@ -589,12 +621,19 @@ Status BackupEngine::CopyFile(const std::string& src, copy_file_buffer_size_ : size_limit; s = src_file->Read(buffer_to_read, &data, buf.get()); size_limit -= data.size(); + + if (!s.ok()) { + return s; + } + if (size != nullptr) { *size += data.size(); } - if (s.ok()) { - s = dst_file->Append(data); + if (checksum_value != nullptr) { + *checksum_value = crc32c::Extend(*checksum_value, data.data(), + data.size()); } + s = dst_file->Append(data); } while (s.ok() && data.size() > 0 && size_limit > 0); if (s.ok() && sync) { @@ -629,9 +668,15 @@ Status BackupEngine::BackupFile(BackupID backup_id, // if it's shared, we also need to check if it exists -- if it does, // no need to copy it again + uint32_t checksum_value = 0; if (shared && backup_env_->FileExists(dst_path)) { backup_env_->GetFileSize(dst_path, &size); // Ignore error - Log(options_.info_log, "%s already present", src_fname.c_str()); + Log(options_.info_log, "%s already present, calculate checksum", + src_fname.c_str()); + s = CalculateChecksum(src_dir + src_fname, + db_env_, + size_limit, + &checksum_value); } else { Log(options_.info_log, "Copying %s", src_fname.c_str()); s = CopyFile(src_dir + src_fname, @@ -640,22 +685,63 @@ Status BackupEngine::BackupFile(BackupID backup_id, backup_env_, options_.sync, &size, + &checksum_value, size_limit); if (s.ok() && shared) { s = backup_env_->RenameFile(dst_path_tmp, dst_path); } } if (s.ok()) { - backup->AddFile(dst_relative, size); + s = backup->AddFile(FileInfo(dst_relative, size, checksum_value)); } return s; } +Status BackupEngine::CalculateChecksum(const std::string& src, + Env* src_env, + uint64_t size_limit, + uint32_t* checksum_value) { + *checksum_value = 0; + if (size_limit == 0) { + size_limit = std::numeric_limits<uint64_t>::max(); + } + + EnvOptions env_options; + env_options.use_mmap_writes = false; + + std::unique_ptr<SequentialFile> src_file; + Status s = src_env->NewSequentialFile(src, &src_file, env_options); + if (!s.ok()) { + return s; + } + + std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]); + Slice data; + + do { + if (stop_backup_.load(std::memory_order_acquire)) { + return Status::Incomplete("Backup stopped"); + } + size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
+ copy_file_buffer_size_ : size_limit; + s = src_file->Read(buffer_to_read, &data, buf.get()); + + if (!s.ok()) { + return s; + } + + size_limit -= data.size(); + *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size()); + } while (data.size() > 0 && size_limit > 0); + + return s; +} + void BackupEngine::GarbageCollection(bool full_scan) { Log(options_.info_log, "Starting garbage collection"); std::vector<std::string> to_delete; - for (auto& itr : backuped_file_refs_) { - if (itr.second == 0) { + for (auto& itr : backuped_file_infos_) { + if (itr.second.refs == 0) { Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first)); Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(), s.ToString().c_str()); @@ -663,7 +749,7 @@ void BackupEngine::GarbageCollection(bool full_scan) { } } for (auto& td : to_delete) { - backuped_file_refs_.erase(td); + backuped_file_infos_.erase(td); } if (!full_scan) { // take care of private dirs -- if full_scan == true, then full_scan will @@ -686,7 +772,7 @@ void BackupEngine::GarbageCollection(bool full_scan) { for (auto& child : shared_children) { std::string rel_fname = GetSharedFileRel(child); // if it's not refcounted, delete it - if (backuped_file_refs_.find(rel_fname) == backuped_file_refs_.end()) { + if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) { // this might be a directory, but DeleteFile will just fail in that // case, so we're good Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname)); @@ -731,23 +817,34 @@ void BackupEngine::GarbageCollection(bool full_scan) { // ------- BackupMeta class -------- -void BackupEngine::BackupMeta::AddFile(const std::string& filename, - uint64_t size) { - size_ += size; - files_.push_back(filename); - auto itr = file_refs_->find(filename); - if (itr == file_refs_->end()) { - file_refs_->insert(std::make_pair(filename, 1)); +Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) { + size_ += file_info.size; + files_.push_back(file_info.filename); + + auto itr = file_infos_->find(file_info.filename); + if (itr == file_infos_->end()) { + auto ret = file_infos_->insert({file_info.filename, file_info}); + if (ret.second) { + ret.first->second.refs = 1; + } else { + // if this happens, something is seriously wrong + return Status::Corruption("In memory metadata insertion error"); + } } else { - ++itr->second; // increase refcount if already present + if (itr->second.checksum_value != file_info.checksum_value) { + return Status::Corruption("Checksum mismatch for existing backup file"); + } + ++itr->second.refs; // increase refcount if already present } + + return Status::OK(); } void BackupEngine::BackupMeta::Delete() { - for (auto& file : files_) { - auto itr = file_refs_->find(file); - assert(itr != file_refs_->end()); - --(itr->second); // decrease refcount + for (const auto& file : files_) { + auto itr = file_infos_->find(file); + assert(itr != file_infos_->end()); + --(itr->second.refs); // decrease refcount } files_.clear(); // delete meta file @@ -759,8 +856,8 @@ // <timestamp> // <seq number> // <number of files> -// <file1> -// <file2> +// <file1> <crc32(literal string)> <crc32_value> +// <file2> <crc32(literal string)> <crc32_value> // ... // TODO: maybe add checksum?
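+// For illustration, a single file line in the meta file would look like +// (the checksum value below is made up): +// 00010.sst crc32 3086120376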
Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) { @@ -790,18 +887,40 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) { sscanf(data.data(), "%u%n", &num_files, &bytes_read); data.remove_prefix(bytes_read + 1); // +1 for '\n' - std::vector<std::pair<std::string, uint64_t>> files; + std::vector<FileInfo> files; for (uint32_t i = 0; s.ok() && i < num_files; ++i) { - std::string filename = GetSliceUntil(&data, '\n').ToString(); + auto line = GetSliceUntil(&data, '\n'); + std::string filename = GetSliceUntil(&line, ' ').ToString(); + uint64_t size; s = env_->GetFileSize(backup_dir + "/" + filename, &size); - files.push_back(std::make_pair(filename, size)); + + if (line.empty()) { + return Status::Corruption("File checksum is missing"); + } + + uint32_t checksum_value = 0; + if (line.starts_with("crc32 ")) { + line.remove_prefix(6); + sscanf(line.data(), "%u", &checksum_value); + if (memcmp(line.data(), std::to_string(checksum_value).c_str(), + line.size() - 1) != 0) { + return Status::Corruption("Invalid checksum value"); + } + } else { + return Status::Corruption("Unknown checksum type"); + } + + files.emplace_back(filename, size, checksum_value); } if (s.ok()) { - for (auto file : files) { - AddFile(file.first, file.second); + for (const auto& file_info : files) { + s = AddFile(file_info); + if (!s.ok()) { + break; + } } } @@ -825,8 +944,13 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) { len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n", sequence_number_); len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size()); - for (size_t i = 0; i < files_.size(); ++i) { - len += snprintf(buf.get() + len, buf_size - len, "%s\n", files_[i].c_str()); + for (const auto& file : files_) { + const auto& iter = file_infos_->find(file); + + assert(iter != file_infos_->end()); + // use crc32 for now, switch to something else if needed + len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n", + file.c_str(), iter->second.checksum_value); } s = backup_meta_file->Append(Slice(buf.get(), (size_t)len)); diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index de240558f..c5909f8e7 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -154,7 +154,6 @@ class TestEnv : public EnvWrapper { Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r, const EnvOptions& options) { - opened_files_.push_back(f); if (dummy_sequential_file_) { r->reset(new TestEnv::DummySequentialFile()); return Status::OK(); } @@ -165,6 +164,7 @@ class TestEnv : public EnvWrapper { Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r, const EnvOptions& options) { + written_files_.push_back(f); if (limit_written_files_ <= 0) { return Status::IOError("Sorry, can't do this"); } @@ -172,14 +172,14 @@ return EnvWrapper::NewWritableFile(f, r, options); } - void AssertOpenedFiles(std::vector<std::string>& should_have_opened) { - sort(should_have_opened.begin(), should_have_opened.end()); - sort(opened_files_.begin(), opened_files_.end()); - ASSERT_TRUE(opened_files_ == should_have_opened); + void AssertWrittenFiles(std::vector<std::string>& should_have_written) { + sort(should_have_written.begin(), should_have_written.end()); + sort(written_files_.begin(), written_files_.end()); + ASSERT_TRUE(written_files_ == should_have_written); } - void ClearOpenedFiles() { - opened_files_.clear(); + void ClearWrittenFiles() { + written_files_.clear(); } void
SetLimitWrittenFiles(uint64_t limit) { @@ -192,7 +192,7 @@ class TestEnv : public EnvWrapper { private: bool dummy_sequential_file_ = false; - std::vector<std::string> opened_files_; + std::vector<std::string> written_files_; uint64_t limit_written_files_ = 1000000; }; // TestEnv @@ -239,6 +239,46 @@ class FileManager : public EnvWrapper { return s; } + Status CorruptChecksum(const std::string& fname, bool appear_valid) { + std::string metadata; + Status s = ReadFileToString(this, fname, &metadata); + if (!s.ok()) { + return s; + } + s = DeleteFile(fname); + if (!s.ok()) { + return s; + } + + std::vector<size_t> positions; + auto pos = metadata.find(" crc32 "); + if (pos == std::string::npos) { + return Status::Corruption("checksum not found"); + } + do { + positions.push_back(pos); + pos = metadata.find(" crc32 ", pos + 6); + } while (pos != std::string::npos); + + pos = positions[rnd_.Next() % positions.size()]; + if (metadata.size() < pos + 7) { + return Status::Corruption("bad CRC32 checksum value"); + } + + if (appear_valid) { + if (metadata[pos + 8] == '\n') { + // single digit value, safe to insert one more digit + metadata.insert(pos + 8, 1, '0'); + } else { + metadata.erase(pos + 8, 1); + } + } else { + metadata[pos + 7] = 'a'; + } + + return WriteToFile(fname, metadata); + } + Status WriteToFile(const std::string& fname, const std::string& data) { unique_ptr<WritableFile> file; EnvOptions env_options; @@ -249,6 +289,7 @@ class FileManager : public EnvWrapper { } return file->Append(Slice(data)); } + private: Random rnd_; }; // FileManager @@ -412,30 +453,43 @@ TEST(BackupableDBTest, NoDoubleCopy) { // should write 5 DB files + LATEST_BACKUP + one meta file test_backup_env_->SetLimitWrittenFiles(7); - test_db_env_->ClearOpenedFiles(); + test_backup_env_->ClearWrittenFiles(); test_db_env_->SetLimitWrittenFiles(0); dummy_db_->live_files_ = { "/00010.sst", "/00011.sst", "/CURRENT", "/MANIFEST-01" }; dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}}; ASSERT_OK(db_->CreateNewBackup(false)); - std::vector<std::string> should_have_openened = dummy_db_->live_files_; - should_have_openened.push_back("/00011.log"); - AppendPath(dbname_, should_have_openened); - test_db_env_->AssertOpenedFiles(should_have_openened); + std::vector<std::string> should_have_written = { + "/shared/00010.sst.tmp", + "/shared/00011.sst.tmp", + "/private/1.tmp/CURRENT", + "/private/1.tmp/MANIFEST-01", + "/private/1.tmp/00011.log", + "/meta/1.tmp", + "/LATEST_BACKUP.tmp" + }; + AppendPath(dbname_ + "_backup", should_have_written); + test_backup_env_->AssertWrittenFiles(should_have_written); // should write 4 new DB files + LATEST_BACKUP + one meta file // should not write/copy 00010.sst, since it's already there!
test_backup_env_->SetLimitWrittenFiles(6); - test_db_env_->ClearOpenedFiles(); + test_backup_env_->ClearWrittenFiles(); dummy_db_->live_files_ = { "/00010.sst", "/00015.sst", "/CURRENT", "/MANIFEST-01" }; dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}}; ASSERT_OK(db_->CreateNewBackup(false)); // should not open 00010.sst - it's already there - should_have_openened = { "/00015.sst", "/CURRENT", - "/MANIFEST-01", "/00011.log" }; - AppendPath(dbname_, should_have_openened); - test_db_env_->AssertOpenedFiles(should_have_openened); + should_have_written = { + "/shared/00015.sst.tmp", + "/private/2.tmp/CURRENT", + "/private/2.tmp/MANIFEST-01", + "/private/2.tmp/00011.log", + "/meta/2.tmp", + "/LATEST_BACKUP.tmp" + }; + AppendPath(dbname_ + "_backup", should_have_written); + test_backup_env_->AssertWrittenFiles(should_have_written); ASSERT_OK(db_->DeleteBackup(1)); ASSERT_EQ(true, @@ -463,6 +517,8 @@ // 3. Corrupted backup meta file or missing backuped file - we should // not be able to open that backup, but all other backups should be // fine +// 4. Corrupted checksum value - if the checksum is not a valid uint32_t, +// db open should fail; otherwise, it aborts during the restore process. TEST(BackupableDBTest, CorruptionsTest) { const int keys_iteration = 5000; Random rnd(6); @@ -519,12 +575,29 @@ TEST(BackupableDBTest, CorruptionsTest) { CloseRestoreDB(); ASSERT_TRUE(!s.ok()); - // new backup should be 4! + // --------- case 4. corrupted checksum value ---- + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false)); + // checksum of backup 3 is an invalid value, this can be detected at + // db open time, and it reverts to the previous backup automatically + AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5); + // checksum of the backup 2 appears to be valid, this can cause checksum + // mismatch and abort restore process + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true)); + ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2")); + OpenRestoreDB(); + ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2")); + s = restore_db_->RestoreDBFromBackup(2, dbname_, dbname_); + ASSERT_TRUE(!s.ok()); + ASSERT_OK(restore_db_->DeleteBackup(2)); + CloseRestoreDB(); + AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5); + + // new backup should be 2! OpenBackupableDB(); - FillDB(db_.get(), keys_iteration * 3, keys_iteration * 4); + FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2); ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2))); CloseBackupableDB(); - AssertBackupConsistency(4, 0, keys_iteration * 4, keys_iteration * 5); + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5); } // open DB, write, close DB, backup, restore, repeat From ec2fa4a6908d9f470bf3eeaf159a8a7e009ed0e9 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 28 Jan 2014 11:26:07 -0800 Subject: [PATCH 4/6] Export BackupEngine Summary: Lots of clients have problems using the StackableDB interface. It's nice to have BackupableDB as a layer on top of DB, but it's not necessary. This diff exports BackupEngine, which can be used to create backups without forcing clients to use the StackableDB interface.
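A minimal usage sketch (the paths and the helper function are illustrative only; it relies on the CreateNewBackupEngine() factory and the BackupEngine interface declared in the header below):

    #include "utilities/backupable_db.h"

    void BackupThenRestore(rocksdb::DB* db, rocksdb::Env* env) {
      // Create an engine directly -- no StackableDB wrapper required.
      rocksdb::BackupEngine* backup_engine = rocksdb::CreateNewBackupEngine(
          env, rocksdb::BackupableDBOptions("/tmp/rocksdb_backup"));
      // Back up the live DB, flushing memtables first.
      rocksdb::Status s = backup_engine->CreateNewBackup(db, true);
      if (s.ok()) {
        // Restore the newest backup into a (possibly different) directory.
        s = backup_engine->RestoreDBFromLatestBackup("/tmp/rocksdb_restore",
                                                     "/tmp/rocksdb_restore");
      }
      delete backup_engine;
    }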
Test Plan: backupable_db_test Reviewers: dhruba, ljin, swk Reviewed By: ljin CC: leveldb, benj Differential Revision: https://reviews.facebook.net/D15477 --- include/utilities/backupable_db.h | 25 +++++++- utilities/backupable/backupable_db.cc | 86 +++++++++++++-------------- 2 files changed, 66 insertions(+), 45 deletions(-) diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h index fbe2ae8a3..b6eb139e7 100644 --- a/include/utilities/backupable_db.h +++ b/include/utilities/backupable_db.h @@ -68,8 +68,6 @@ struct BackupableDBOptions { destroy_old_data(_destroy_old_data) { } }; -class BackupEngine; - typedef uint32_t BackupID; struct BackupInfo { @@ -82,6 +80,29 @@ struct BackupInfo { : backup_id(_backup_id), timestamp(_timestamp), size(_size) {} }; +// Please see the documentation in BackupableDB and RestoreBackupableDB +class BackupEngine { + public: + virtual ~BackupEngine() {} + + virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false) = 0; + virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0; + virtual Status DeleteBackup(BackupID backup_id) = 0; + virtual void StopBackup() = 0; + + virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) = 0; + virtual Status RestoreDBFromBackup(BackupID backup_id, + const std::string& db_dir, + const std::string& wal_dir) = 0; + virtual Status RestoreDBFromLatestBackup(const std::string& db_dir, + const std::string& wal_dir) = 0; + + virtual void DeleteBackupsNewerThan(uint64_t sequence_number) = 0; +}; + +extern BackupEngine* CreateNewBackupEngine(Env* db_env, + const BackupableDBOptions& options); + // Stack your DB with BackupableDB to be able to backup the DB class BackupableDB : public StackableDB { public: diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 6048082d8..d44fbb6ad 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -26,11 +26,11 @@ namespace rocksdb { -// -------- BackupEngine class --------- -class BackupEngine { +// -------- BackupEngineImpl class --------- +class BackupEngineImpl : public BackupEngine { public: - BackupEngine(Env* db_env, const BackupableDBOptions& options); - ~BackupEngine(); + BackupEngineImpl(Env* db_env, const BackupableDBOptions& options); + ~BackupEngineImpl(); Status CreateNewBackup(DB* db, bool flush_before_backup = false); Status PurgeOldBackups(uint32_t num_backups_to_keep); Status DeleteBackup(BackupID backup_id); @@ -188,7 +188,13 @@ class BackupEngine { static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB }; -BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options) +BackupEngine* CreateNewBackupEngine(Env* db_env, + const BackupableDBOptions& options) { + return new BackupEngineImpl(db_env, options); +} + +BackupEngineImpl::BackupEngineImpl(Env* db_env, + const BackupableDBOptions& options) : stop_backup_(false), options_(options), db_env_(db_env), @@ -271,11 +277,9 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options) latest_backup_id_); } -BackupEngine::~BackupEngine() { - LogFlush(options_.info_log); -} +BackupEngineImpl::~BackupEngineImpl() { LogFlush(options_.info_log); } -void BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) { +void BackupEngineImpl::DeleteBackupsNewerThan(uint64_t sequence_number) { for (auto backup : backups_) { if (backup.second.GetSequenceNumber() > sequence_number) { Log(options_.info_log, @@ -295,7 +299,7 @@ void
BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) { GarbageCollection(false); } -Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) { +Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { Status s; std::vector<std::string> live_files; VectorLogPtr live_wal_files; @@ -405,7 +409,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) { return s; } -Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) { +Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) { Log(options_.info_log, "Purging old backups, keeping %u", num_backups_to_keep); while (num_backups_to_keep < backups_.size()) { @@ -418,7 +422,7 @@ Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) { return Status::OK(); } -Status BackupEngine::DeleteBackup(BackupID backup_id) { +Status BackupEngineImpl::DeleteBackup(BackupID backup_id) { Log(options_.info_log, "Deleting backup %u", backup_id); auto backup = backups_.find(backup_id); if (backup == backups_.end()) { @@ -431,7 +435,7 @@ Status BackupEngine::DeleteBackup(BackupID backup_id) { return Status::OK(); } -void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) { +void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) { backup_info->reserve(backups_.size()); for (auto& backup : backups_) { if (!backup.second.Empty()) { @@ -441,9 +445,9 @@ void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) { } } -Status BackupEngine::RestoreDBFromBackup(BackupID backup_id, - const std::string &db_dir, - const std::string &wal_dir) { +Status BackupEngineImpl::RestoreDBFromBackup(BackupID backup_id, + const std::string& db_dir, + const std::string& wal_dir) { auto backup_itr = backups_.find(backup_id); if (backup_itr == backups_.end()) { return Status::NotFound("Backup not found"); @@ -517,7 +521,7 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id, } // latest backup id is an ASCII representation of latest backup id -Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) { +Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) { Status s; unique_ptr<SequentialFile> file; s = backup_env_->NewSequentialFile(GetLatestBackupFile(), @@ -547,7 +551,7 @@ Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) { // writing 4 bytes to the file is atomic alright, but we should *never* // do something like 1. delete file, 2.
write new file // We write to a tmp file and then atomically rename -Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) { +Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) { Status s; unique_ptr<WritableFile> file; EnvOptions env_options; @@ -577,14 +581,11 @@ Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) { return s; } -Status BackupEngine::CopyFile(const std::string& src, - const std::string& dst, - Env* src_env, - Env* dst_env, - bool sync, - uint64_t* size, - uint32_t* checksum_value, - uint64_t size_limit) { +Status BackupEngineImpl::CopyFile(const std::string& src, + const std::string& dst, Env* src_env, + Env* dst_env, bool sync, uint64_t* size, + uint32_t* checksum_value, + uint64_t size_limit) { Status s; unique_ptr<WritableFile> dst_file; unique_ptr<SequentialFile> src_file; @@ -644,12 +645,10 @@ Status BackupEngine::CopyFile(const std::string& src, } // src_fname will always start with "/" -Status BackupEngine::BackupFile(BackupID backup_id, - BackupMeta* backup, - bool shared, - const std::string& src_dir, - const std::string& src_fname, - uint64_t size_limit) { +Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup, + bool shared, const std::string& src_dir, + const std::string& src_fname, + uint64_t size_limit) { assert(src_fname.size() > 0 && src_fname[0] == '/'); std::string dst_relative = src_fname.substr(1); @@ -697,10 +696,9 @@ Status BackupEngine::BackupFile(BackupID backup_id, return s; } -Status BackupEngine::CalculateChecksum(const std::string& src, - Env* src_env, - uint64_t size_limit, - uint32_t* checksum_value) { +Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env, + uint64_t size_limit, + uint32_t* checksum_value) { *checksum_value = 0; if (size_limit == 0) { size_limit = std::numeric_limits<uint64_t>::max(); } @@ -737,7 +735,7 @@ Status BackupEngine::CalculateChecksum(const std::string& src, return s; } -void BackupEngine::GarbageCollection(bool full_scan) { +void BackupEngineImpl::GarbageCollection(bool full_scan) { Log(options_.info_log, "Starting garbage collection"); std::vector<std::string> to_delete; for (auto& itr : backuped_file_infos_) { @@ -817,7 +815,7 @@ void BackupEngine::GarbageCollection(bool full_scan) { // ------- BackupMeta class -------- -Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) { +Status BackupEngineImpl::BackupMeta::AddFile(const FileInfo& file_info) { size_ += file_info.size; files_.push_back(file_info.filename); @@ -840,7 +838,7 @@ Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) { return Status::OK(); } -void BackupEngine::BackupMeta::Delete() { +void BackupEngineImpl::BackupMeta::Delete() { for (const auto& file : files_) { auto itr = file_infos_->find(file); assert(itr != file_infos_->end()); @@ -860,7 +858,8 @@ // <file2> <crc32(literal string)> <crc32_value> // ... // TODO: maybe add checksum?
-Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) { +Status BackupEngineImpl::BackupMeta::LoadFromFile( + const std::string& backup_dir) { assert(Empty()); Status s; unique_ptr<SequentialFile> backup_meta_file; @@ -927,7 +926,7 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) { return s; } -Status BackupEngine::BackupMeta::StoreToFile(bool sync) { +Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) { Status s; unique_ptr<WritableFile> backup_meta_file; EnvOptions env_options; @@ -969,7 +968,8 @@ // --- BackupableDB methods -------- BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options) - : StackableDB(db), backup_engine_(new BackupEngine(db->GetEnv(), options)) { + : StackableDB(db), + backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) { if (options.share_table_files) { backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber()); } @@ -1003,7 +1003,7 @@ void BackupableDB::StopBackup() { RestoreBackupableDB::RestoreBackupableDB(Env* db_env, const BackupableDBOptions& options) - : backup_engine_(new BackupEngine(db_env, options)) {} + : backup_engine_(new BackupEngineImpl(db_env, options)) {} RestoreBackupableDB::~RestoreBackupableDB() { delete backup_engine_; From e5ec7384a0f35bcff21e48b168ad32a4113b892e Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 28 Jan 2014 16:01:53 -0800 Subject: [PATCH 5/6] Better interface to create BackupEngine Summary: I think it looks nicer. In RocksDB we have both styles, but I think the static method is the more common version. Test Plan: backupable_db_test Reviewers: ljin, benj, swk Reviewed By: ljin CC: leveldb Differential Revision: https://reviews.facebook.net/D15519 --- include/utilities/backupable_db.h | 6 +++--- utilities/backupable/backupable_db.cc | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h index b6eb139e7..ab3a1ed80 100644 --- a/include/utilities/backupable_db.h +++ b/include/utilities/backupable_db.h @@ -85,6 +85,9 @@ class BackupEngine { public: virtual ~BackupEngine() {} + static BackupEngine* NewBackupEngine(Env* db_env, + const BackupableDBOptions& options); + virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false) = 0; virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0; virtual Status DeleteBackup(BackupID backup_id) = 0; @@ -100,9 +103,6 @@ class BackupEngine { virtual void DeleteBackupsNewerThan(uint64_t sequence_number) = 0; }; -extern BackupEngine* CreateNewBackupEngine(Env* db_env, - const BackupableDBOptions& options); - // Stack your DB with BackupableDB to be able to backup the DB class BackupableDB : public StackableDB { public: diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index d44fbb6ad..da225e22b 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -188,8 +188,8 @@ class BackupEngineImpl : public BackupEngine { static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB }; -BackupEngine* CreateNewBackupEngine(Env* db_env, - const BackupableDBOptions& options) { +BackupEngine* BackupEngine::NewBackupEngine( + Env* db_env, const BackupableDBOptions& options) { return new BackupEngineImpl(db_env, options); } From 5d2c62822e0feb4620b4c68194a2a1d43bf7ca6f Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 28 Jan 2014 16:02:51 -0800 Subject: [PATCH 6/6] Only get the manifest
file size if there is no error Summary: I came across this while working on column families. CorruptionTest::RecoverWriteError threw a SIGSEGV because descriptor_log_->file() was nullptr. I'm not sure why it doesn't happen in master, but better safe than sorry. @kailiu, can we get this in release, too? Test Plan: make check Reviewers: kailiu, dhruba, haobo Reviewed By: haobo CC: leveldb, kailiu Differential Revision: https://reviews.facebook.net/D15513 --- db/version_set.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index bf778c9a9..18081d748 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1547,8 +1547,10 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, } } - // find offset in manifest file where this version is stored. - new_manifest_file_size = descriptor_log_->file()->GetFileSize(); + if (s.ok()) { + // find offset in manifest file where this version is stored. + new_manifest_file_size = descriptor_log_->file()->GetFileSize(); + } LogFlush(options_->info_log); mu->Lock();