Added multi WAL log testing to recovery tests.

Summary: Currently there is no test in the suite to test the case where
there are multiple WAL files and there is a corruption in one of them. We have
tests for single WAL file corruption scenarios. Added tests to mock
the scenarios for all combinations of recovery modes and corruption in
specified file locations.

Test Plan: Run make check

Reviewers: sdong igor

CC: leveldb@

Task ID: #7501229

Blame Rev:
This commit is contained in:
krad 2015-06-29 14:57:18 -07:00
parent 4f56632b16
commit e2e3d84b2c

View File

@ -8704,28 +8704,78 @@ TEST_F(DBTest, TransactionLogIteratorCorruptedLog) {
//
// Test WAL recovery for the various modes available
// TODO krad:
// 1. Add tests when there are more than one log file
//
class RecoveryTestHelper {
public:
// Recreate and fill the store with some data
static size_t FillData(DBTest* test, const Options& options) {
size_t count = 0;
test->DestroyAndReopen(options);
// Number of WAL files to generate
static const int kWALFilesCount = 10;
// Starting number for the WAL file name like 00010.log
static const int kWALFileOffset = 10;
// Keys to be written per WAL file
static const int kKeysPerWALFile = 1024;
// Size of the value
static const int kValueSize = 10;
for (int i = 0; i < 1024; i++) {
test->Put("key" + ToString(i), test->DummyString(10));
++count;
// Create WAL files with values filled in
static void FillData(DBTest* test, Options& options,
const size_t wal_count, size_t & count) {
DBOptions & db_options = options;
count = 0;
shared_ptr<Cache> table_cache = NewLRUCache(50000, 16);
EnvOptions env_options;
WriteBuffer write_buffer(db_options.db_write_buffer_size);
unique_ptr<VersionSet> versions;
unique_ptr<WalManager> wal_manager;
WriteController write_controller;
versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
table_cache.get(), &write_buffer,
&write_controller));
wal_manager.reset(new WalManager(db_options, env_options));
std::unique_ptr<log::Writer> current_log_writer;
for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
uint64_t current_log_number = j;
std::string fname = LogFileName(test->dbname_, current_log_number);
unique_ptr<WritableFile> file;
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
current_log_writer.reset(new log::Writer(std::move(file)));
for (int i = 0; i < kKeysPerWALFile; i++) {
std::string key = "key" + ToString(count++);
std::string value = test->DummyString(kValueSize);
assert(current_log_writer.get() != nullptr);
uint64_t seq = versions->LastSequence() + 1;
WriteBatch batch;
batch.Put(key, value);
WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
versions->SetLastSequence(seq);
}
}
}
// Recreate and fill the store with some data
static size_t FillData(DBTest* test, Options& options) {
options.create_if_missing = true;
test->DestroyAndReopen(options);
test->Close();
size_t count = 0;
FillData(test, options, kWALFilesCount, count);
return count;
}
// Read back all the keys we wrote and return the number of keys found
static size_t GetData(DBTest* test) {
size_t count = 0;
for (size_t i = 0; i < 1024; i++) {
for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
++count;
}
@ -8733,6 +8783,23 @@ class RecoveryTestHelper {
return count;
}
// Manuall corrupt the specified WAL
static void CorruptWAL(DBTest * test, Options& options,
const double off, const double len,
const int wal_file_id, const bool trunc = false) {
Env* env = options.env;
std::string fname = LogFileName(test->dbname_, wal_file_id);
uint64_t size;
ASSERT_OK(env->GetFileSize(fname, &size));
ASSERT_GT(size, 0);
if (trunc) {
ASSERT_EQ(0, truncate(fname.c_str(), size * off));
} else {
InduceCorruption(fname, size * off, size * len);
}
}
// Overwrite data with 'a' from offset for length len
static void InduceCorruption(const std::string& filename, uint32_t offset,
uint32_t len) {
@ -8749,23 +8816,6 @@ class RecoveryTestHelper {
close(fd);
}
// Corrupt the last WAL file from (filesize * off) for length (filesize * len)
static void CorruptWAL(DBTest* test, const double off, const double len,
const bool trunc = false) {
rocksdb::VectorLogPtr wal_files;
ASSERT_OK(test->dbfull()->GetSortedWalFiles(wal_files));
ASSERT_EQ(wal_files.size(), 1);
const auto logfile_path =
test->dbname_ + "/" + wal_files.front()->PathName();
auto size = wal_files.front()->SizeFileBytes();
if (trunc) {
ASSERT_EQ(0, truncate(logfile_path.c_str(), size * off));
} else {
InduceCorruption(logfile_path, size * off, size * len);
}
}
};
// Test scope:
@ -8773,18 +8823,23 @@ class RecoveryTestHelper {
// at the end of any of the logs
// - We do not expect to open the data store for corruption
TEST_F(DBTest, kTolerateCorruptedTailRecords) {
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
const int jstart = RecoveryTestHelper::kWALFileOffset;
const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
for (auto trunc : {true, false}) { /* Corruption style */
for (int i = 0; i < 4; i++) { /* Corruption offset position */
for (int j = jstart; j < jend; j++) { /* WAL file */
// Fill data for testing
Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options);
// test checksum failure or parsing
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
/*len%=*/ .1, /*wal=*/ j, trunc);
if (trunc) {
options.wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords;
options.create_if_missing = false;
ASSERT_OK(TryReopen(options));
const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_TRUE(i == 0 || recovered_row_count > 0);
@ -8797,55 +8852,75 @@ TEST_F(DBTest, kTolerateCorruptedTailRecords) {
}
}
}
}
// Test scope:
// We don't expect the data store to be opened if there is any corruption
// (leading, middle or trailing -- incomplete writes or corruption)
TEST_F(DBTest, kAbsoluteConsistency) {
const int jstart = RecoveryTestHelper::kWALFileOffset;
const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
// Verify clean slate behavior
Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options);
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
options.create_if_missing = false;
ASSERT_OK(TryReopen(options));
ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
for (auto trunc : {true, false}) { /* Corruption style */
for (int i = 0; i < 4; i++) { /* Corruption offset position */
if (trunc && i == 0) {
continue;
}
options = CurrentOptions();
RecoveryTestHelper::FillData(this, options);
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
for (int j = jstart; j < jend; j++) { /* wal files */
// fill with new date
RecoveryTestHelper::FillData(this, options);
// corrupt the wal
RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
/*len%=*/.1, j, trunc);
// verify
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
options.create_if_missing = false;
ASSERT_NOK(TryReopen(options));
}
}
}
}
// Test scope:
// - We expect to open data store under all circumstances
// - We expect only data upto the point where the first error was encountered
TEST_F(DBTest, kPointInTimeRecovery) {
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
const int jstart = RecoveryTestHelper::kWALFileOffset;
const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
const int maxkeys = RecoveryTestHelper::kWALFilesCount *
RecoveryTestHelper::kKeysPerWALFile;
for (auto trunc : {true, false}) { /* Corruption style */
for (int i = 0; i < 4; i++) { /* Offset of corruption */
for (int j = jstart; j < jend; j++) { /* WAL file */
// Fill data for testing
Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options);
// test checksum failure or parsing
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
// Corrupt the wal
RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
/*len%=*/.1, j, trunc);
// Verify
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
options.create_if_missing = false;
ASSERT_OK(TryReopen(options));
// Probe data for invariants
size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_LT(recovered_row_count, row_count);
// verify that the keys are sequential and there is no break
bool expect_data = true;
for (size_t j = 0; j < 1024; ++j) {
for (size_t k = 0; k < maxkeys; ++k) {
bool found = Get("key" + ToString(i)) != "NOT_FOUND";
if (expect_data && !found) {
expect_data = false;
@ -8853,8 +8928,15 @@ TEST_F(DBTest, kPointInTimeRecovery) {
ASSERT_EQ(found, expect_data);
}
ASSERT_TRUE(i != 0 || recovered_row_count == 0);
ASSERT_TRUE(i != 1 || recovered_row_count < row_count / 2);
const size_t min = RecoveryTestHelper::kKeysPerWALFile *
(j - RecoveryTestHelper::kWALFileOffset);
ASSERT_GE(recovered_row_count, min);
if (!trunc && i != 0) {
const size_t max = RecoveryTestHelper::kKeysPerWALFile *
(j - RecoveryTestHelper::kWALFileOffset + 1);
ASSERT_LE(recovered_row_count, max);
}
}
}
}
}
@ -8863,17 +8945,26 @@ TEST_F(DBTest, kPointInTimeRecovery) {
// - We expect to open the data store under all scenarios
// - We expect to have recovered records past the corruption zone
TEST_F(DBTest, kSkipAnyCorruptedRecords) {
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
const int jstart = RecoveryTestHelper::kWALFileOffset;
const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
for (auto trunc : {true, false}) { /* Corruption style */
for (int i = 0; i < 4; i++) { /* Corruption offset */
for (int j = jstart; j < jend; j++) { /* wal files */
// Fill data for testing
Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options);
// induce leading corruption
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
// Corrupt the WAL
RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
/*len%=*/.1, j, trunc);
// Verify behavior
options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
options.create_if_missing = false;
ASSERT_OK(TryReopen(options));
// Probe data for invariants
size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_LT(recovered_row_count, row_count);
@ -8883,6 +8974,7 @@ TEST_F(DBTest, kSkipAnyCorruptedRecords) {
}
}
}
}
TEST_F(DBTest, TransactionLogIteratorBatchOperations) {
do {