From 78711524b75955052f240f50a398a1ee53bd2226 Mon Sep 17 00:00:00 2001 From: zensan Date: Wed, 30 Mar 2016 23:05:22 +0530 Subject: [PATCH 1/3] In all the places where log records are read, there was a check that record.size() should not be less than 12. This "magic number" seems to be the WriteBatch header (8 byte sequence and 4 byte count). Replaced all the places where "12" was used by WriteBatchInternal::kHeader. --- db/db_impl.cc | 2 +- db/repair.cc | 2 +- db/transaction_log_impl.cc | 4 ++-- db/wal_manager.cc | 2 +- db/write_batch.cc | 6 ++---- db/write_batch_internal.h | 4 ++++ tools/ldb_cmd.cc | 2 +- 7 files changed, 12 insertions(+), 10 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 05051032e..6124313b8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1186,7 +1186,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, continue_replay_log && reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode) && status.ok()) { - if (record.size() < 12) { + if (record.size() < WriteBatchInternal::kHeader) { reporter.Corruption(record.size(), Status::Corruption("log record too small")); continue; diff --git a/db/repair.cc b/db/repair.cc index 6aa72f792..1681fd585 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -265,7 +265,7 @@ class Repairer { mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { - if (record.size() < 12) { + if (record.size() < WriteBatchInternal::kHeader) { reporter.Corruption( record.size(), Status::Corruption("log record too small")); continue; diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 624a3af99..8002d935f 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -107,7 +107,7 @@ void TransactionLogIteratorImpl::SeekToStartSequence( return; } while (RestrictedRead(&record, &scratch)) { - if (record.size() < 12) { + if (record.size() < WriteBatchInternal::kHeader) { reporter_.Corruption( record.size(), Status::Corruption("very small log record")); continue; @@ -167,7 +167,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { currentLogReader_->UnmarkEOF(); } while (RestrictedRead(&record, &scratch)) { - if (record.size() < 12) { + if (record.size() < WriteBatchInternal::kHeader) { reporter_.Corruption( record.size(), Status::Corruption("very small log record")); continue; diff --git a/db/wal_manager.cc b/db/wal_manager.cc index e1d911e6e..da4f33aab 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -455,7 +455,7 @@ Status WalManager::ReadFirstLine(const std::string& fname, if (reader.ReadRecord(&record, &scratch) && (status.ok() || !db_options_.paranoid_checks)) { - if (record.size() < 12) { + if (record.size() < WriteBatchInternal::kHeader) { reporter.Corruption(record.size(), Status::Corruption("log record too small")); // TODO read record's till the first no corrupt entry? diff --git a/db/write_batch.cc b/db/write_batch.cc index 3742ae694..4b8f87a5e 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -81,8 +81,6 @@ struct BatchContentClassifier : public WriteBatch::Handler { } // anon namespace -// WriteBatch header has an 8-byte sequence number followed by a 4-byte count. -static const size_t kHeader = 12; struct SavePoint { size_t size; // size of rep_ @@ -96,8 +94,8 @@ struct SavePoints { WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr), content_flags_(0), rep_() { - rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader); - rep_.resize(kHeader); + rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? reserved_bytes : WriteBatchInternal::kHeader); + rep_.resize(WriteBatchInternal::kHeader); } WriteBatch::WriteBatch(const std::string& rep) diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 1e7f61e69..3987645ef 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -63,6 +63,10 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { // WriteBatch that we don't want in the public WriteBatch interface. class WriteBatchInternal { public: + + // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. + static const size_t kHeader = 12; + // WriteBatch methods with column_family_id instead of ColumnFamilyHandle* static void Put(WriteBatch* batch, uint32_t column_family_id, const Slice& key, const Slice& value); diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index be743955d..2f4c3e2ad 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1576,7 +1576,7 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values, } while (reader.ReadRecord(&record, &scratch)) { row.str(""); - if (record.size() < 12) { + if (record.size() < WriteBatchInternal::kHeader) { reporter.Corruption(record.size(), Status::Corruption("log record too small")); } else { From 24420947d9e144f52a931a98a43e64228de9ac4c Mon Sep 17 00:00:00 2001 From: Sandeep Joshi Date: Wed, 30 Mar 2016 23:13:00 +0530 Subject: [PATCH 2/3] Replace kHeader by WriteBatchInternal::kHeader in few more places kHeader was moved from write_batch.cc to header file because it is being used wherever the number "12" was being used to check for record size --- db/write_batch.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/db/write_batch.cc b/db/write_batch.cc index 4b8f87a5e..f27c81fe9 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -144,7 +144,7 @@ bool WriteBatch::Handler::Continue() { void WriteBatch::Clear() { rep_.clear(); - rep_.resize(kHeader); + rep_.resize(WriteBatchInternal::kHeader); content_flags_.store(0, std::memory_order_relaxed); @@ -247,11 +247,11 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, Status WriteBatch::Iterate(Handler* handler) const { Slice input(rep_); - if (input.size() < kHeader) { + if (input.size() < WriteBatchInternal::kHeader) { return Status::Corruption("malformed WriteBatch (too small)"); } - input.remove_prefix(kHeader); + input.remove_prefix(WriteBatchInternal::kHeader); Slice key, value, blob; int found = 0; Status s; @@ -327,7 +327,7 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } -size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return kHeader; } +size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return WriteBatchInternal::kHeader; } void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { @@ -830,15 +830,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* batch, } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { - assert(contents.size() >= kHeader); + assert(contents.size() >= WriteBatchInternal::kHeader); b->rep_.assign(contents.data(), contents.size()); b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); } void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { SetCount(dst, Count(dst) + Count(src)); - assert(src->rep_.size() >= kHeader); - dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); + assert(src->rep_.size() >= WriteBatchInternal::kHeader); + dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src->rep_.size() - WriteBatchInternal::kHeader); dst->content_flags_.store( dst->content_flags_.load(std::memory_order_relaxed) | src->content_flags_.load(std::memory_order_relaxed), @@ -850,7 +850,7 @@ size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize, if (leftByteSize == 0 || rightByteSize == 0) { return leftByteSize + rightByteSize; } else { - return leftByteSize + rightByteSize - kHeader; + return leftByteSize + rightByteSize - WriteBatchInternal::kHeader; } } From 63e8f1b55b2490e4c65140d84d8f606b27059e61 Mon Sep 17 00:00:00 2001 From: Sandeep Joshi Date: Thu, 31 Mar 2016 08:26:55 +0530 Subject: [PATCH 3/3] Formatted lines to adhere to 80 char limit --- db/write_batch.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/db/write_batch.cc b/db/write_batch.cc index f27c81fe9..73532a2da 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -94,7 +94,8 @@ struct SavePoints { WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr), content_flags_(0), rep_() { - rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? reserved_bytes : WriteBatchInternal::kHeader); + rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? + reserved_bytes : WriteBatchInternal::kHeader); rep_.resize(WriteBatchInternal::kHeader); } @@ -327,7 +328,9 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } -size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return WriteBatchInternal::kHeader; } +size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { + return WriteBatchInternal::kHeader; +} void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { @@ -838,7 +841,8 @@ void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { SetCount(dst, Count(dst) + Count(src)); assert(src->rep_.size() >= WriteBatchInternal::kHeader); - dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src->rep_.size() - WriteBatchInternal::kHeader); + dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, + src->rep_.size() - WriteBatchInternal::kHeader); dst->content_flags_.store( dst->content_flags_.load(std::memory_order_relaxed) | src->content_flags_.load(std::memory_order_relaxed),