rocksdb/db/transaction_log_impl.cc
Mayank Agarwal fe3713961e Features in Transaction log iterator
Summary:
* Logstore requests a valid change of reutrning an empty iterator and not an error in case of no log files.
* Changed the code to return the writebatch containing the sequence number requested from GetupdatesSince even if it lies in the middle. Earlier we used to return the next writebatch,. This also allows me oto guarantee that no files played upon by the iterator are redundant. I mean the starting log file has at least a sequence number >= the sequence number requested form GetupdatesSince.
* Cleaned up redundant logic in Iterator::Next and made a new function SeekToStartSequence for greater readability and maintainibilty.
* Modified a test in db_test accordingly
Please check the logic carefully and suggest improvements. I have a separate patch out for more improvements like restricting reader to read till written sequences.

Test Plan:
* transaction log iterator tests in db_test,
* db_repl_stress.
* rocks_log_iterator_test in fbcode/wormhole/rocksdb/test - 2 tests thriving on hacks till now can get simplified
* testing on the shadow setup for sigma with replication

Reviewers: dhruba, haobo, kailiu, sdong

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13437
2013-10-14 18:16:21 -07:00

185 lines
5.8 KiB
C++

#include "db/transaction_log_impl.h"
#include "db/write_batch_internal.h"
namespace rocksdb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir,
const Options* options,
const EnvOptions& soptions,
const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files,
SequenceNumber const * const lastFlushedSequence) :
dir_(dir),
options_(options),
soptions_(soptions),
startingSequenceNumber_(seq),
files_(std::move(files)),
started_(false),
isValid_(false),
currentFileIndex_(0),
lastFlushedSequence_(lastFlushedSequence) {
assert(startingSequenceNumber_ <= *lastFlushedSequence_);
assert(files_ != nullptr);
reporter_.env = options_->env;
reporter_.info_log = options_->info_log.get();
SeekToStartSequence(); // Seek till starting sequence
}
Status TransactionLogIteratorImpl::OpenLogFile(
const LogFile* logFile,
unique_ptr<SequentialFile>* file) {
Env* env = options_->env;
if (logFile->Type() == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber());
return env->NewSequentialFile(fname, file, soptions_);
} else {
std::string fname = LogFileName(dir_, logFile->LogNumber());
Status status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) {
// If cannot open file in DB directory.
// Try the archive dir, as it could have moved in the meanwhile.
fname = ArchivedLogFileName(dir_, logFile->LogNumber());
status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) {
return Status::IOError(" Requested file not present in the dir");
}
}
return status;
}
}
BatchResult TransactionLogIteratorImpl::GetBatch() {
assert(isValid_); // cannot call in a non valid state.
BatchResult result;
result.sequence = currentBatchSeq_;
result.writeBatchPtr = std::move(currentBatch_);
return result;
}
Status TransactionLogIteratorImpl::status() {
return currentStatus_;
}
bool TransactionLogIteratorImpl::Valid() {
return started_ && isValid_;
}
void TransactionLogIteratorImpl::SeekToStartSequence() {
std::string scratch;
Slice record;
isValid_ = false;
if (startingSequenceNumber_ > *lastFlushedSequence_) {
currentStatus_ = Status::IOError("Looking for a sequence, "
"which is not flushed yet.");
return;
}
if (files_->size() == 0) {
return;
}
Status s = OpenLogReader(files_->at(0).get());
if (!s.ok()) {
currentStatus_ = s;
return;
}
while (currentLogReader_->ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter_.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
UpdateCurrentWriteBatch(record);
if (currentBatchSeq_ + currentBatchCount_ - 1 >=
startingSequenceNumber_) {
assert(currentBatchSeq_ <= *lastFlushedSequence_);
isValid_ = true;
started_ = true; // set started_ as we could seek till starting sequence
return;
} else {
isValid_ = false;
}
}
// Could not find start sequence in first file. Normally this must be the
// only file. Otherwise log the error and let the iterator return next entry
if (files_->size() != 1) {
currentStatus_ = Status::Corruption("Start sequence was not found, "
"skipping to the next available");
reporter_.Corruption(0, currentStatus_);
started_ = true; // Let Next find next available entry
Next();
}
}
void TransactionLogIteratorImpl::Next() {
// TODO:Next() says that it requires Valid to be true but this is not true
// assert(Valid());
std::string scratch;
Slice record;
isValid_ = false;
if (!started_) { // Runs every time until we can seek to the start sequence
return SeekToStartSequence();
}
while(true) {
assert(currentLogReader_);
if (currentBatchSeq_ < *lastFlushedSequence_) {
if (currentLogReader_->IsEOF()) {
currentLogReader_->UnmarkEOF();
}
while (currentLogReader_->ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter_.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
} else {
UpdateCurrentWriteBatch(record);
return;
}
}
}
// Open the next file
if (currentFileIndex_ < files_->size() - 1) {
++currentFileIndex_;
Status status =OpenLogReader(files_->at(currentFileIndex_).get());
if (!status.ok()) {
isValid_ = false;
currentStatus_ = status;
return;
}
} else {
isValid_ = false;
if (currentBatchSeq_ == *lastFlushedSequence_) {
currentStatus_ = Status::OK();
} else {
currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
}
return;
}
}
}
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
WriteBatch* batch = new WriteBatch();
WriteBatchInternal::SetContents(batch, record);
currentBatchSeq_ = WriteBatchInternal::Sequence(batch);
currentBatchCount_ = WriteBatchInternal::Count(batch);
currentBatch_.reset(batch);
isValid_ = true;
currentStatus_ = Status::OK();
}
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
unique_ptr<SequentialFile> file;
Status status = OpenLogFile(logFile, &file);
if (!status.ok()) {
return status;
}
assert(file);
currentLogReader_.reset(
new log::Reader(std::move(file), &reporter_, true, 0)
);
return Status::OK();
}
} // namespace rocksdb