rocksdb/db/transaction_log_impl.h
Mayank Agarwal ae8e0770b4 Disallow transaction log iterator to skip sequences
Summary:
This is expected to solve the "gaps in transaction log iterator" problem.
* After a lot of observation of the gaps on the sigmafio machines, I found that they are always due to a race between the log reader and the log writer.
* So when we drop the wormhole subscription and refresh the iterator, the gaps are not there.
* It is NOT due to some boundary or corner case left unattended in the iterator logic, because I checked many instances of the gaps against their log files with ldb. The log files are NOT corrupted either.
* The solution is to not allow the iterator to read incompletely written sequences, and to have the iterator detect gaps by itself and invalidate itself. This causes the application to refresh the iterator normally and seek to the required sequence properly.
* Thus, the iterator can at least guarantee that it will not give any gaps.

Test Plan:
* db_test based log iterator tests
* db_repl_stress
* testing on sigmafio setup to see gaps go away

Reviewers: dhruba, haobo

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13593
2013-10-22 11:45:35 -07:00

113 lines
3.6 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#include <vector>
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/transaction_log.h"
#include "db/log_reader.h"
#include "db/filename.h"
namespace rocksdb {
// Reporter derived from log::Reader::Reporter that passes the notifications it
// receives on to Log() using info_log. TransactionLogIteratorImpl keeps one of
// these as its reporter_ member.
struct LogReporter : public log::Reader::Reporter {
  // Neither pointer is set by this struct; the owner is expected to assign
  // them. They default to nullptr so that a freshly constructed reporter
  // never holds an indeterminate pointer value.
  Env* env = nullptr;
  Logger* info_log = nullptr;

  // Logs that `bytes` bytes are being dropped, followed by the string form of
  // the status `s`.
  virtual void Corruption(size_t bytes, const Status& s) {
    Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
  }

  // Logs the message `s` as-is.
  virtual void Info(const char* s) {
    Log(info_log, "%s", s);
  }
};
// LogFile implementation that simply stores the attributes of one log file:
// its number, its WalFileType, its starting sequence number and its size in
// bytes. All values are supplied at construction and exposed via accessors.
class LogFileImpl : public LogFile {
 public:
  LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
              uint64_t sizeBytes)
      : logNumber_(logNum),
        type_(logType),
        startSequence_(startSeq),
        sizeFileBytes_(sizeBytes) {}

  // File name derived from the log number, built with an empty directory
  // prefix. Archived logs use ArchivedLogFileName(); every other type uses
  // LogFileName().
  std::string PathName() const {
    if (type_ != kArchivedLogFile) {
      return LogFileName("", logNumber_);
    }
    return ArchivedLogFileName("", logNumber_);
  }

  // Plain accessors for the values passed to the constructor.
  uint64_t LogNumber() const {
    return logNumber_;
  }

  WalFileType Type() const {
    return type_;
  }

  SequenceNumber StartSequence() const {
    return startSequence_;
  }

  uint64_t SizeFileBytes() const {
    return sizeFileBytes_;
  }

  // Orders log files by log number only.
  bool operator < (const LogFile& that) const {
    const auto ownNumber = LogNumber();
    const auto otherNumber = that.LogNumber();
    return ownNumber < otherNumber;
  }

 private:
  uint64_t logNumber_;            // number identifying the log file
  WalFileType type_;              // selects the naming scheme in PathName()
  SequenceNumber startSequence_;  // value returned by StartSequence()
  uint64_t sizeFileBytes_;        // value returned by SizeFileBytes()
};
class TransactionLogIteratorImpl : public TransactionLogIterator {
public:
TransactionLogIteratorImpl(const std::string& dir,
const Options* options,
const EnvOptions& soptions,
const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files,
SequenceNumber const * const lastFlushedSequence);
virtual bool Valid();
virtual void Next();
virtual Status status();
virtual BatchResult GetBatch();
private:
const std::string& dir_;
const Options* options_;
const EnvOptions& soptions_;
SequenceNumber startingSequenceNumber_;
std::unique_ptr<VectorLogPtr> files_;
bool started_;
bool isValid_; // not valid when it starts of.
Status currentStatus_;
size_t currentFileIndex_;
std::unique_ptr<WriteBatch> currentBatch_;
unique_ptr<log::Reader> currentLogReader_;
Status OpenLogFile(const LogFile* logFile, unique_ptr<SequentialFile>* file);
LogReporter reporter_;
SequenceNumber currentBatchSeq_; // sequence number at start of current batch
uint64_t currentBatchCount_; // count in current batch
SequenceNumber const * const lastFlushedSequence_;
// Reads from transaction log only if the writebatch record has been written
bool RestrictedRead(Slice* record, std::string* scratch);
// Seeks to startingSequenceNumber reading from startFileIndex in files_.
// If strict is set,then must get a batch starting with startingSequenceNumber
void SeekToStartSequence(uint64_t startFileIndex = 0, bool strict = false);
// Check if batch is continuous starting from expectedSeq, else return false
bool IsBatchContinuous(const WriteBatch* batch, SequenceNumber expectedSeq);
// Update current batch if a continuous batch is found, else return false
void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file);
};
} // namespace rocksdb