Handle multiple batches in single log record - allow app to return a new batch + allow app to return corrupted record status

This commit is contained in:
Praveen Rao 2015-10-19 13:27:40 -07:00
parent cc4d13e0a8
commit 0c59691dde
5 changed files with 339 additions and 154 deletions

View File

@ -1156,22 +1156,67 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (db_options_.wal_filter != nullptr) { if (db_options_.wal_filter != nullptr) {
WALFilter::WALProcessingOption walProcessingOption = WriteBatch new_batch;
db_options_.wal_filter->LogRecord(batch); bool batch_changed = false;
WalFilter::WalProcessingOption walProcessingOption =
db_options_.wal_filter->LogRecord(batch, &new_batch, &batch_changed);
switch (walProcessingOption) { switch (walProcessingOption) {
case WALFilter::WALProcessingOption::kContinueProcessing: case WalFilter::WalProcessingOption::kContinueProcessing:
//do nothing, proceeed normally //do nothing, proceeed normally
break; break;
case WALFilter::WALProcessingOption::kIgnoreCurrentRecord: case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
//skip current record //skip current record
continue; continue;
case WALFilter::WALProcessingOption::kStopReplay: case WalFilter::WalProcessingOption::kStopReplay:
//skip current record and stop replay //skip current record and stop replay
continue_replay_log = false; continue_replay_log = false;
continue; continue;
default: case WalFilter::WalProcessingOption::kCorruptedRecord: {
status = Status::Corruption("Corruption reported by Wal Filter ",
db_options_.wal_filter->Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
reporter.Corruption(record.size(), status);
continue;
}
break;
}
default: {
assert(false); //unhandled case assert(false); //unhandled case
status = Status::NotSupported("Unknown WalProcessingOption returned"
" by Wal Filter ", db_options_.wal_filter->Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
}
else {
// Ignore the error with current record processing.
continue;
}
}
}
if (batch_changed) {
// Make sure that the count in the new batch is
// within the orignal count.
int new_count = WriteBatchInternal::Count(&new_batch);
int original_count = WriteBatchInternal::Count(&batch);
if (new_count > original_count) {
// Question: should this be treated as an error ??
// Would it cause problems if #num records > diff in seq#?
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d log filter %s returned "
"more records (%d) than original (%d)", log_number,
db_options_.wal_recovery_mode, db_options_.wal_filter->Name(),
new_count, original_count);
}
// Set the same sequence number in the new_batch
// as the original batch.
WriteBatchInternal::SetSequence(&new_batch,
WriteBatchInternal::Sequence(&batch));
batch = new_batch;
} }
} }
#endif //ROCKSDB_LITE #endif //ROCKSDB_LITE

View File

@ -9888,44 +9888,73 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
namespace {
void ValidateKeyExistence(DB* db,
const std::vector<Slice>& keysMustExist,
const std::vector<Slice>& keysMustNotExist) {
// Ensure that expected keys exist
std::vector<std::string> values;
if (keysMustExist.size() > 0) {
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
keysMustExist,
&values);
for (size_t i = 0; i < keysMustExist.size(); i++) {
ASSERT_OK(status_list[i]);
}
}
// Ensure that given keys don't exist
if (keysMustNotExist.size() > 0) {
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
keysMustNotExist,
&values);
for (size_t i = 0; i < keysMustNotExist.size(); i++) {
ASSERT_TRUE(status_list[i].IsNotFound());
}
}
}
} //namespace
TEST_F(DBTest, WalFilterTest) { TEST_F(DBTest, WalFilterTest) {
class TestWALFilter : public WALFilter { class TestWalFilter : public WalFilter {
private: private:
// Processing option that is requested to be applied at the given index // Processing option that is requested to be applied at the given index
WALFilter::WALProcessingOption m_walProcessingOption; WalFilter::WalProcessingOption WalProcessingOption_;
// Index at which to apply m_walProcessingOption // Index at which to apply WalProcessingOption_
// At other indexes default WALProcessingOption::kContinueProcessing is // At other indexes default WalProcessingOption::kContinueProcessing is
// returned. // returned.
size_t m_applyOptionAtRecordIndex; size_t applyOptionAtRecordIndex_;
// Current record index, incremented with each record encountered. // Current record index, incremented with each record encountered.
size_t m_currentRecordIndex; size_t currentRecordIndex_;
public: public:
TestWALFilter(WALFilter::WALProcessingOption walProcessingOption, TestWalFilter(WalFilter::WalProcessingOption WalProcessingOption,
size_t applyOptionForRecordIndex) : size_t applyOptionForRecordIndex) :
m_walProcessingOption(walProcessingOption), WalProcessingOption_(WalProcessingOption),
m_applyOptionAtRecordIndex(applyOptionForRecordIndex), applyOptionAtRecordIndex_(applyOptionForRecordIndex),
m_currentRecordIndex(0) { } currentRecordIndex_(0) { }
virtual WALProcessingOption LogRecord(const WriteBatch & batch) const override { virtual WalProcessingOption LogRecord(const WriteBatch & batch,
WALFilter::WALProcessingOption optionToReturn; WriteBatch* new_batch, bool* batch_changed) const override {
WalFilter::WalProcessingOption optionToReturn;
if (m_currentRecordIndex == m_applyOptionAtRecordIndex) { if (currentRecordIndex_ == applyOptionAtRecordIndex_) {
optionToReturn = m_walProcessingOption; optionToReturn = WalProcessingOption_;
} }
else { else {
optionToReturn = WALProcessingOption::kContinueProcessing; optionToReturn = WalProcessingOption::kContinueProcessing;
} }
// Filter is passed as a const object for RocksDB to not modify the // Filter is passed as a const object for RocksDB to not modify the
// object, however we modify it for our own purpose here and hence // object, however we modify it for our own purpose here and hence
// cast the constness away. // cast the constness away.
(const_cast<TestWALFilter*>(this)->m_currentRecordIndex)++; (const_cast<TestWalFilter*>(this)->currentRecordIndex_)++;
return optionToReturn; return optionToReturn;
} }
virtual const char* Name() const override { virtual const char* Name() const override {
return "TestWALFilter"; return "TestWalFilter";
} }
}; };
@ -9940,13 +9969,13 @@ TEST_F(DBTest, WalFilterTest) {
batchKeys[2].push_back("key6"); batchKeys[2].push_back("key6");
// Test with all WAL processing options // Test with all WAL processing options
for (char option = 0; for (int option = 0;
option < static_cast<char>(WALFilter::WALProcessingOption::kWALProcessingOptionMax); option < static_cast<int>(WalFilter::WalProcessingOption::kWalProcessingOptionMax);
option++) { option++) {
Options options = OptionsForLogIterTest(); Options options = OptionsForLogIterTest();
DestroyAndReopen(options); DestroyAndReopen(options);
CreateAndReopenWithCF({ "pikachu" }, options); CreateAndReopenWithCF({ "pikachu" }, options);
{
// Write given keys in given batches // Write given keys in given batches
for (size_t i = 0; i < batchKeys.size(); i++) { for (size_t i = 0; i < batchKeys.size(); i++) {
WriteBatch batch; WriteBatch batch;
@ -9956,28 +9985,39 @@ TEST_F(DBTest, WalFilterTest) {
dbfull()->Write(WriteOptions(), &batch); dbfull()->Write(WriteOptions(), &batch);
} }
WALFilter::WALProcessingOption walProcessingOption = WalFilter::WalProcessingOption WalProcessingOption =
static_cast<WALFilter::WALProcessingOption>(option); static_cast<WalFilter::WalProcessingOption>(option);
// Create a test filter that would apply walProcessingOption at the first // Create a test filter that would apply WalProcessingOption at the first
// record // record
size_t applyOptionForRecordIndex = 1; size_t applyOptionForRecordIndex = 1;
TestWALFilter testWalFilter(walProcessingOption, TestWalFilter testWalFilter(WalProcessingOption,
applyOptionForRecordIndex); applyOptionForRecordIndex);
// Reopen database with option to use WAL filter // Reopen database with option to use WAL filter
options = OptionsForLogIterTest(); options = OptionsForLogIterTest();
options.wal_filter = &testWalFilter; options.wal_filter = &testWalFilter;
Status status = TryReopenWithColumnFamilies({ "default", "pikachu" },
options);
if (WalProcessingOption ==
WalFilter::WalProcessingOption::kCorruptedRecord) {
assert(!status.ok());
// In case of corruption we can turn off paranoid_checks to reopen
// databse
options.paranoid_checks = false;
ReopenWithColumnFamilies({ "default", "pikachu" }, options); ReopenWithColumnFamilies({ "default", "pikachu" }, options);
} else {
assert(status.ok());
}
// Compute which keys we expect to be found // Compute which keys we expect to be found
// and which we expect not to be found after recovery. // and which we expect not to be found after recovery.
std::vector<Slice> keysMustExist; std::vector<Slice> keysMustExist;
std::vector<Slice> keysMustNotExist; std::vector<Slice> keysMustNotExist;
switch (walProcessingOption) { switch (WalProcessingOption) {
case WALFilter::WALProcessingOption::kContinueProcessing: { case WalFilter::WalProcessingOption::kCorruptedRecord:
fprintf(stderr, "Testing with complete WAL processing," case WalFilter::WalProcessingOption::kContinueProcessing: {
" i.e. the default case\n"); fprintf(stderr, "Testing with complete WAL processing\n");
//we expect all records to be processed //we expect all records to be processed
for (size_t i = 0; i < batchKeys.size(); i++) { for (size_t i = 0; i < batchKeys.size(); i++) {
for (size_t j = 0; j < batchKeys[i].size(); j++) { for (size_t j = 0; j < batchKeys[i].size(); j++) {
@ -9986,7 +10026,7 @@ TEST_F(DBTest, WalFilterTest) {
} }
break; break;
} }
case WALFilter::WALProcessingOption::kIgnoreCurrentRecord: { case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n", fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
applyOptionForRecordIndex); applyOptionForRecordIndex);
// We expect the record with applyOptionForRecordIndex to be not // We expect the record with applyOptionForRecordIndex to be not
@ -10003,7 +10043,7 @@ TEST_F(DBTest, WalFilterTest) {
} }
break; break;
} }
case WALFilter::WALProcessingOption::kStopReplay: { case WalFilter::WalProcessingOption::kStopReplay: {
fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n", fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n",
applyOptionForRecordIndex); applyOptionForRecordIndex);
// We expect records beyond applyOptionForRecordIndex to be not // We expect records beyond applyOptionForRecordIndex to be not
@ -10028,26 +10068,9 @@ TEST_F(DBTest, WalFilterTest) {
while (true) while (true)
{ {
// Ensure that expected keys exist after recovery // Ensure that expected keys exists
std::vector<std::string> values; // and not expected keys don't exist after recovery
if (keysMustExist.size() > 0) { ValidateKeyExistence(db_, keysMustExist, keysMustNotExist);
std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(),
keysMustExist,
&values);
for (size_t i = 0; i < keysMustExist.size(); i++) {
ASSERT_OK(status_list[i]);
}
}
// Ensure that discarded keys don't exist after recovery
if (keysMustNotExist.size() > 0) {
std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(),
keysMustNotExist,
&values);
for (size_t i = 0; i < keysMustNotExist.size(); i++) {
ASSERT_TRUE(status_list[i].IsNotFound());
}
}
if (checkedAfterReopen) { if (checkedAfterReopen) {
break; break;
@ -10062,6 +10085,139 @@ TEST_F(DBTest, WalFilterTest) {
checkedAfterReopen = true; checkedAfterReopen = true;
} }
} }
}
TEST_F(DBTest, WalFilterTestWithChangeBatch) {
class ChangeBatchHandler : public WriteBatch::Handler {
private:
// Whether we have already added a key to new batch
size_t m_numKeysAdded;
// Batch to insert keys in
WriteBatch* newWriteBatch_;
// Number of keys to add in the new batch
size_t m_numKeysToAddInNewBatch;
public:
ChangeBatchHandler(WriteBatch* newWriteBatch,
size_t numKeysToAddInNewBatch) :
newWriteBatch_(newWriteBatch),
m_numKeysToAddInNewBatch(numKeysToAddInNewBatch),
m_numKeysAdded(0){ }
virtual void Put(const Slice& key, const Slice& value) override {
if (m_numKeysAdded < m_numKeysToAddInNewBatch) {
newWriteBatch_->Put(key, value);
++m_numKeysAdded;
}
}
};
class TestWalFilterWithChangeBatch : public WalFilter {
private:
// Index at which to start changing records
size_t m_changeRecordsFromIndex;
// Number of keys to add in the new batch
size_t m_numKeysToAddInNewBatch;
// Current record index, incremented with each record encountered.
size_t currentRecordIndex_;
public:
TestWalFilterWithChangeBatch(
size_t changeRecordsFromIndex,
size_t numKeysToAddInNewBatch) :
m_changeRecordsFromIndex(changeRecordsFromIndex),
m_numKeysToAddInNewBatch(numKeysToAddInNewBatch),
currentRecordIndex_(0) { }
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
WriteBatch* new_batch, bool* batch_changed) const override {
if (currentRecordIndex_ >= m_changeRecordsFromIndex) {
ChangeBatchHandler handler(new_batch, m_numKeysToAddInNewBatch);
batch.Iterate(&handler);
*batch_changed = true;
}
// Filter is passed as a const object for RocksDB to not modify the
// object, however we modify it for our own purpose here and hence
// cast the constness away.
(const_cast<TestWalFilterWithChangeBatch*>(this)->currentRecordIndex_)++;
return WalProcessingOption::kContinueProcessing;
}
virtual const char* Name() const override {
return "TestWalFilterWithChangeBatch";
}
};
std::vector<std::vector<std::string>> batchKeys(3);
batchKeys[0].push_back("key1");
batchKeys[0].push_back("key2");
batchKeys[1].push_back("key3");
batchKeys[1].push_back("key4");
batchKeys[2].push_back("key5");
batchKeys[2].push_back("key6");
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({ "pikachu" }, options);
// Write given keys in given batches
for (size_t i = 0; i < batchKeys.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batchKeys[i].size(); j++) {
batch.Put(handles_[0], batchKeys[i][j], DummyString(1024));
}
dbfull()->Write(WriteOptions(), &batch);
}
// Create a test filter that would apply WalProcessingOption at the first
// record
size_t changeRecordsFromIndex = 1;
size_t numKeysToAddInNewBatch = 1;
TestWalFilterWithChangeBatch testWalFilterWithChangeBatch(
changeRecordsFromIndex, numKeysToAddInNewBatch);
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &testWalFilterWithChangeBatch;
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
// Ensure that all keys exist before m_changeRecordsFromIndex
// And after that index only single key exists
// as our filter adds only single key for each batch
std::vector<Slice> keysMustExist;
std::vector<Slice> keysMustNotExist;
for (size_t i = 0; i < batchKeys.size(); i++) {
for (size_t j = 0; j < batchKeys[i].size(); j++) {
if (i >= changeRecordsFromIndex && j >= numKeysToAddInNewBatch) {
keysMustNotExist.push_back(Slice(batchKeys[i][j]));
}
else {
keysMustExist.push_back(Slice(batchKeys[i][j]));
}
}
}
bool checkedAfterReopen = false;
while (true)
{
// Ensure that expected keys exists
// and not expected keys don't exist after recovery
ValidateKeyExistence(db_, keysMustExist, keysMustNotExist);
if (checkedAfterReopen) {
break;
}
//reopen database again to make sure previous log(s) are not used
//(even if they were skipped)
//reopn database with option to use WAL filter
options = OptionsForLogIterTest();
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
checkedAfterReopen = true;
} }
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

View File

@ -46,7 +46,7 @@ class Slice;
class SliceTransform; class SliceTransform;
class Statistics; class Statistics;
class InternalKeyComparator; class InternalKeyComparator;
class WALFilter; class WalFilter;
// DB contents are stored in a set of blocks, each of which holds a // DB contents are stored in a set of blocks, each of which holds a
// sequence of key,value pairs. Each block may be compressed before // sequence of key,value pairs. Each block may be compressed before
@ -1138,7 +1138,7 @@ struct DBOptions {
// records, ignoring a particular record or skipping replay. // records, ignoring a particular record or skipping replay.
// The filter is invoked at startup and is invoked from a single-thread // The filter is invoked at startup and is invoked from a single-thread
// currently. // currently.
const WALFilter * wal_filter; const WalFilter* wal_filter;
#endif //ROCKSDB_LITE #endif //ROCKSDB_LITE
}; };

View File

@ -2,14 +2,8 @@
// This source code is licensed under the BSD-style license found in the // This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant // LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_ #pragma once
#define STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_
#ifndef ROCKSDB_LITE
namespace rocksdb { namespace rocksdb {
@ -18,9 +12,9 @@ class WriteBatch;
// WALFilter allows an application to inspect write-ahead-log (WAL) // WALFilter allows an application to inspect write-ahead-log (WAL)
// records or modify their processing on recovery. // records or modify their processing on recovery.
// Please see the details below. // Please see the details below.
class WALFilter { class WalFilter {
public: public:
enum class WALProcessingOption { enum class WalProcessingOption {
// Continue processing as usual // Continue processing as usual
kContinueProcessing = 0, kContinueProcessing = 0,
// Ignore the current record but continue processing of log(s) // Ignore the current record but continue processing of log(s)
@ -28,40 +22,30 @@ public:
// Stop replay of logs and discard logs // Stop replay of logs and discard logs
// Logs won't be replayed on subsequent recovery // Logs won't be replayed on subsequent recovery
kStopReplay = 2, kStopReplay = 2,
// Corrupted record detected by filter
kCorruptedRecord = 3,
// Marker for enum count // Marker for enum count
kWALProcessingOptionMax = 3 kWalProcessingOptionMax = 4
}; };
virtual ~WALFilter() { }; virtual ~WalFilter() { };
// LogRecord is invoked for each log record encountered for all the logs // LogRecord is invoked for each log record encountered for all the logs
// during replay on logs on recovery. This method can be used to: // during replay on logs on recovery. This method can be used to:
// * inspect the record (using the batch parameter) // * inspect the record (using the batch parameter)
// * ignoring current record // * ignoring current record
// (by returning WALProcessingOption::kIgnoreCurrentRecord) // (by returning WalProcessingOption::kIgnoreCurrentRecord)
// * reporting corrupted record
// (by returning WalProcessingOption::kCorruptedRecord)
// * stop log replay // * stop log replay
// (by returning kStop replay) - please note that this implies // (by returning kStop replay) - please note that this implies
// discarding the logs from current record onwards. // discarding the logs from current record onwards.
virtual WALProcessingOption LogRecord(const WriteBatch & batch) const = 0; virtual WalProcessingOption LogRecord(const WriteBatch& batch,
WriteBatch* new_batch, bool* batch_changed) const = 0;
// Returns a name that identifies this WAL filter. // Returns a name that identifies this WAL filter.
// The name will be printed to LOG file on start up for diagnosis. // The name will be printed to LOG file on start up for diagnosis.
virtual const char* Name() const = 0; virtual const char* Name() const = 0;
}; };
// Default implementation of WALFilter that does not alter WAL processing
class DefaultWALFilter : WALFilter {
virtual WALProcessingOption LogRecord(const WriteBatch & batch) const override {
return WALProcessingOption::kContinueProcessing;
}
virtual const char* Name() const override {
return "DefaultWALFilter";
}
};
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE
#endif // STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_

View File

@ -258,7 +258,7 @@ DBOptions::DBOptions()
skip_stats_update_on_db_open(false), skip_stats_update_on_db_open(false),
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords) wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords)
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
,wal_filter(nullptr) , wal_filter(nullptr)
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
{ {
} }
@ -318,7 +318,7 @@ DBOptions::DBOptions(const Options& options)
wal_recovery_mode(options.wal_recovery_mode), wal_recovery_mode(options.wal_recovery_mode),
row_cache(options.row_cache) row_cache(options.row_cache)
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
,wal_filter(options.wal_filter) , wal_filter(options.wal_filter)
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
{ {
} }