Adding test for contiguous WAL detection

Summary:
Add a test to detect that when WAL gets truncated,
seq no's are checked to be contiguous.

This test is put in ColumnFamilyTest as it has the necessary
infrastructure/functions for flushing column families, which
we use to ensure 2 active WAL files

Test Plan:
This is a test, no feature has been added.
This test fails today and hence disabled

Reviewers: sdong

Reviewed By: sdong

Subscribers: lgalanis, dhruba, andrewkr, pritamdamania

Differential Revision: https://reviews.facebook.net/D59253
This commit is contained in:
Anirban Rahut 2016-06-06 14:40:36 -07:00
parent 098da83483
commit a73b26f601

View File

@ -13,6 +13,7 @@
#include <thread>
#include "db/db_impl.h"
#include "db/db_test_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
@ -26,6 +27,8 @@
namespace rocksdb {
static const int kValueSize = 1000;
namespace {
std::string RandomString(Random* rnd, int len) {
std::string r;
@ -70,6 +73,77 @@ class ColumnFamilyTest : public testing::Test {
delete env_;
}
// Return the value to associate with the specified key
Slice Value(int k, std::string* storage) {
if (k == 0) {
// Ugh. Random seed of 0 used to produce no entropy. This code
// preserves the implementation that was in place when all of the
// magic values in this file were picked.
*storage = std::string(kValueSize, ' ');
return Slice(*storage);
} else {
Random r(k);
return test::RandomString(&r, kValueSize, storage);
}
}
void Build(int base, int n, int flush_every = 0) {
std::string key_space, value_space;
WriteBatch batch;
for (int i = 0; i < n; i++) {
if (flush_every != 0 && i != 0 && i % flush_every == 0) {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_FlushMemTable();
}
int keyi = base + i;
Slice key(DBTestBase::Key(keyi));
batch.Clear();
batch.Put(handles_[0], key, Value(keyi, &value_space));
batch.Put(handles_[1], key, Value(keyi, &value_space));
batch.Put(handles_[2], key, Value(keyi, &value_space));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
}
void CheckMissed() {
uint64_t next_expected = 0;
uint64_t missed = 0;
int bad_keys = 0;
int bad_values = 0;
int correct = 0;
std::string value_space;
for (int cf = 0; cf < 3; cf++) {
next_expected = 0;
Iterator* iter = db_->NewIterator(ReadOptions(false, true), handles_[cf]);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
uint64_t key;
Slice in(iter->key());
in.remove_prefix(3);
if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
key < next_expected) {
bad_keys++;
continue;
}
missed += (key - next_expected);
next_expected = key + 1;
if (iter->value() != Value(static_cast<int>(key), &value_space)) {
bad_values++;
} else {
correct++;
}
}
delete iter;
}
ASSERT_EQ(0, bad_keys);
ASSERT_EQ(0, bad_values);
ASSERT_EQ(0, missed);
(void)correct;
}
void Close() {
for (auto h : handles_) {
if (h) {
@ -2597,6 +2671,80 @@ TEST_F(ColumnFamilyTest, LogSyncConflictFlush) {
Close();
}
#endif
// this test is placed here, because the infrastructure for Column Family
// test is being used to ensure a roll of wal files.
// Basic idea is to test that WAL truncation is being detected and not
// ignored
TEST_F(ColumnFamilyTest, DISABLED_LogTruncationTest) {
Open();
CreateColumnFamiliesAndReopen({"one", "two"});
Build(0, 100);
// Flush the 0th column family to force a roll of the wal log
Flush(0);
// Add some more entries
Build(100, 100);
std::vector<std::string> filenames;
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
// collect wal files
std::vector<std::string> logfs;
for (size_t i = 0; i < filenames.size(); i++) {
uint64_t number;
FileType type;
if (!(ParseFileName(filenames[i], &number, &type))) continue;
if (type != kLogFile) continue;
logfs.push_back(filenames[i]);
}
std::sort(logfs.begin(), logfs.end());
ASSERT_GE(logfs.size(), 2);
// Take the last but one file, and truncate it
std::string fpath = dbname_ + "/" + logfs[logfs.size() - 2];
std::vector<std::string> names_save = names_;
uint64_t fsize;
ASSERT_OK(env_->GetFileSize(fpath, &fsize));
ASSERT_GT(fsize, 0);
Close();
std::string backup_logs = dbname_ + "/backup_logs";
std::string t_fpath = backup_logs + "/" + logfs[logfs.size() - 2];
ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
// Not sure how easy it is to make this data driven.
// need to read back the WAL file and truncate last 10
// entries
CopyFile(fpath, t_fpath, fsize - 9180);
ASSERT_OK(env_->DeleteFile(fpath));
ASSERT_OK(env_->RenameFile(t_fpath, fpath));
db_options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
OpenReadOnly(names_save);
CheckMissed();
Close();
Open(names_save);
CheckMissed();
Close();
// cleanup
env_->DeleteDir(backup_logs);
}
} // namespace rocksdb
int main(int argc, char** argv) {