diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index c6dfbd304..29e309d5f 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -23,6 +23,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( started_(false), isValid_(false), currentFileIndex_(0), + currentBatchSeq_(0), + currentBatchCount_(0), lastFlushedSequence_(lastFlushedSequence) { assert(startingSequenceNumber_ <= *lastFlushedSequence_); assert(files_ != nullptr); @@ -33,8 +35,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( } Status TransactionLogIteratorImpl::OpenLogFile( - const LogFile* logFile, - unique_ptr* file) { + const LogFile* logFile, + unique_ptr* file) { Env* env = options_->env; if (logFile->Type() == kArchivedLogFile) { std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber()); @@ -48,7 +50,7 @@ Status TransactionLogIteratorImpl::OpenLogFile( fname = ArchivedLogFileName(dir_, logFile->LogNumber()); status = env->NewSequentialFile(fname, file, soptions_); if (!status.ok()) { - return Status::IOError(" Requested file not present in the dir"); + return Status::IOError("Requested file not present in the dir"); } } return status; @@ -71,54 +73,73 @@ bool TransactionLogIteratorImpl::Valid() { return started_ && isValid_; } -void TransactionLogIteratorImpl::SeekToStartSequence() { - std::string scratch; - Slice record; - isValid_ = false; - if (startingSequenceNumber_ > *lastFlushedSequence_) { - currentStatus_ = Status::IOError("Looking for a sequence, " - "which is not flushed yet."); - return; +bool TransactionLogIteratorImpl::RestrictedRead( + Slice* record, + std::string* scratch) { + // Don't read if no more complete entries to read from logs + if (currentBatchSeq_ >= *lastFlushedSequence_) { + return false; + } + return currentLogReader_->ReadRecord(record, scratch); +} + +void TransactionLogIteratorImpl::SeekToStartSequence( + uint64_t startFileIndex, + bool strict) { + std::string scratch; + Slice record; + started_ = false; + isValid_ = false; + if (startingSequenceNumber_ > *lastFlushedSequence_) { + currentStatus_ = Status::IOError("Looking for a sequence, " + "which is not flushed yet."); + return; + } + if (files_->size() <= startFileIndex) { + return; + } + Status s = OpenLogReader(files_->at(startFileIndex).get()); + if (!s.ok()) { + currentStatus_ = s; + return; + } + while (RestrictedRead(&record, &scratch)) { + if (record.size() < 12) { + reporter_.Corruption( + record.size(), Status::Corruption("very small log record")); + continue; } - if (files_->size() == 0) { - return; - } - Status s = OpenLogReader(files_->at(0).get()); - if (!s.ok()) { - currentStatus_ = s; - return; - } - while (currentLogReader_->ReadRecord(&record, &scratch)) { - if (record.size() < 12) { - reporter_.Corruption( - record.size(), Status::Corruption("log record too small")); - continue; - } - UpdateCurrentWriteBatch(record); - if (currentBatchSeq_ + currentBatchCount_ - 1 >= - startingSequenceNumber_) { - assert(currentBatchSeq_ <= *lastFlushedSequence_); - isValid_ = true; - started_ = true; // set started_ as we could seek till starting sequence + UpdateCurrentWriteBatch(record); + if (currentBatchSeq_ + currentBatchCount_ - 1 >= + startingSequenceNumber_) { + if (strict && currentBatchSeq_ != startingSequenceNumber_) { + currentStatus_ = Status::Corruption("Gap in sequence number. Could not " + "seek to required sequence number"); + reporter_.Info(currentStatus_.ToString().c_str()); return; - } else { - isValid_ = false; + } else if (strict) { + reporter_.Info("Could seek required sequence number. Iterator will " + "continue."); } + isValid_ = true; + started_ = true; // set started_ as we could seek till starting sequence + return; + } else { + isValid_ = false; } - // Could not find start sequence in first file. Normally this must be the - // only file. Otherwise log the error and let the iterator return next entry - if (files_->size() != 1) { - currentStatus_ = Status::Corruption("Start sequence was not found, " - "skipping to the next available"); - reporter_.Corruption(0, currentStatus_); - started_ = true; // Let Next find next available entry - Next(); - } + } + // Could not find start sequence in first file. Normally this must be the + // only file. Otherwise log the error and let the iterator return next entry + if (files_->size() != 1) { + currentStatus_ = Status::Corruption("Start sequence was not found, " + "skipping to the next available"); + reporter_.Corruption(0, currentStatus_); + started_ = true; // Let Next find next available entry + Next(); + } } void TransactionLogIteratorImpl::Next() { - // TODO:Next() says that it requires Valid to be true but this is not true - // assert(Valid()); std::string scratch; Slice record; isValid_ = false; @@ -134,11 +155,10 @@ void TransactionLogIteratorImpl::Next() { while (currentLogReader_->ReadRecord(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( - record.size(), Status::Corruption("log record too small")); + record.size(), Status::Corruption("very small log record")); continue; } else { - UpdateCurrentWriteBatch(record); - return; + return UpdateCurrentWriteBatch(record); } } } @@ -164,11 +184,44 @@ void TransactionLogIteratorImpl::Next() { } } +bool TransactionLogIteratorImpl::IsBatchContinuous( + const WriteBatch* batch, + const SequenceNumber expectedSeq) { + assert(batch); + SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch); + if (started_ && batchSeq != expectedSeq) { + char buf[200]; + snprintf(buf, sizeof(buf), + "Discontinuity in log records. Got seq=%lu, Expected seq=%lu, " + "Last flushed seq=%lu. Log iterator will seek the correct batch.", + batchSeq, expectedSeq, *lastFlushedSequence_); + reporter_.Info(buf); + return false; + } + return true; +} + void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { WriteBatch* batch = new WriteBatch(); WriteBatchInternal::SetContents(batch, record); + + SequenceNumber expectedSeq = currentBatchSeq_ + currentBatchCount_; + if (!IsBatchContinuous(batch, expectedSeq)) { + // Seek to the batch having expected sequence number + if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) { + // Expected batch must lie in the previous log file + currentFileIndex_--; + currentFileIndex_ = (currentFileIndex_ >= 0) ? currentFileIndex_ : 0; + } + startingSequenceNumber_ = expectedSeq; + return SeekToStartSequence(currentFileIndex_, true); + } + currentBatchSeq_ = WriteBatchInternal::Sequence(batch); currentBatchCount_ = WriteBatchInternal::Count(batch); + // currentBatchSeq_ can only change here + assert(currentBatchSeq_ <= *lastFlushedSequence_); + currentBatch_.reset(batch); isValid_ = true; currentStatus_ = Status::OK(); diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index b17a80a2e..d08d192ac 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -21,6 +21,9 @@ struct LogReporter : public log::Reader::Reporter { virtual void Corruption(size_t bytes, const Status& s) { Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); } + virtual void Info(const char* s) { + Log(info_log, "%s", s); + } }; class LogFileImpl : public LogFile { @@ -81,7 +84,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const std::string& dir_; const Options* options_; const EnvOptions& soptions_; - const SequenceNumber startingSequenceNumber_; + SequenceNumber startingSequenceNumber_; std::unique_ptr files_; bool started_; bool isValid_; // not valid when it starts of. @@ -91,12 +94,18 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { unique_ptr currentLogReader_; Status OpenLogFile(const LogFile* logFile, unique_ptr* file); LogReporter reporter_; - SequenceNumber const * const lastFlushedSequence_; - SequenceNumber currentBatchSeq_; // sequence number at start of current batch uint64_t currentBatchCount_; // count in current batch + SequenceNumber const * const lastFlushedSequence_; - void SeekToStartSequence(); + // Reads from transaction log only if the writebatch record has been written + bool RestrictedRead(Slice* record, std::string* scratch); + // Seeks to startingSequenceNumber reading from startFileIndex in files_. + // If strict is set,then must get a batch starting with startingSequenceNumber + void SeekToStartSequence(uint64_t startFileIndex = 0, bool strict = false); + // Check if batch is continuous starting from expectedSeq, else return false + bool IsBatchContinuous(const WriteBatch* batch, SequenceNumber expectedSeq); + // Update current batch if a continuous batch is found, else return false void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file); };