From 27c15fb67e44a84e7a32271aa0518e9304a341da Mon Sep 17 00:00:00 2001 From: Abhishek Kona Date: Thu, 21 Mar 2013 15:12:35 -0700 Subject: [PATCH] TransactionLogIter should stall at the last record. Currently it errors out Summary: * Add a method to check if the log reader is at EOF. * If we know a record has been flushed force the log_reader to believe it is not at EOF, using a new method UnMarkEof(). This does not work with MMpaed files. Test Plan: added a unit test. Reviewers: dhruba, heyongqiang Reviewed By: heyongqiang CC: leveldb Differential Revision: https://reviews.facebook.net/D9567 --- db/db_test.cc | 152 +++++++++++++++------------- db/log_reader.h | 11 ++ db/transaction_log_iterator_impl.cc | 39 +++---- 3 files changed, 111 insertions(+), 91 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index e092384af..143a1fc5c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -517,6 +517,26 @@ class DBTest { } return result; } + + Options OptionsForLogIterTest() { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.WAL_ttl_seconds = 1000; + return options; + } + + std::unique_ptr OpenTransactionLogIter( + const SequenceNumber seq) { + unique_ptr iter; + Status status = dbfull()->GetUpdatesSince(seq, &iter); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(iter->Valid()); + return std::move(iter); + } + + std::string DummyString(size_t len, char c = 'a') { + return std::string(len, c); + } }; TEST(DBTest, Empty) { @@ -2591,88 +2611,74 @@ TEST(DBTest, WALArchival) { } -TEST(DBTest, TransactionLogIterator) { - std::string value(1024, '1'); - Options options = CurrentOptions(); - options.create_if_missing = true; - options.WAL_ttl_seconds = 1000; - DestroyAndReopen(&options); - Put("key1", value); - Put("key2", value); - Put("key2", value); - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U); - { - unique_ptr iter; - Status status = dbfull()->GetUpdatesSince(0, &iter); - ASSERT_TRUE(status.ok()); - ASSERT_TRUE(iter->Valid()); - int i = 0; - SequenceNumber lastSequence = 0; - while (iter->Valid()) { - BatchResult res = iter->GetBatch(); - ASSERT_TRUE(res.sequence > lastSequence); - ++i; - lastSequence = res.sequence; - ASSERT_TRUE(iter->status().ok()); - iter->Next(); - } - ASSERT_EQ(i, 3); - } - Reopen(&options); - { - Put("key4", value); - Put("key5", value); - Put("key6", value); - } - { - unique_ptr iter; - Status status = dbfull()->GetUpdatesSince(0, &iter); - ASSERT_TRUE(status.ok()); - ASSERT_TRUE(iter->Valid()); - int i = 0; - SequenceNumber lastSequence = 0; - while (iter->Valid()) { - BatchResult res = iter->GetBatch(); - ASSERT_TRUE(res.sequence > lastSequence); - lastSequence = res.sequence; - ASSERT_TRUE(iter->status().ok()); - iter->Next(); - ++i; - } - ASSERT_EQ(i, 6); - } -} - -TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { - std::string value(1024, '1'); - Options options = CurrentOptions(); - options.create_if_missing = true; - options.WAL_ttl_seconds = 1000; - DestroyAndReopen(&options); - // Do a plain Reopen. - Put("key1", value); - // Two reopens should create a zero record WAL file. - Reopen(&options); - Reopen(&options); - - Put("key2", value); - unique_ptr iter; - Status status = dbfull()->GetUpdatesSince(0, &iter); - ASSERT_TRUE(status.ok()); - ASSERT_TRUE(iter->Valid()); - ASSERT_TRUE(status.ok()); - ASSERT_TRUE(iter->Valid()); +void ExpectRecords( + const int expected_no_records, + std::unique_ptr& iter) { int i = 0; SequenceNumber lastSequence = 0; while (iter->Valid()) { BatchResult res = iter->GetBatch(); ASSERT_TRUE(res.sequence > lastSequence); + ++i; lastSequence = res.sequence; ASSERT_TRUE(iter->status().ok()); iter->Next(); - ++i; } - ASSERT_EQ(i, 2); + ASSERT_EQ(i, expected_no_records); +} + +TEST(DBTest, TransactionLogIterator) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + Put("key1", DummyString(1024)); + Put("key2", DummyString(1024)); + Put("key2", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U); + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(3, iter); + } + Reopen(&options); + { + Put("key4", DummyString(1024)); + Put("key5", DummyString(1024)); + Put("key6", DummyString(1024)); + } + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(6, iter); + } +} + +TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + // Do a plain Reopen. + Put("key1", DummyString(1024)); + // Two reopens should create a zero record WAL file. + Reopen(&options); + Reopen(&options); + + Put("key2", DummyString(1024)); + + auto iter = OpenTransactionLogIter(0); + ExpectRecords(2, iter); +} + +TEST(DBTest, TransactionLogIteratorStallAtLastRecord) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + Put("key1", DummyString(1024)); + auto iter = OpenTransactionLogIter(0); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + ASSERT_OK(iter->status()); + Put("key2", DummyString(1024)); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); } TEST(DBTest, ReadCompaction) { diff --git a/db/log_reader.h b/db/log_reader.h index 77ed5796d..271286877 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -59,6 +59,17 @@ class Reader { // Undefined before the first call to ReadRecord. uint64_t LastRecordOffset(); + // returns true if the reader has encountered an eof condition. + bool IsEOF() { + return eof_; + } + + // when we know more data has been written to the file. we can use this + // function to force the reader to look again in the file. + void UnmarkEOF() { + eof_ = false; + } + SequentialFile* file() { return file_.get(); } private: diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc index 59f3c2461..676e0c907 100644 --- a/db/transaction_log_iterator_impl.cc +++ b/db/transaction_log_iterator_impl.cc @@ -1,6 +1,7 @@ #include "db/transaction_log_iterator_impl.h" #include "db/write_batch_internal.h" #include "db/filename.h" + namespace leveldb { TransactionLogIteratorImpl::TransactionLogIteratorImpl( @@ -103,7 +104,7 @@ void TransactionLogIteratorImpl::Next() { } UpdateCurrentWriteBatch(record); if (currentSequence_ >= sequenceNumber_) { - assert(currentSequence_ < *lastFlushedSequence_); + assert(currentSequence_ <= *lastFlushedSequence_); isValid_ = true; currentLogReader_ = std::move(reader); break; @@ -120,23 +121,20 @@ void TransactionLogIteratorImpl::Next() { LOOK_NEXT_FILE: assert(currentLogReader_); bool openNextFile = true; - - if (currentSequence_ == *lastFlushedSequence_) { - // The last update has been read. and next is being called. - isValid_ = false; - currentStatus_ = Status::OK(); - } - - while (currentLogReader_->ReadRecord(&record, &scratch) && - currentSequence_ < *lastFlushedSequence_) { - if (record.size() < 12) { - reporter.Corruption( - record.size(), Status::Corruption("log record too small")); - continue; - } else { - UpdateCurrentWriteBatch(record); - openNextFile = false; - break; + if (currentSequence_ < *lastFlushedSequence_) { + if (currentLogReader_->IsEOF()) { + currentLogReader_->UnmarkEOF(); + } + while (currentLogReader_->ReadRecord(&record, &scratch)) { + if (record.size() < 12) { + reporter.Corruption( + record.size(), Status::Corruption("log record too small")); + continue; + } else { + UpdateCurrentWriteBatch(record); + openNextFile = false; + break; + } } } @@ -154,6 +152,10 @@ LOOK_NEXT_FILE: currentLogReader_.reset( new log::Reader(std::move(file), &reporter, true, 0)); goto LOOK_NEXT_FILE; + } else if (currentSequence_ == *lastFlushedSequence_) { + // The last update has been read. and next is being called. + isValid_ = false; + currentStatus_ = Status::OK(); } else { // LOOKED AT FILES. WE ARE DONE HERE. isValid_ = false; @@ -169,6 +171,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { WriteBatchInternal::SetContents(batch, record); currentSequence_ = WriteBatchInternal::Sequence(batch); currentBatch_.reset(batch); + isValid_ = true; } } // namespace leveldb