rocksdb/db/transaction_log_impl.cc
Naman Gupta cbf4a06427 Add option for storing transaction logs in a separate dir
Summary: In some cases, you might not want to store the transaction log (write-ahead log) files in the same dir as the sst files. An example use case is leaf, which stores sst files in tmpfs and would like to keep the log files in a separate dir (disk) to save memory.

Test Plan: make all. Ran the db_test suite. A few tests are failing. P2785018. If you guys don't see an obvious problem with the code, maybe somebody from the rocksdb team could help me debug the issue here. Running this on leaf worked well: I could see logs stored on disk and deleted appropriately after compactions. Obviously this exercises only one set of options; the unit tests cover different options. Seems like I'm missing some edge cases.

Reviewers: dhruba, haobo, leveldb

CC: xinyaohu, sumeet

Differential Revision: https://reviews.facebook.net/D13239
2013-10-08 17:40:27 -07:00
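
For context, a minimal usage sketch of the feature this change describes; the field name wal_dir and the paths below are assumptions for illustration, not taken from this file:

// Hedged sketch: keep sst files on tmpfs while writing WAL files to disk.
// Assumes the option landed as Options::wal_dir; adjust to the actual API.
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.wal_dir = "/mnt/disk/wal";  // assumed: separate WAL directory
  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/dev/shm/db", &db);  // sst files on tmpfs
  if (!s.ok()) {
    return 1;
  }
  // ... writes land in /mnt/disk/wal; sst files stay in /dev/shm/db ...
  delete db;
  return 0;
}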


#include "db/transaction_log_impl.h"
#include "db/write_batch_internal.h"

namespace rocksdb {

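// Iterates over the write batches stored in a sorted list of WAL files,
// starting from the first batch whose sequence number is >= seq.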
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
    const std::string& dir,
    const Options* options,
    const EnvOptions& soptions,
    const SequenceNumber seq,
    std::unique_ptr<VectorLogPtr> files,
    SequenceNumber const* const lastFlushedSequence) :
    dir_(dir),
    options_(options),
    soptions_(soptions),
    startingSequenceNumber_(seq),
    files_(std::move(files)),
    started_(false),
    isValid_(false),
    currentFileIndex_(0),
    lastFlushedSequence_(lastFlushedSequence) {
  assert(startingSequenceNumber_ <= *lastFlushedSequence_);
  assert(files_.get() != nullptr);
  reporter_.env = options_->env;
  reporter_.info_log = options_->info_log.get();
}
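
// Opens the underlying file for logFile. Archived logs live under the
// archive subdirectory; a live log may be archived concurrently, hence
// the fallback below.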
Status TransactionLogIteratorImpl::OpenLogFile(
    const LogFile* logFile,
    unique_ptr<SequentialFile>* file) {
  Env* env = options_->env;
  if (logFile->Type() == kArchivedLogFile) {
    std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber());
    return env->NewSequentialFile(fname, file, soptions_);
  } else {
    std::string fname = LogFileName(dir_, logFile->LogNumber());
    Status status = env->NewSequentialFile(fname, file, soptions_);
    if (!status.ok()) {
      // Cannot open the file in the DB directory.
      // Try the archive dir, as it could have been moved in the meanwhile.
      fname = ArchivedLogFileName(dir_, logFile->LogNumber());
      status = env->NewSequentialFile(fname, file, soptions_);
      if (!status.ok()) {
        return Status::IOError("Requested file not present in the dir");
      }
    }
    return status;
  }
}
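
// Hands the current batch to the caller; ownership of the WriteBatch is
// transferred, so GetBatch() may be called only once per position.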
BatchResult TransactionLogIteratorImpl::GetBatch() {
  assert(isValid_);  // cannot call in a non-valid state.
  BatchResult result;
  result.sequence = currentSequence_;
  result.writeBatchPtr = std::move(currentBatch_);
  return result;
}

Status TransactionLogIteratorImpl::status() {
  return currentStatus_;
}

bool TransactionLogIteratorImpl::Valid() {
  return started_ && isValid_;
}
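
// On the first call, seeks forward in the current file to the first batch
// with sequence >= startingSequenceNumber_; on later calls, reads the next
// record, rolling over to the next WAL file when the current one is drained.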
void TransactionLogIteratorImpl::Next() {
  LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get();
  // First seek to the given seqNo. in the current file.
  std::string scratch;
  Slice record;
  if (!started_) {
    started_ = true;  // this piece only runs once.
    isValid_ = false;
    if (startingSequenceNumber_ > *lastFlushedSequence_) {
      currentStatus_ = Status::IOError("Looking for a sequence "
                                       "which is not flushed yet.");
      return;
    }
    Status s = OpenLogReader(currentLogFile);
    if (!s.ok()) {
      currentStatus_ = s;
      isValid_ = false;
      return;
    }
    while (currentLogReader_->ReadRecord(&record, &scratch)) {
      // A valid record must at least hold the 12-byte WriteBatch header
      // (8-byte sequence number + 4-byte count).
      if (record.size() < 12) {
        reporter_.Corruption(
            record.size(), Status::Corruption("log record too small"));
        continue;
      }
      UpdateCurrentWriteBatch(record);
      if (currentSequence_ >= startingSequenceNumber_) {
        assert(currentSequence_ <= *lastFlushedSequence_);
        isValid_ = true;
        break;
      } else {
        isValid_ = false;
      }
    }
    if (isValid_) {
      // Done for this iteration
      return;
    }
  }
  bool openNextFile = true;
  while (openNextFile) {
    assert(currentLogReader_);
    if (currentSequence_ < *lastFlushedSequence_) {
      if (currentLogReader_->IsEOF()) {
        currentLogReader_->UnmarkEOF();
      }
      while (currentLogReader_->ReadRecord(&record, &scratch)) {
        if (record.size() < 12) {
          reporter_.Corruption(
              record.size(), Status::Corruption("log record too small"));
          continue;
        } else {
          UpdateCurrentWriteBatch(record);
          openNextFile = false;
          break;
        }
      }
    }
    if (openNextFile) {
      if (currentFileIndex_ < files_.get()->size() - 1) {
        ++currentFileIndex_;
        Status status = OpenLogReader(files_.get()->at(currentFileIndex_).get());
        if (!status.ok()) {
          isValid_ = false;
          currentStatus_ = status;
          return;
        }
      } else {
        isValid_ = false;
        openNextFile = false;
        if (currentSequence_ == *lastFlushedSequence_) {
          currentStatus_ = Status::OK();
        } else {
          currentStatus_ = Status::IOError("No more data left");
        }
      }
    }
  }
}
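
// Decodes the record into a fresh WriteBatch and records its starting
// sequence number; the iterator becomes valid at this position.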
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
  WriteBatch* batch = new WriteBatch();
  WriteBatchInternal::SetContents(batch, record);
  currentSequence_ = WriteBatchInternal::Sequence(batch);
  currentBatch_.reset(batch);
  isValid_ = true;
  currentStatus_ = Status::OK();
}
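
// Replaces the current log reader with one positioned at the start of
// logFile, verifying checksums on every record.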
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
  unique_ptr<SequentialFile> file;
  Status status = OpenLogFile(logFile, &file);
  if (!status.ok()) {
    return status;
  }
  assert(file);
  currentLogReader_.reset(
      new log::Reader(std::move(file), &reporter_, true, 0));
  return Status::OK();
}

} // namespace rocksdb