Adding log filter to inspect and filter log records on recovery
This commit is contained in:
parent
9eaff629e3
commit
59a0c219bb
@ -63,6 +63,7 @@
|
||||
#include "rocksdb/status.h"
|
||||
#include "rocksdb/table.h"
|
||||
#include "rocksdb/version.h"
|
||||
#include "rocksdb/wal_filter.h"
|
||||
#include "table/block.h"
|
||||
#include "table/block_based_table_factory.h"
|
||||
#include "table/merger.h"
|
||||
@ -1153,6 +1154,26 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
||||
}
|
||||
WriteBatchInternal::SetContents(&batch, record);
|
||||
|
||||
if (db_options_.wal_filter != nullptr) {
|
||||
WALFilter::WALProcessingOption walProcessingOption =
|
||||
db_options_.wal_filter->LogRecord(batch);
|
||||
|
||||
switch (walProcessingOption) {
|
||||
case WALFilter::WALProcessingOption::kContinueProcessing:
|
||||
//do nothing, proceeed normally
|
||||
break;
|
||||
case WALFilter::WALProcessingOption::kIgnoreCurrentRecord:
|
||||
//skip current record
|
||||
continue;
|
||||
case WALFilter::WALProcessingOption::kStopReplay:
|
||||
//skip current record and stop replay
|
||||
continue_replay_log = false;
|
||||
continue;
|
||||
default:
|
||||
assert(false); //unhandled case
|
||||
}
|
||||
}
|
||||
|
||||
// If column family was not found, it might mean that the WAL write
|
||||
// batch references to the column family that was dropped after the
|
||||
// insert. We don't want to fail the whole write batch in that case --
|
||||
|
177
db/db_test.cc
177
db/db_test.cc
@ -48,6 +48,7 @@
|
||||
#include "rocksdb/table.h"
|
||||
#include "rocksdb/table_properties.h"
|
||||
#include "rocksdb/thread_status.h"
|
||||
#include "rocksdb/wal_filter.h"
|
||||
#include "rocksdb/utilities/write_batch_with_index.h"
|
||||
#include "rocksdb/utilities/checkpoint.h"
|
||||
#include "rocksdb/utilities/optimistic_transaction_db.h"
|
||||
@ -9669,6 +9670,182 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
|
||||
ASSERT_EQ(true, done.load());
|
||||
}
|
||||
|
||||
TEST_F(DBTest, WalFilterTest) {
|
||||
class TestWALFilter : public WALFilter {
|
||||
private:
|
||||
// Processing option that is requested to be applied at the given index
|
||||
WALFilter::WALProcessingOption m_walProcessingOption;
|
||||
// Index at which to apply m_walProcessingOption
|
||||
// At other indexes default WALProcessingOption::kContinueProcessing is
|
||||
// returned.
|
||||
size_t m_applyOptionAtRecordIndex;
|
||||
// Current record index, incremented with each record encountered.
|
||||
size_t m_currentRecordIndex;
|
||||
public:
|
||||
TestWALFilter(WALFilter::WALProcessingOption walProcessingOption,
|
||||
size_t applyOptionForRecordIndex) :
|
||||
m_walProcessingOption(walProcessingOption),
|
||||
m_applyOptionAtRecordIndex(applyOptionForRecordIndex),
|
||||
m_currentRecordIndex(0) { }
|
||||
|
||||
virtual WALProcessingOption LogRecord(const WriteBatch & batch) const override {
|
||||
WALFilter::WALProcessingOption optionToReturn;
|
||||
|
||||
if (m_currentRecordIndex == m_applyOptionAtRecordIndex) {
|
||||
optionToReturn = m_walProcessingOption;
|
||||
}
|
||||
else {
|
||||
optionToReturn = WALProcessingOption::kContinueProcessing;
|
||||
}
|
||||
|
||||
// Filter is passed as a const object for RocksDB to not modify the
|
||||
// object, however we modify it for our own purpose here and hence
|
||||
// cast the constness away.
|
||||
(const_cast<TestWALFilter*>(this)->m_currentRecordIndex)++;
|
||||
|
||||
return optionToReturn;
|
||||
}
|
||||
|
||||
virtual const char* Name() const override {
|
||||
return "TestWALFilter";
|
||||
}
|
||||
};
|
||||
|
||||
// Create 3 batches with two keys each
|
||||
std::vector<std::vector<std::string>> batchKeys(3);
|
||||
|
||||
batchKeys[0].push_back("key1");
|
||||
batchKeys[0].push_back("key2");
|
||||
batchKeys[1].push_back("key3");
|
||||
batchKeys[1].push_back("key4");
|
||||
batchKeys[2].push_back("key5");
|
||||
batchKeys[2].push_back("key6");
|
||||
|
||||
// Test with all WAL processing options
|
||||
for (char option = 0;
|
||||
option < static_cast<char>(WALFilter::WALProcessingOption::kWALProcessingOptionMax);
|
||||
option++) {
|
||||
Options options = OptionsForLogIterTest();
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({ "pikachu" }, options);
|
||||
{
|
||||
// Write given keys in given batches
|
||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
||||
WriteBatch batch;
|
||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
||||
batch.Put(handles_[0], batchKeys[i][j], DummyString(1024));
|
||||
}
|
||||
dbfull()->Write(WriteOptions(), &batch);
|
||||
}
|
||||
|
||||
WALFilter::WALProcessingOption walProcessingOption =
|
||||
static_cast<WALFilter::WALProcessingOption>(option);
|
||||
|
||||
// Create a test filter that would apply walProcessingOption at the first
|
||||
// record
|
||||
size_t applyOptionForRecordIndex = 1;
|
||||
TestWALFilter testWalFilter(walProcessingOption,
|
||||
applyOptionForRecordIndex);
|
||||
|
||||
// Reopen database with option to use WAL filter
|
||||
options = OptionsForLogIterTest();
|
||||
options.wal_filter = &testWalFilter;
|
||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
||||
|
||||
// Compute which keys we expect to be found
|
||||
// and which we expect not to be found after recovery.
|
||||
std::vector<Slice> keysMustExist;
|
||||
std::vector<Slice> keysMustNotExist;
|
||||
switch (walProcessingOption) {
|
||||
case WALFilter::WALProcessingOption::kContinueProcessing: {
|
||||
fprintf(stderr, "Testing with complete WAL processing,"
|
||||
" i.e. the default case\n");
|
||||
//we expect all records to be processed
|
||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
||||
keysMustExist.push_back(Slice(batchKeys[i][j]));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case WALFilter::WALProcessingOption::kIgnoreCurrentRecord: {
|
||||
fprintf(stderr, "Testing with ignoring record %d only\n",
|
||||
applyOptionForRecordIndex);
|
||||
// We expect the record with applyOptionForRecordIndex to be not
|
||||
// found.
|
||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
||||
if (i == applyOptionForRecordIndex) {
|
||||
keysMustNotExist.push_back(Slice(batchKeys[i][j]));
|
||||
}
|
||||
else {
|
||||
keysMustExist.push_back(Slice(batchKeys[i][j]));
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case WALFilter::WALProcessingOption::kStopReplay: {
|
||||
fprintf(stderr, "Testing with stopping replay from record %d\n",
|
||||
applyOptionForRecordIndex);
|
||||
// We expect records beyond applyOptionForRecordIndex to be not
|
||||
// found.
|
||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
||||
if (i >= applyOptionForRecordIndex) {
|
||||
keysMustNotExist.push_back(Slice(batchKeys[i][j]));
|
||||
}
|
||||
else {
|
||||
keysMustExist.push_back(Slice(batchKeys[i][j]));
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
assert(false); //unhandled case
|
||||
}
|
||||
|
||||
bool checkedAfterReopen = false;
|
||||
|
||||
while (true)
|
||||
{
|
||||
// Ensure that expected keys exist after recovery
|
||||
std::vector<std::string> values;
|
||||
if (keysMustExist.size() > 0) {
|
||||
std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(),
|
||||
keysMustExist,
|
||||
&values);
|
||||
for (size_t i = 0; i < keysMustExist.size(); i++) {
|
||||
ASSERT_OK(status_list[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that discarded keys don't exist after recovery
|
||||
if (keysMustNotExist.size() > 0) {
|
||||
std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(),
|
||||
keysMustNotExist,
|
||||
&values);
|
||||
for (size_t i = 0; i < keysMustNotExist.size(); i++) {
|
||||
ASSERT_TRUE(status_list[i].IsNotFound());
|
||||
}
|
||||
}
|
||||
|
||||
if (checkedAfterReopen) {
|
||||
break;
|
||||
}
|
||||
|
||||
//reopen database again to make sure previous log(s) are not used
|
||||
//(even if they were skipped)
|
||||
//reopn database with option to use WAL filter
|
||||
options = OptionsForLogIterTest();
|
||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
||||
|
||||
checkedAfterReopen = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif
|
||||
|
@ -47,6 +47,7 @@ class Slice;
|
||||
class SliceTransform;
|
||||
class Statistics;
|
||||
class InternalKeyComparator;
|
||||
class WALFilter;
|
||||
|
||||
// DB contents are stored in a set of blocks, each of which holds a
|
||||
// sequence of key,value pairs. Each block may be compressed before
|
||||
@ -1132,6 +1133,13 @@ struct DBOptions {
|
||||
// Default: nullptr (disabled)
|
||||
// Not supported in ROCKSDB_LITE mode!
|
||||
std::shared_ptr<Cache> row_cache;
|
||||
|
||||
// A filter object supplied to be invoked while processing write-ahead-logs
|
||||
// (WALs) during recovery. The filter provides a way to inspect log
|
||||
// records, ignoring a particular record or skipping replay.
|
||||
// The filter is invoked at startup and is invoked from a single-thread
|
||||
// currently.
|
||||
const WALFilter * wal_filter;
|
||||
};
|
||||
|
||||
// Options to control the behavior of a database (passed to DB::Open)
|
||||
|
63
include/rocksdb/wal_filter.h
Normal file
63
include/rocksdb/wal_filter.h
Normal file
@ -0,0 +1,63 @@
|
||||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#ifndef STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_
|
||||
#define STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class WriteBatch;
|
||||
|
||||
// WALFilter allows an application to inspect write-ahead-log (WAL)
|
||||
// records or modify their processing on recovery.
|
||||
// Please see the details below.
|
||||
class WALFilter {
|
||||
public:
|
||||
enum class WALProcessingOption {
|
||||
// Continue processing as usual
|
||||
kContinueProcessing = 0,
|
||||
// Ignore the current record but continue processing of log(s)
|
||||
kIgnoreCurrentRecord = 1,
|
||||
// Stop replay of logs and discard logs
|
||||
// Logs won't be replayed on subsequent recovery
|
||||
kStopReplay = 2,
|
||||
// Marker for enum count
|
||||
kWALProcessingOptionMax = 3
|
||||
};
|
||||
|
||||
virtual ~WALFilter() { };
|
||||
|
||||
// LogRecord is invoked for each log record encountered for all the logs
|
||||
// during replay on logs on recovery. This method can be used to:
|
||||
// * inspect the record (using the batch parameter)
|
||||
// * ignoring current record
|
||||
// (by returning WALProcessingOption::kIgnoreCurrentRecord)
|
||||
// * stop log replay
|
||||
// (by returning kStop replay) - please note that this implies
|
||||
// discarding the logs from current record onwards.
|
||||
virtual WALProcessingOption LogRecord(const WriteBatch & batch) const = 0;
|
||||
|
||||
// Returns a name that identifies this WAL filter.
|
||||
// The name will be printed to LOG file on start up for diagnosis.
|
||||
virtual const char* Name() const = 0;
|
||||
};
|
||||
|
||||
// Default implementation of WALFilter that does not alter WAL processing
|
||||
class DefaultWALFilter : WALFilter {
|
||||
virtual WALProcessingOption LogRecord(const WriteBatch & batch) const override {
|
||||
return WALProcessingOption::kContinueProcessing;
|
||||
}
|
||||
|
||||
virtual const char* Name() const override {
|
||||
return "DefaultWALFilter";
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_
|
@ -29,6 +29,7 @@
|
||||
#include "rocksdb/slice_transform.h"
|
||||
#include "rocksdb/table.h"
|
||||
#include "rocksdb/table_properties.h"
|
||||
#include "rocksdb/wal_filter.h"
|
||||
#include "table/block_based_table_factory.h"
|
||||
#include "util/compression.h"
|
||||
#include "util/statistics.h"
|
||||
@ -254,7 +255,8 @@ DBOptions::DBOptions()
|
||||
enable_thread_tracking(false),
|
||||
delayed_write_rate(1024U * 1024U),
|
||||
skip_stats_update_on_db_open(false),
|
||||
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords) {
|
||||
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords),
|
||||
wal_filter(nullptr) {
|
||||
}
|
||||
|
||||
DBOptions::DBOptions(const Options& options)
|
||||
@ -309,7 +311,8 @@ DBOptions::DBOptions(const Options& options)
|
||||
delayed_write_rate(options.delayed_write_rate),
|
||||
skip_stats_update_on_db_open(options.skip_stats_update_on_db_open),
|
||||
wal_recovery_mode(options.wal_recovery_mode),
|
||||
row_cache(options.row_cache) {}
|
||||
row_cache(options.row_cache),
|
||||
wal_filter(options.wal_filter){}
|
||||
|
||||
static const char* const access_hints[] = {
|
||||
"NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
|
||||
@ -406,6 +409,8 @@ void DBOptions::Dump(Logger* log) const {
|
||||
} else {
|
||||
Header(log, " Options.row_cache: None");
|
||||
}
|
||||
Header(log, " Options.wal_filter: %s",
|
||||
wal_filter ? wal_filter->Name() : "None");
|
||||
} // DBOptions::Dump
|
||||
|
||||
void ColumnFamilyOptions::Dump(Logger* log) const {
|
||||
|
Loading…
Reference in New Issue
Block a user