Fail recovery if filter provides more records than original and corresponding unit-test, fix naming conventions
This commit is contained in:
parent
7951b9b079
commit
32cdec634e
@ -1161,10 +1161,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
WriteBatch new_batch;
|
WriteBatch new_batch;
|
||||||
bool batch_changed = false;
|
bool batch_changed = false;
|
||||||
|
|
||||||
WalFilter::WalProcessingOption walProcessingOption =
|
WalFilter::WalProcessingOption wal_processing_option =
|
||||||
db_options_.wal_filter->LogRecord(batch, &new_batch, &batch_changed);
|
db_options_.wal_filter->LogRecord(batch, &new_batch, &batch_changed);
|
||||||
|
|
||||||
switch (walProcessingOption) {
|
switch (wal_processing_option) {
|
||||||
case WalFilter::WalProcessingOption::kContinueProcessing:
|
case WalFilter::WalProcessingOption::kContinueProcessing:
|
||||||
//do nothing, proceeed normally
|
//do nothing, proceeed normally
|
||||||
break;
|
break;
|
||||||
@ -1206,13 +1206,15 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
int new_count = WriteBatchInternal::Count(&new_batch);
|
int new_count = WriteBatchInternal::Count(&new_batch);
|
||||||
int original_count = WriteBatchInternal::Count(&batch);
|
int original_count = WriteBatchInternal::Count(&batch);
|
||||||
if (new_count > original_count) {
|
if (new_count > original_count) {
|
||||||
// Question: should this be treated as an error ??
|
Log(InfoLogLevel::FATAL_LEVEL, db_options_.info_log,
|
||||||
// Would it cause problems if #num records > diff in seq#?
|
|
||||||
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
|
|
||||||
"Recovering log #%" PRIu64 " mode %d log filter %s returned "
|
"Recovering log #%" PRIu64 " mode %d log filter %s returned "
|
||||||
"more records (%d) than original (%d)", log_number,
|
"more records (%d) than original (%d) which is not allowed. "
|
||||||
db_options_.wal_recovery_mode, db_options_.wal_filter->Name(),
|
"Aborting recovery.",
|
||||||
new_count, original_count);
|
log_number, db_options_.wal_recovery_mode,
|
||||||
|
db_options_.wal_filter->Name(), new_count, original_count);
|
||||||
|
status = Status::NotSupported("More than original # of records "
|
||||||
|
"returned by Wal Filter ", db_options_.wal_filter->Name());
|
||||||
|
return status;
|
||||||
}
|
}
|
||||||
// Set the same sequence number in the new_batch
|
// Set the same sequence number in the new_batch
|
||||||
// as the original batch.
|
// as the original batch.
|
||||||
|
302
db/db_test.cc
302
db/db_test.cc
@ -9892,25 +9892,25 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
|
|||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
namespace {
|
namespace {
|
||||||
void ValidateKeyExistence(DB* db,
|
void ValidateKeyExistence(DB* db,
|
||||||
const std::vector<Slice>& keysMustExist,
|
const std::vector<Slice>& keys_must_exist,
|
||||||
const std::vector<Slice>& keysMustNotExist) {
|
const std::vector<Slice>& keys_must_not_exist) {
|
||||||
// Ensure that expected keys exist
|
// Ensure that expected keys exist
|
||||||
std::vector<std::string> values;
|
std::vector<std::string> values;
|
||||||
if (keysMustExist.size() > 0) {
|
if (keys_must_exist.size() > 0) {
|
||||||
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
|
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
|
||||||
keysMustExist,
|
keys_must_exist,
|
||||||
&values);
|
&values);
|
||||||
for (size_t i = 0; i < keysMustExist.size(); i++) {
|
for (size_t i = 0; i < keys_must_exist.size(); i++) {
|
||||||
ASSERT_OK(status_list[i]);
|
ASSERT_OK(status_list[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that given keys don't exist
|
// Ensure that given keys don't exist
|
||||||
if (keysMustNotExist.size() > 0) {
|
if (keys_must_not_exist.size() > 0) {
|
||||||
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
|
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
|
||||||
keysMustNotExist,
|
keys_must_not_exist,
|
||||||
&values);
|
&values);
|
||||||
for (size_t i = 0; i < keysMustNotExist.size(); i++) {
|
for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
|
||||||
ASSERT_TRUE(status_list[i].IsNotFound());
|
ASSERT_TRUE(status_list[i].IsNotFound());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -9922,37 +9922,37 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
class TestWalFilter : public WalFilter {
|
class TestWalFilter : public WalFilter {
|
||||||
private:
|
private:
|
||||||
// Processing option that is requested to be applied at the given index
|
// Processing option that is requested to be applied at the given index
|
||||||
WalFilter::WalProcessingOption WalProcessingOption_;
|
WalFilter::WalProcessingOption wal_processing_option_;
|
||||||
// Index at which to apply WalProcessingOption_
|
// Index at which to apply wal_processing_option_
|
||||||
// At other indexes default WalProcessingOption::kContinueProcessing is
|
// At other indexes default wal_processing_option::kContinueProcessing is
|
||||||
// returned.
|
// returned.
|
||||||
size_t applyOptionAtRecordIndex_;
|
size_t apply_option_at_record_index_;
|
||||||
// Current record index, incremented with each record encountered.
|
// Current record index, incremented with each record encountered.
|
||||||
size_t currentRecordIndex_;
|
size_t current_record_index_;
|
||||||
public:
|
public:
|
||||||
TestWalFilter(WalFilter::WalProcessingOption WalProcessingOption,
|
TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
|
||||||
size_t applyOptionForRecordIndex) :
|
size_t apply_option_for_record_index) :
|
||||||
WalProcessingOption_(WalProcessingOption),
|
wal_processing_option_(wal_processing_option),
|
||||||
applyOptionAtRecordIndex_(applyOptionForRecordIndex),
|
apply_option_at_record_index_(apply_option_for_record_index),
|
||||||
currentRecordIndex_(0) { }
|
current_record_index_(0) { }
|
||||||
|
|
||||||
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
|
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
|
||||||
WriteBatch* new_batch, bool* batch_changed) const override {
|
WriteBatch* new_batch, bool* batch_changed) const override {
|
||||||
WalFilter::WalProcessingOption optionToReturn;
|
WalFilter::WalProcessingOption option_to_return;
|
||||||
|
|
||||||
if (currentRecordIndex_ == applyOptionAtRecordIndex_) {
|
if (current_record_index_ == apply_option_at_record_index_) {
|
||||||
optionToReturn = WalProcessingOption_;
|
option_to_return = wal_processing_option_;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
optionToReturn = WalProcessingOption::kContinueProcessing;
|
option_to_return = WalProcessingOption::kContinueProcessing;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter is passed as a const object for RocksDB to not modify the
|
// Filter is passed as a const object for RocksDB to not modify the
|
||||||
// object, however we modify it for our own purpose here and hence
|
// object, however we modify it for our own purpose here and hence
|
||||||
// cast the constness away.
|
// cast the constness away.
|
||||||
(const_cast<TestWalFilter*>(this)->currentRecordIndex_)++;
|
(const_cast<TestWalFilter*>(this)->current_record_index_)++;
|
||||||
|
|
||||||
return optionToReturn;
|
return option_to_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual const char* Name() const override {
|
virtual const char* Name() const override {
|
||||||
@ -9961,14 +9961,14 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Create 3 batches with two keys each
|
// Create 3 batches with two keys each
|
||||||
std::vector<std::vector<std::string>> batchKeys(3);
|
std::vector<std::vector<std::string>> batch_keys(3);
|
||||||
|
|
||||||
batchKeys[0].push_back("key1");
|
batch_keys[0].push_back("key1");
|
||||||
batchKeys[0].push_back("key2");
|
batch_keys[0].push_back("key2");
|
||||||
batchKeys[1].push_back("key3");
|
batch_keys[1].push_back("key3");
|
||||||
batchKeys[1].push_back("key4");
|
batch_keys[1].push_back("key4");
|
||||||
batchKeys[2].push_back("key5");
|
batch_keys[2].push_back("key5");
|
||||||
batchKeys[2].push_back("key6");
|
batch_keys[2].push_back("key6");
|
||||||
|
|
||||||
// Test with all WAL processing options
|
// Test with all WAL processing options
|
||||||
for (int option = 0;
|
for (int option = 0;
|
||||||
@ -9979,29 +9979,29 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
CreateAndReopenWithCF({ "pikachu" }, options);
|
CreateAndReopenWithCF({ "pikachu" }, options);
|
||||||
|
|
||||||
// Write given keys in given batches
|
// Write given keys in given batches
|
||||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
WriteBatch batch;
|
WriteBatch batch;
|
||||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
batch.Put(handles_[0], batchKeys[i][j], DummyString(1024));
|
batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
|
||||||
}
|
}
|
||||||
dbfull()->Write(WriteOptions(), &batch);
|
dbfull()->Write(WriteOptions(), &batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
WalFilter::WalProcessingOption WalProcessingOption =
|
WalFilter::WalProcessingOption wal_processing_option =
|
||||||
static_cast<WalFilter::WalProcessingOption>(option);
|
static_cast<WalFilter::WalProcessingOption>(option);
|
||||||
|
|
||||||
// Create a test filter that would apply WalProcessingOption at the first
|
// Create a test filter that would apply wal_processing_option at the first
|
||||||
// record
|
// record
|
||||||
size_t applyOptionForRecordIndex = 1;
|
size_t apply_option_for_record_index = 1;
|
||||||
TestWalFilter testWalFilter(WalProcessingOption,
|
TestWalFilter test_wal_filter(wal_processing_option,
|
||||||
applyOptionForRecordIndex);
|
apply_option_for_record_index);
|
||||||
|
|
||||||
// Reopen database with option to use WAL filter
|
// Reopen database with option to use WAL filter
|
||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
options.wal_filter = &testWalFilter;
|
options.wal_filter = &test_wal_filter;
|
||||||
Status status = TryReopenWithColumnFamilies({ "default", "pikachu" },
|
Status status = TryReopenWithColumnFamilies({ "default", "pikachu" },
|
||||||
options);
|
options);
|
||||||
if (WalProcessingOption ==
|
if (wal_processing_option ==
|
||||||
WalFilter::WalProcessingOption::kCorruptedRecord) {
|
WalFilter::WalProcessingOption::kCorruptedRecord) {
|
||||||
assert(!status.ok());
|
assert(!status.ok());
|
||||||
// In case of corruption we can turn off paranoid_checks to reopen
|
// In case of corruption we can turn off paranoid_checks to reopen
|
||||||
@ -10014,32 +10014,32 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
|
|
||||||
// Compute which keys we expect to be found
|
// Compute which keys we expect to be found
|
||||||
// and which we expect not to be found after recovery.
|
// and which we expect not to be found after recovery.
|
||||||
std::vector<Slice> keysMustExist;
|
std::vector<Slice> keys_must_exist;
|
||||||
std::vector<Slice> keysMustNotExist;
|
std::vector<Slice> keys_must_not_exist;
|
||||||
switch (WalProcessingOption) {
|
switch (wal_processing_option) {
|
||||||
case WalFilter::WalProcessingOption::kCorruptedRecord:
|
case WalFilter::WalProcessingOption::kCorruptedRecord:
|
||||||
case WalFilter::WalProcessingOption::kContinueProcessing: {
|
case WalFilter::WalProcessingOption::kContinueProcessing: {
|
||||||
fprintf(stderr, "Testing with complete WAL processing\n");
|
fprintf(stderr, "Testing with complete WAL processing\n");
|
||||||
//we expect all records to be processed
|
//we expect all records to be processed
|
||||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
keysMustExist.push_back(Slice(batchKeys[i][j]));
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
|
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
|
||||||
fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
|
fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
|
||||||
applyOptionForRecordIndex);
|
apply_option_for_record_index);
|
||||||
// We expect the record with applyOptionForRecordIndex to be not
|
// We expect the record with apply_option_for_record_index to be not
|
||||||
// found.
|
// found.
|
||||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
if (i == applyOptionForRecordIndex) {
|
if (i == apply_option_for_record_index) {
|
||||||
keysMustNotExist.push_back(Slice(batchKeys[i][j]));
|
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
keysMustExist.push_back(Slice(batchKeys[i][j]));
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -10047,16 +10047,16 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
}
|
}
|
||||||
case WalFilter::WalProcessingOption::kStopReplay: {
|
case WalFilter::WalProcessingOption::kStopReplay: {
|
||||||
fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n",
|
fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n",
|
||||||
applyOptionForRecordIndex);
|
apply_option_for_record_index);
|
||||||
// We expect records beyond applyOptionForRecordIndex to be not
|
// We expect records beyond apply_option_for_record_index to be not
|
||||||
// found.
|
// found.
|
||||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
if (i >= applyOptionForRecordIndex) {
|
if (i >= apply_option_for_record_index) {
|
||||||
keysMustNotExist.push_back(Slice(batchKeys[i][j]));
|
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
keysMustExist.push_back(Slice(batchKeys[i][j]));
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -10066,15 +10066,14 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
assert(false); //unhandled case
|
assert(false); //unhandled case
|
||||||
}
|
}
|
||||||
|
|
||||||
bool checkedAfterReopen = false;
|
bool checked_after_reopen = false;
|
||||||
|
|
||||||
while (true)
|
while (true) {
|
||||||
{
|
|
||||||
// Ensure that expected keys exists
|
// Ensure that expected keys exists
|
||||||
// and not expected keys don't exist after recovery
|
// and not expected keys don't exist after recovery
|
||||||
ValidateKeyExistence(db_, keysMustExist, keysMustNotExist);
|
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
|
||||||
|
|
||||||
if (checkedAfterReopen) {
|
if (checked_after_reopen) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -10084,7 +10083,7 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
||||||
|
|
||||||
checkedAfterReopen = true;
|
checked_after_reopen = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -10093,21 +10092,21 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
class ChangeBatchHandler : public WriteBatch::Handler {
|
class ChangeBatchHandler : public WriteBatch::Handler {
|
||||||
private:
|
private:
|
||||||
// Batch to insert keys in
|
// Batch to insert keys in
|
||||||
WriteBatch* newWriteBatch_;
|
WriteBatch* new_write_batch_;
|
||||||
// Number of keys to add in the new batch
|
// Number of keys to add in the new batch
|
||||||
size_t m_numKeysToAddInNewBatch;
|
size_t num_keys_to_add_in_new_batch_;
|
||||||
// Number of keys added to new batch
|
// Number of keys added to new batch
|
||||||
size_t m_numKeysAdded;
|
size_t num_keys_added_;
|
||||||
public:
|
public:
|
||||||
ChangeBatchHandler(WriteBatch* newWriteBatch,
|
ChangeBatchHandler(WriteBatch* new_write_batch,
|
||||||
size_t numKeysToAddInNewBatch) :
|
size_t num_keys_to_add_in_new_batch) :
|
||||||
newWriteBatch_(newWriteBatch),
|
new_write_batch_(new_write_batch),
|
||||||
m_numKeysToAddInNewBatch(numKeysToAddInNewBatch),
|
num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
|
||||||
m_numKeysAdded(0){ }
|
num_keys_added_(0){ }
|
||||||
virtual void Put(const Slice& key, const Slice& value) override {
|
virtual void Put(const Slice& key, const Slice& value) override {
|
||||||
if (m_numKeysAdded < m_numKeysToAddInNewBatch) {
|
if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
|
||||||
newWriteBatch_->Put(key, value);
|
new_write_batch_->Put(key, value);
|
||||||
++m_numKeysAdded;
|
++num_keys_added_;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -10115,24 +10114,24 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
class TestWalFilterWithChangeBatch : public WalFilter {
|
class TestWalFilterWithChangeBatch : public WalFilter {
|
||||||
private:
|
private:
|
||||||
// Index at which to start changing records
|
// Index at which to start changing records
|
||||||
size_t m_changeRecordsFromIndex;
|
size_t change_records_from_index_;
|
||||||
// Number of keys to add in the new batch
|
// Number of keys to add in the new batch
|
||||||
size_t m_numKeysToAddInNewBatch;
|
size_t num_keys_to_add_in_new_batch_;
|
||||||
// Current record index, incremented with each record encountered.
|
// Current record index, incremented with each record encountered.
|
||||||
size_t currentRecordIndex_;
|
size_t current_record_index_;
|
||||||
public:
|
public:
|
||||||
TestWalFilterWithChangeBatch(
|
TestWalFilterWithChangeBatch(
|
||||||
size_t changeRecordsFromIndex,
|
size_t change_records_from_index,
|
||||||
size_t numKeysToAddInNewBatch) :
|
size_t num_keys_to_add_in_new_batch) :
|
||||||
m_changeRecordsFromIndex(changeRecordsFromIndex),
|
change_records_from_index_(change_records_from_index),
|
||||||
m_numKeysToAddInNewBatch(numKeysToAddInNewBatch),
|
num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
|
||||||
currentRecordIndex_(0) { }
|
current_record_index_(0) { }
|
||||||
|
|
||||||
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
|
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
|
||||||
WriteBatch* new_batch, bool* batch_changed) const override {
|
WriteBatch* new_batch, bool* batch_changed) const override {
|
||||||
|
|
||||||
if (currentRecordIndex_ >= m_changeRecordsFromIndex) {
|
if (current_record_index_ >= change_records_from_index_) {
|
||||||
ChangeBatchHandler handler(new_batch, m_numKeysToAddInNewBatch);
|
ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
|
||||||
batch.Iterate(&handler);
|
batch.Iterate(&handler);
|
||||||
*batch_changed = true;
|
*batch_changed = true;
|
||||||
}
|
}
|
||||||
@ -10140,7 +10139,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
// Filter is passed as a const object for RocksDB to not modify the
|
// Filter is passed as a const object for RocksDB to not modify the
|
||||||
// object, however we modify it for our own purpose here and hence
|
// object, however we modify it for our own purpose here and hence
|
||||||
// cast the constness away.
|
// cast the constness away.
|
||||||
(const_cast<TestWalFilterWithChangeBatch*>(this)->currentRecordIndex_)++;
|
(const_cast<TestWalFilterWithChangeBatch*>(this)->current_record_index_)++;
|
||||||
|
|
||||||
return WalProcessingOption::kContinueProcessing;
|
return WalProcessingOption::kContinueProcessing;
|
||||||
}
|
}
|
||||||
@ -10150,66 +10149,65 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
std::vector<std::vector<std::string>> batchKeys(3);
|
std::vector<std::vector<std::string>> batch_keys(3);
|
||||||
|
|
||||||
batchKeys[0].push_back("key1");
|
batch_keys[0].push_back("key1");
|
||||||
batchKeys[0].push_back("key2");
|
batch_keys[0].push_back("key2");
|
||||||
batchKeys[1].push_back("key3");
|
batch_keys[1].push_back("key3");
|
||||||
batchKeys[1].push_back("key4");
|
batch_keys[1].push_back("key4");
|
||||||
batchKeys[2].push_back("key5");
|
batch_keys[2].push_back("key5");
|
||||||
batchKeys[2].push_back("key6");
|
batch_keys[2].push_back("key6");
|
||||||
|
|
||||||
Options options = OptionsForLogIterTest();
|
Options options = OptionsForLogIterTest();
|
||||||
DestroyAndReopen(options);
|
DestroyAndReopen(options);
|
||||||
CreateAndReopenWithCF({ "pikachu" }, options);
|
CreateAndReopenWithCF({ "pikachu" }, options);
|
||||||
|
|
||||||
// Write given keys in given batches
|
// Write given keys in given batches
|
||||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
WriteBatch batch;
|
WriteBatch batch;
|
||||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
batch.Put(handles_[0], batchKeys[i][j], DummyString(1024));
|
batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
|
||||||
}
|
}
|
||||||
dbfull()->Write(WriteOptions(), &batch);
|
dbfull()->Write(WriteOptions(), &batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a test filter that would apply WalProcessingOption at the first
|
// Create a test filter that would apply wal_processing_option at the first
|
||||||
// record
|
// record
|
||||||
size_t changeRecordsFromIndex = 1;
|
size_t change_records_from_index = 1;
|
||||||
size_t numKeysToAddInNewBatch = 1;
|
size_t num_keys_to_add_in_new_batch = 1;
|
||||||
TestWalFilterWithChangeBatch testWalFilterWithChangeBatch(
|
TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
|
||||||
changeRecordsFromIndex, numKeysToAddInNewBatch);
|
change_records_from_index, num_keys_to_add_in_new_batch);
|
||||||
|
|
||||||
// Reopen database with option to use WAL filter
|
// Reopen database with option to use WAL filter
|
||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
options.wal_filter = &testWalFilterWithChangeBatch;
|
options.wal_filter = &test_wal_filter_with_change_batch;
|
||||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
||||||
|
|
||||||
// Ensure that all keys exist before m_changeRecordsFromIndex
|
// Ensure that all keys exist before change_records_from_index_
|
||||||
// And after that index only single key exists
|
// And after that index only single key exists
|
||||||
// as our filter adds only single key for each batch
|
// as our filter adds only single key for each batch
|
||||||
std::vector<Slice> keysMustExist;
|
std::vector<Slice> keys_must_exist;
|
||||||
std::vector<Slice> keysMustNotExist;
|
std::vector<Slice> keys_must_not_exist;
|
||||||
|
|
||||||
for (size_t i = 0; i < batchKeys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
for (size_t j = 0; j < batchKeys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
if (i >= changeRecordsFromIndex && j >= numKeysToAddInNewBatch) {
|
if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
|
||||||
keysMustNotExist.push_back(Slice(batchKeys[i][j]));
|
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
keysMustExist.push_back(Slice(batchKeys[i][j]));
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool checkedAfterReopen = false;
|
bool checked_after_reopen = false;
|
||||||
|
|
||||||
while (true)
|
while (true) {
|
||||||
{
|
|
||||||
// Ensure that expected keys exists
|
// Ensure that expected keys exists
|
||||||
// and not expected keys don't exist after recovery
|
// and not expected keys don't exist after recovery
|
||||||
ValidateKeyExistence(db_, keysMustExist, keysMustNotExist);
|
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
|
||||||
|
|
||||||
if (checkedAfterReopen) {
|
if (checked_after_reopen) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -10219,9 +10217,75 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
||||||
|
|
||||||
checkedAfterReopen = true;
|
checked_after_reopen = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
|
||||||
|
class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
|
||||||
|
public:
|
||||||
|
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
|
||||||
|
WriteBatch* new_batch, bool* batch_changed) const override {
|
||||||
|
*new_batch = batch;
|
||||||
|
new_batch->Put("key_extra", "value_extra");
|
||||||
|
*batch_changed = true;
|
||||||
|
return WalProcessingOption::kContinueProcessing;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override {
|
||||||
|
return "WalFilterTestWithChangeBatchExtraKeys";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::vector<std::string>> batch_keys(3);
|
||||||
|
|
||||||
|
batch_keys[0].push_back("key1");
|
||||||
|
batch_keys[0].push_back("key2");
|
||||||
|
batch_keys[1].push_back("key3");
|
||||||
|
batch_keys[1].push_back("key4");
|
||||||
|
batch_keys[2].push_back("key5");
|
||||||
|
batch_keys[2].push_back("key6");
|
||||||
|
|
||||||
|
Options options = OptionsForLogIterTest();
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
CreateAndReopenWithCF({ "pikachu" }, options);
|
||||||
|
|
||||||
|
// Write given keys in given batches
|
||||||
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
|
WriteBatch batch;
|
||||||
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
|
batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
|
||||||
|
}
|
||||||
|
dbfull()->Write(WriteOptions(), &batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a test filter that would add extra keys
|
||||||
|
TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;
|
||||||
|
|
||||||
|
// Reopen database with option to use WAL filter
|
||||||
|
options = OptionsForLogIterTest();
|
||||||
|
options.wal_filter = &test_wal_filter_extra_keys;
|
||||||
|
Status status =
|
||||||
|
TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
||||||
|
ASSERT_TRUE(status.IsNotSupported());
|
||||||
|
|
||||||
|
// Reopen without filter, now reopen should succeed - previous
|
||||||
|
// attempt to open must not have altered the db.
|
||||||
|
options = OptionsForLogIterTest();
|
||||||
|
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
||||||
|
|
||||||
|
std::vector<Slice> keys_must_exist;
|
||||||
|
std::vector<Slice> keys_must_not_exist; //empty vector
|
||||||
|
|
||||||
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
|
||||||
|
}
|
||||||
|
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
|
@ -40,6 +40,20 @@ public:
|
|||||||
// * stop log replay
|
// * stop log replay
|
||||||
// (by returning kStop replay) - please note that this implies
|
// (by returning kStop replay) - please note that this implies
|
||||||
// discarding the logs from current record onwards.
|
// discarding the logs from current record onwards.
|
||||||
|
//
|
||||||
|
// @params batch batch encountered in the log during recovery
|
||||||
|
// @params new_batch new_batch to populate if filter wants to change
|
||||||
|
// the batch (for example to filter some records out,
|
||||||
|
// or alter some records).
|
||||||
|
// Please note that the new batch MUST NOT contain
|
||||||
|
// more records than original, else recovery would
|
||||||
|
// be failed.
|
||||||
|
// @params batch_changed Whether batch was changed by the filter.
|
||||||
|
// It must be set to true if new_batch was populated,
|
||||||
|
// else new_batch has no effect.
|
||||||
|
// @returns Processing option for the current record.
|
||||||
|
// Please see WalProcessingOption enum above for
|
||||||
|
// details.
|
||||||
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
|
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
|
||||||
WriteBatch* new_batch, bool* batch_changed) const = 0;
|
WriteBatch* new_batch, bool* batch_changed) const = 0;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user