diff --git a/HISTORY.md b/HISTORY.md index 746f1d83d..014384d5c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * Fix potential file descriptor leakage in PosixEnv's IsDirectory() and NewRandomAccessFile(). * Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s). * In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early. +* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further. ## 6.10.2 (6/5/2020) ### Bug fix diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index e3ccf492b..e0beb14af 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1999,6 +1999,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) { } #endif // !ROCKSDB_LITE +TEST_F(DBBasicTest, ManifestChecksumMismatch) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + ASSERT_OK(Put("bar", "value")); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) { + auto* crc = reinterpret_cast(arg); + *crc = *crc + 1; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + WriteOptions write_opts; + write_opts.disableWAL = true; + Status s = db_->Put(write_opts, "foo", "value"); + ASSERT_OK(s); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_OK(Put("foo", "value1")); + ASSERT_OK(Flush()); + s = TryReopen(options); + ASSERT_TRUE(s.IsCorruption()); +} + class DBBasicTestMultiGet : public DBTestBase { public: DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache, diff --git a/db/log_writer.cc b/db/log_writer.cc index 04d3f64cc..e290eae62 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { // Compute the crc of the record type and the payload. crc = crc32c::Extend(crc, ptr, n); crc = crc32c::Mask(crc); // Adjust for storage + TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", + &crc); EncodeFixed32(buf, crc); // Write the header and the payload diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index cd5197878..bdab482ff 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler( assert(version_set_ != nullptr); } -Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { +void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status, + std::string* db_id) { Slice record; std::string scratch; + assert(log_read_status); + assert(log_read_status->ok()); + size_t recovered_edits = 0; Status s = Initialize(); - while (reader.ReadRecord(&record, &scratch) && s.ok()) { + while (s.ok() && reader.ReadRecord(&record, &scratch) && + log_read_status->ok()) { VersionEdit edit; s = edit.DecodeFrom(record); if (!s.ok()) { @@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { } } } + if (!log_read_status->ok()) { + s = *log_read_status; + } CheckIterationResult(reader, &s); if (!s.ok()) { status_ = s; } - return s; } Status VersionEditHandler::Initialize() { diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 7df940d6f..3c239bdf7 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -40,7 +40,8 @@ class VersionEditHandler { virtual ~VersionEditHandler() {} - Status Iterate(log::Reader& reader, std::string* db_id); + void Iterate(log::Reader& reader, Status* log_read_status, + std::string* db_id); const Status& status() const { return status_; } diff --git a/db/version_set.cc b/db/version_set.cc index e487b6a01..50e1d0c7e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -10,6 +10,7 @@ #include "db/version_set.h" #include + #include #include #include @@ -19,6 +20,7 @@ #include #include #include + #include "compaction/compaction.h" #include "db/internal_stats.h" #include "db/log_reader.h" @@ -50,6 +52,7 @@ #include "table/table_reader.h" #include "table/two_level_iterator.h" #include "test_util/sync_point.h" +#include "util/cast_util.h" #include "util/coding.h" #include "util/stop_watch.h" #include "util/string_util.h" @@ -4383,24 +4386,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname, if (dbname.back() != '/') { manifest_path->push_back('/'); } - *manifest_path += fname; + manifest_path->append(fname); return Status::OK(); } Status VersionSet::ReadAndRecover( - log::Reader* reader, AtomicGroupReadBuffer* read_buffer, + log::Reader& reader, AtomicGroupReadBuffer* read_buffer, const std::unordered_map& name_to_options, std::unordered_map& column_families_not_found, std::unordered_map>& builders, - VersionEditParams* version_edit_params, std::string* db_id) { - assert(reader != nullptr); + Status* log_read_status, VersionEditParams* version_edit_params, + std::string* db_id) { assert(read_buffer != nullptr); + assert(log_read_status != nullptr); Status s; Slice record; std::string scratch; size_t recovered_edits = 0; - while (reader->ReadRecord(&record, &scratch) && s.ok()) { + while (s.ok() && reader.ReadRecord(&record, &scratch) && + log_read_status->ok()) { VersionEdit edit; s = edit.DecodeFrom(record); if (!s.ok()) { @@ -4444,6 +4449,9 @@ Status VersionSet::ReadAndRecover( } } } + if (!log_read_status->ok()) { + s = *log_read_status; + } if (!s.ok()) { // Clear the buffer if we fail to decode/apply an edit. read_buffer->Clear(); @@ -4490,8 +4498,7 @@ Status VersionSet::Recover( db_options_->log_readahead_size)); } - std::unordered_map> - builders; + VersionBuilderMap builders; // add default column family auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); @@ -4513,12 +4520,13 @@ Status VersionSet::Recover( VersionEditParams version_edit_params; { VersionSet::LogReporter reporter; - reporter.status = &s; + Status log_read_status; + reporter.status = &log_read_status; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, true /* checksum */, 0 /* log_number */); AtomicGroupReadBuffer read_buffer; - s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options, - column_families_not_found, builders, + s = ReadAndRecover(reader, &read_buffer, cf_name_to_options, + column_families_not_found, builders, &log_read_status, &version_edit_params, db_id); current_manifest_file_size = reader.GetReadOffset(); assert(current_manifest_file_size != 0); @@ -4783,21 +4791,20 @@ Status VersionSet::TryRecoverFromOneManifest( db_options_->log_readahead_size)); } + assert(s.ok()); VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, /*checksum=*/true, /*log_num=*/0); - { - VersionEditHandlerPointInTime handler_pit(read_only, column_families, - const_cast(this)); + VersionEditHandlerPointInTime handler_pit(read_only, column_families, + const_cast(this)); - s = handler_pit.Iterate(reader, db_id); + handler_pit.Iterate(reader, &s, db_id); - assert(nullptr != has_missing_table_file); - *has_missing_table_file = handler_pit.HasMissingFiles(); - } + assert(nullptr != has_missing_table_file); + *has_missing_table_file = handler_pit.HasMissingFiles(); - return s; + return handler_pit.status(); } Status VersionSet::ListColumnFamilies(std::vector* column_families, @@ -5878,8 +5885,7 @@ Status ReactiveVersionSet::Recover( // In recovery, nobody else can access it, so it's fine to set it to be // initialized earlier. default_cfd->set_initialized(); - std::unordered_map> - builders; + VersionBuilderMap builders; std::unordered_map column_families_not_found; builders.insert( std::make_pair(0, std::unique_ptr( @@ -5887,7 +5893,7 @@ Status ReactiveVersionSet::Recover( manifest_reader_status->reset(new Status()); manifest_reporter->reset(new LogReporter()); - static_cast(manifest_reporter->get())->status = + static_cast_with_check(manifest_reporter->get())->status = manifest_reader_status->get(); Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); log::Reader* reader = manifest_reader->get(); @@ -5896,10 +5902,9 @@ Status ReactiveVersionSet::Recover( VersionEdit version_edit; while (s.ok() && retry < 1) { assert(reader != nullptr); - Slice record; - std::string scratch; - s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options, - column_families_not_found, builders, &version_edit); + s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options, + column_families_not_found, builders, + manifest_reader_status->get(), &version_edit); if (s.ok()) { bool enough = version_edit.has_next_file_number_ && version_edit.has_log_number_ && diff --git a/db/version_set.h b/db/version_set.h index ccab5428b..75e82941d 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1102,6 +1102,10 @@ class VersionSet { void SetIOStatusOK() { io_status_ = IOStatus::OK(); } protected: + using VersionBuilderMap = + std::unordered_map>; + struct ManifestWriter; friend class Version; @@ -1113,7 +1117,9 @@ class VersionSet { struct LogReporter : public log::Reader::Reporter { Status* status; virtual void Corruption(size_t /*bytes*/, const Status& s) override { - if (this->status->ok()) *this->status = s; + if (status->ok()) { + *status = s; + } } }; @@ -1144,13 +1150,14 @@ class VersionSet { const VersionEdit* edit); Status ReadAndRecover( - log::Reader* reader, AtomicGroupReadBuffer* read_buffer, + log::Reader& reader, AtomicGroupReadBuffer* read_buffer, const std::unordered_map& name_to_options, std::unordered_map& column_families_not_found, std::unordered_map< uint32_t, std::unique_ptr>& builders, - VersionEditParams* version_edit, std::string* db_id = nullptr); + Status* log_read_status, VersionEditParams* version_edit, + std::string* db_id = nullptr); // REQUIRES db mutex Status ApplyOneVersionEditToBuilder( @@ -1279,8 +1286,7 @@ class ReactiveVersionSet : public VersionSet { std::unique_ptr* manifest_reader); private: - std::unordered_map> - active_version_builders_; + VersionBuilderMap active_version_builders_; AtomicGroupReadBuffer read_buffer_; // Number of version edits to skip by ReadAndApply at the beginning of a new // MANIFEST created by primary.