TransactionLogIter should stall at the last record. Currently it errors out

Summary:
* Add a method to check if the log reader is at EOF.
* If we know a record has been flushed force the log_reader to believe it is not at EOF, using a new method UnMarkEof().

This does not work with MMpaed files.

Test Plan: added a unit test.

Reviewers: dhruba, heyongqiang

Reviewed By: heyongqiang

CC: leveldb

Differential Revision: https://reviews.facebook.net/D9567
This commit is contained in:
Abhishek Kona 2013-03-21 15:12:35 -07:00
parent 38d54832f7
commit 27c15fb67e
3 changed files with 111 additions and 91 deletions

View File

@ -517,6 +517,26 @@ class DBTest {
} }
return result; return result;
} }
Options OptionsForLogIterTest() {
Options options = CurrentOptions();
options.create_if_missing = true;
options.WAL_ttl_seconds = 1000;
return options;
}
std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
const SequenceNumber seq) {
unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(seq, &iter);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(iter->Valid());
return std::move(iter);
}
std::string DummyString(size_t len, char c = 'a') {
return std::string(len, c);
}
}; };
TEST(DBTest, Empty) { TEST(DBTest, Empty) {
@ -2591,88 +2611,74 @@ TEST(DBTest, WALArchival) {
} }
TEST(DBTest, TransactionLogIterator) { void ExpectRecords(
std::string value(1024, '1'); const int expected_no_records,
Options options = CurrentOptions(); std::unique_ptr<TransactionLogIterator>& iter) {
options.create_if_missing = true;
options.WAL_ttl_seconds = 1000;
DestroyAndReopen(&options);
Put("key1", value);
Put("key2", value);
Put("key2", value);
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
{
unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(iter->Valid());
int i = 0;
SequenceNumber lastSequence = 0;
while (iter->Valid()) {
BatchResult res = iter->GetBatch();
ASSERT_TRUE(res.sequence > lastSequence);
++i;
lastSequence = res.sequence;
ASSERT_TRUE(iter->status().ok());
iter->Next();
}
ASSERT_EQ(i, 3);
}
Reopen(&options);
{
Put("key4", value);
Put("key5", value);
Put("key6", value);
}
{
unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(iter->Valid());
int i = 0;
SequenceNumber lastSequence = 0;
while (iter->Valid()) {
BatchResult res = iter->GetBatch();
ASSERT_TRUE(res.sequence > lastSequence);
lastSequence = res.sequence;
ASSERT_TRUE(iter->status().ok());
iter->Next();
++i;
}
ASSERT_EQ(i, 6);
}
}
TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) {
std::string value(1024, '1');
Options options = CurrentOptions();
options.create_if_missing = true;
options.WAL_ttl_seconds = 1000;
DestroyAndReopen(&options);
// Do a plain Reopen.
Put("key1", value);
// Two reopens should create a zero record WAL file.
Reopen(&options);
Reopen(&options);
Put("key2", value);
unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(iter->Valid());
ASSERT_TRUE(status.ok());
ASSERT_TRUE(iter->Valid());
int i = 0; int i = 0;
SequenceNumber lastSequence = 0; SequenceNumber lastSequence = 0;
while (iter->Valid()) { while (iter->Valid()) {
BatchResult res = iter->GetBatch(); BatchResult res = iter->GetBatch();
ASSERT_TRUE(res.sequence > lastSequence); ASSERT_TRUE(res.sequence > lastSequence);
++i;
lastSequence = res.sequence; lastSequence = res.sequence;
ASSERT_TRUE(iter->status().ok()); ASSERT_TRUE(iter->status().ok());
iter->Next(); iter->Next();
++i;
} }
ASSERT_EQ(i, 2); ASSERT_EQ(i, expected_no_records);
}
TEST(DBTest, TransactionLogIterator) {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
Put("key1", DummyString(1024));
Put("key2", DummyString(1024));
Put("key2", DummyString(1024));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
{
auto iter = OpenTransactionLogIter(0);
ExpectRecords(3, iter);
}
Reopen(&options);
{
Put("key4", DummyString(1024));
Put("key5", DummyString(1024));
Put("key6", DummyString(1024));
}
{
auto iter = OpenTransactionLogIter(0);
ExpectRecords(6, iter);
}
}
TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
// Do a plain Reopen.
Put("key1", DummyString(1024));
// Two reopens should create a zero record WAL file.
Reopen(&options);
Reopen(&options);
Put("key2", DummyString(1024));
auto iter = OpenTransactionLogIter(0);
ExpectRecords(2, iter);
}
TEST(DBTest, TransactionLogIteratorStallAtLastRecord) {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
Put("key1", DummyString(1024));
auto iter = OpenTransactionLogIter(0);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(!iter->Valid());
ASSERT_OK(iter->status());
Put("key2", DummyString(1024));
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
} }
TEST(DBTest, ReadCompaction) { TEST(DBTest, ReadCompaction) {

View File

@ -59,6 +59,17 @@ class Reader {
// Undefined before the first call to ReadRecord. // Undefined before the first call to ReadRecord.
uint64_t LastRecordOffset(); uint64_t LastRecordOffset();
// returns true if the reader has encountered an eof condition.
bool IsEOF() {
return eof_;
}
// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
void UnmarkEOF() {
eof_ = false;
}
SequentialFile* file() { return file_.get(); } SequentialFile* file() { return file_.get(); }
private: private:

View File

@ -1,6 +1,7 @@
#include "db/transaction_log_iterator_impl.h" #include "db/transaction_log_iterator_impl.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "db/filename.h" #include "db/filename.h"
namespace leveldb { namespace leveldb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl( TransactionLogIteratorImpl::TransactionLogIteratorImpl(
@ -103,7 +104,7 @@ void TransactionLogIteratorImpl::Next() {
} }
UpdateCurrentWriteBatch(record); UpdateCurrentWriteBatch(record);
if (currentSequence_ >= sequenceNumber_) { if (currentSequence_ >= sequenceNumber_) {
assert(currentSequence_ < *lastFlushedSequence_); assert(currentSequence_ <= *lastFlushedSequence_);
isValid_ = true; isValid_ = true;
currentLogReader_ = std::move(reader); currentLogReader_ = std::move(reader);
break; break;
@ -120,23 +121,20 @@ void TransactionLogIteratorImpl::Next() {
LOOK_NEXT_FILE: LOOK_NEXT_FILE:
assert(currentLogReader_); assert(currentLogReader_);
bool openNextFile = true; bool openNextFile = true;
if (currentSequence_ < *lastFlushedSequence_) {
if (currentSequence_ == *lastFlushedSequence_) { if (currentLogReader_->IsEOF()) {
// The last update has been read. and next is being called. currentLogReader_->UnmarkEOF();
isValid_ = false; }
currentStatus_ = Status::OK(); while (currentLogReader_->ReadRecord(&record, &scratch)) {
} if (record.size() < 12) {
reporter.Corruption(
while (currentLogReader_->ReadRecord(&record, &scratch) && record.size(), Status::Corruption("log record too small"));
currentSequence_ < *lastFlushedSequence_) { continue;
if (record.size() < 12) { } else {
reporter.Corruption( UpdateCurrentWriteBatch(record);
record.size(), Status::Corruption("log record too small")); openNextFile = false;
continue; break;
} else { }
UpdateCurrentWriteBatch(record);
openNextFile = false;
break;
} }
} }
@ -154,6 +152,10 @@ LOOK_NEXT_FILE:
currentLogReader_.reset( currentLogReader_.reset(
new log::Reader(std::move(file), &reporter, true, 0)); new log::Reader(std::move(file), &reporter, true, 0));
goto LOOK_NEXT_FILE; goto LOOK_NEXT_FILE;
} else if (currentSequence_ == *lastFlushedSequence_) {
// The last update has been read. and next is being called.
isValid_ = false;
currentStatus_ = Status::OK();
} else { } else {
// LOOKED AT FILES. WE ARE DONE HERE. // LOOKED AT FILES. WE ARE DONE HERE.
isValid_ = false; isValid_ = false;
@ -169,6 +171,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
WriteBatchInternal::SetContents(batch, record); WriteBatchInternal::SetContents(batch, record);
currentSequence_ = WriteBatchInternal::Sequence(batch); currentSequence_ = WriteBatchInternal::Sequence(batch);
currentBatch_.reset(batch); currentBatch_.reset(batch);
isValid_ = true;
} }
} // namespace leveldb } // namespace leveldb