From 0c59691dde4e8f0bf3937933ae234b21b0299df3 Mon Sep 17 00:00:00 2001
From: Praveen Rao
Date: Mon, 19 Oct 2015 13:27:40 -0700
Subject: [PATCH] Handle multiple batches in single log record - allow app to
 return a new batch + allow app to return corrupted record status

---
 db/db_impl.cc                |  57 ++++-
 db/db_test.cc                | 388 ++++++++++++++++++++++++-----------
 include/rocksdb/options.h    |   4 +-
 include/rocksdb/wal_filter.h |  40 ++--
 util/options.cc              |   4 +-
 5 files changed, 339 insertions(+), 154 deletions(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index c6b3f2c14..2a3c5a5db 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1156,22 +1156,67 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
 #ifndef ROCKSDB_LITE
       if (db_options_.wal_filter != nullptr) {
-        WALFilter::WALProcessingOption walProcessingOption =
-          db_options_.wal_filter->LogRecord(batch);
+        WriteBatch new_batch;
+        bool batch_changed = false;
+
+        WalFilter::WalProcessingOption walProcessingOption =
+          db_options_.wal_filter->LogRecord(batch, &new_batch, &batch_changed);

         switch (walProcessingOption) {
-          case WALFilter::WALProcessingOption::kContinueProcessing:
+          case WalFilter::WalProcessingOption::kContinueProcessing:
             //do nothing, proceeed normally
             break;
-          case WALFilter::WALProcessingOption::kIgnoreCurrentRecord:
+          case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
             //skip current record
             continue;
-          case WALFilter::WALProcessingOption::kStopReplay:
+          case WalFilter::WalProcessingOption::kStopReplay:
             //skip current record and stop replay
             continue_replay_log = false;
             continue;
-          default:
+          case WalFilter::WalProcessingOption::kCorruptedRecord: {
+            status = Status::Corruption("Corruption reported by Wal Filter ",
+                                        db_options_.wal_filter->Name());
+            MaybeIgnoreError(&status);
+            if (!status.ok()) {
+              reporter.Corruption(record.size(), status);
+              continue;
+            }
+            break;
+          }
+          default: {
             assert(false); //unhandled case
+            status = Status::NotSupported("Unknown WalProcessingOption returned"
+                " by Wal Filter ", db_options_.wal_filter->Name());
+            MaybeIgnoreError(&status);
+            if (!status.ok()) {
+              return status;
+            }
+            else {
+              // Ignore the error with current record processing.
+              continue;
+            }
+          }
         }
+
+        if (batch_changed) {
+          // Make sure that the count in the new batch is
+          // within the original count.
+          int new_count = WriteBatchInternal::Count(&new_batch);
+          int original_count = WriteBatchInternal::Count(&batch);
+          if (new_count > original_count) {
+            // Question: should this be treated as an error ??
+            // Would it cause problems if #num records > diff in seq#?
+            Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
+                "Recovering log #%" PRIu64 " mode %d log filter %s returned "
+                "more records (%d) than original (%d)", log_number,
+                db_options_.wal_recovery_mode, db_options_.wal_filter->Name(),
+                new_count, original_count);
+          }
+          // Set the same sequence number in the new_batch
+          // as the original batch.
+          WriteBatchInternal::SetSequence(&new_batch,
+                                          WriteBatchInternal::Sequence(&batch));
+          batch = new_batch;
+        }
       }
 #endif //ROCKSDB_LITE
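Note (illustrative, not part of the patch): with the signature change above, a filter now receives a scratch new_batch and a batch_changed flag in addition to the record's original batch, and recovery installs new_batch only when batch_changed is set; the WriteBatchInternal::SetSequence() call keeps the replacement batch on the original sequence number, which is why a filter returning more records than the original only triggers a warning. A minimal application-side filter against the new three-argument LogRecord() could look like the sketch below; the class name and the drop-large-records policy are made up, only the API surface comes from this patch.

// Sketch only -- hypothetical filter built against the WalFilter interface
// introduced by this patch.
#include "rocksdb/wal_filter.h"
#include "rocksdb/write_batch.h"

class SkipLargeBatchesFilter : public rocksdb::WalFilter {
 public:
  virtual WalProcessingOption LogRecord(const rocksdb::WriteBatch& batch,
                                        rocksdb::WriteBatch* /*new_batch*/,
                                        bool* /*batch_changed*/) const override {
    // Hypothetical policy: skip any WAL record that carries more than 1000
    // keys; everything else is replayed as-is.
    if (batch.Count() > 1000) {
      return WalProcessingOption::kIgnoreCurrentRecord;
    }
    // Leaving *new_batch empty and *batch_changed false means recovery
    // replays the original batch unchanged.
    return WalProcessingOption::kContinueProcessing;
  }

  virtual const char* Name() const override { return "SkipLargeBatchesFilter"; }
};

The filter is registered through the existing raw-pointer option shown further down (options.wal_filter = &filter;); the caller keeps ownership of the filter object.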
diff --git a/db/db_test.cc b/db/db_test.cc
index c45fc68f0..beaa47af2 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -9888,44 +9888,73 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
 }

 #ifndef ROCKSDB_LITE
+namespace {
+  void ValidateKeyExistence(DB* db,
+    const std::vector<Slice>& keysMustExist,
+    const std::vector<Slice>& keysMustNotExist) {
+    // Ensure that expected keys exist
+    std::vector<std::string> values;
+    if (keysMustExist.size() > 0) {
+      std::vector<Status> status_list = db->MultiGet(ReadOptions(),
+        keysMustExist,
+        &values);
+      for (size_t i = 0; i < keysMustExist.size(); i++) {
+        ASSERT_OK(status_list[i]);
+      }
+    }
+
+    // Ensure that given keys don't exist
+    if (keysMustNotExist.size() > 0) {
+      std::vector<Status> status_list = db->MultiGet(ReadOptions(),
+        keysMustNotExist,
+        &values);
+      for (size_t i = 0; i < keysMustNotExist.size(); i++) {
+        ASSERT_TRUE(status_list[i].IsNotFound());
+      }
+    }
+  }
+
+} //namespace
+
 TEST_F(DBTest, WalFilterTest) {
-  class TestWALFilter : public WALFilter {
+  class TestWalFilter : public WalFilter {
   private:
     // Processing option that is requested to be applied at the given index
-    WALFilter::WALProcessingOption m_walProcessingOption;
-    // Index at which to apply m_walProcessingOption
-    // At other indexes default WALProcessingOption::kContinueProcessing is
+    WalFilter::WalProcessingOption WalProcessingOption_;
+    // Index at which to apply WalProcessingOption_
+    // At other indexes default WalProcessingOption::kContinueProcessing is
     // returned.
-    size_t m_applyOptionAtRecordIndex;
+    size_t applyOptionAtRecordIndex_;
     // Current record index, incremented with each record encountered.
-    size_t m_currentRecordIndex;
+    size_t currentRecordIndex_;
   public:
-    TestWALFilter(WALFilter::WALProcessingOption walProcessingOption,
+    TestWalFilter(WalFilter::WalProcessingOption WalProcessingOption,
       size_t applyOptionForRecordIndex) :
-      m_walProcessingOption(walProcessingOption),
-      m_applyOptionAtRecordIndex(applyOptionForRecordIndex),
-      m_currentRecordIndex(0) { }
+      WalProcessingOption_(WalProcessingOption),
+      applyOptionAtRecordIndex_(applyOptionForRecordIndex),
+      currentRecordIndex_(0) { }

-    virtual WALProcessingOption LogRecord(const WriteBatch & batch) const override {
-      WALFilter::WALProcessingOption optionToReturn;
+    virtual WalProcessingOption LogRecord(const WriteBatch & batch,
+      WriteBatch* new_batch, bool* batch_changed) const override {
+      WalFilter::WalProcessingOption optionToReturn;

-      if (m_currentRecordIndex == m_applyOptionAtRecordIndex) {
-        optionToReturn = m_walProcessingOption;
+      if (currentRecordIndex_ == applyOptionAtRecordIndex_) {
+        optionToReturn = WalProcessingOption_;
       }
       else {
-        optionToReturn = WALProcessingOption::kContinueProcessing;
+        optionToReturn = WalProcessingOption::kContinueProcessing;
       }

       // Filter is passed as a const object for RocksDB to not modify the
       // object, however we modify it for our own purpose here and hence
       // cast the constness away.
-      (const_cast<TestWALFilter*>(this)->m_currentRecordIndex)++;
+      (const_cast<TestWalFilter*>(this)->currentRecordIndex_)++;

       return optionToReturn;
     }

     virtual const char* Name() const override {
-      return "TestWALFilter";
+      return "TestWalFilter";
     }
   };

@@ -9940,128 +9969,255 @@ TEST_F(DBTest, WalFilterTest) {
   batchKeys[2].push_back("key6");

   // Test with all WAL processing options
-  for (char option = 0;
-    option < static_cast<char>(WALFilter::WALProcessingOption::kWALProcessingOptionMax);
+  for (int option = 0;
+    option < static_cast<int>(WalFilter::WalProcessingOption::kWalProcessingOptionMax);
     option++) {
     Options options = OptionsForLogIterTest();
     DestroyAndReopen(options);
     CreateAndReopenWithCF({ "pikachu" }, options);
-    {
-      // Write given keys in given batches
-      for (size_t i = 0; i < batchKeys.size(); i++) {
-        WriteBatch batch;
-        for (size_t j = 0; j < batchKeys[i].size(); j++) {
-          batch.Put(handles_[0], batchKeys[i][j], DummyString(1024));
-        }
-        dbfull()->Write(WriteOptions(), &batch);
+
+    // Write given keys in given batches
+    for (size_t i = 0; i < batchKeys.size(); i++) {
+      WriteBatch batch;
+      for (size_t j = 0; j < batchKeys[i].size(); j++) {
+        batch.Put(handles_[0], batchKeys[i][j], DummyString(1024));
       }
+      dbfull()->Write(WriteOptions(), &batch);
+    }

-      WALFilter::WALProcessingOption walProcessingOption =
-        static_cast<WALFilter::WALProcessingOption>(option);
+    WalFilter::WalProcessingOption WalProcessingOption =
+      static_cast<WalFilter::WalProcessingOption>(option);

-      // Create a test filter that would apply walProcessingOption at the first
-      // record
-      size_t applyOptionForRecordIndex = 1;
-      TestWALFilter testWalFilter(walProcessingOption,
-        applyOptionForRecordIndex);
+    // Create a test filter that would apply WalProcessingOption at the first
+    // record
+    size_t applyOptionForRecordIndex = 1;
+    TestWalFilter testWalFilter(WalProcessingOption,
+      applyOptionForRecordIndex);

-      // Reopen database with option to use WAL filter
-      options = OptionsForLogIterTest();
-      options.wal_filter = &testWalFilter;
+    // Reopen database with option to use WAL filter
+    options = OptionsForLogIterTest();
+    options.wal_filter = &testWalFilter;
+    Status status = TryReopenWithColumnFamilies({ "default", "pikachu" },
+      options);
+    if (WalProcessingOption ==
+      WalFilter::WalProcessingOption::kCorruptedRecord) {
+      assert(!status.ok());
+      // In case of corruption we can turn off paranoid_checks to reopen
+      // database
+      options.paranoid_checks = false;
       ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+    } else {
+      assert(status.ok());
+    }

-      // Compute which keys we expect to be found
-      // and which we expect not to be found after recovery.
-      std::vector<Slice> keysMustExist;
-      std::vector<Slice> keysMustNotExist;
-      switch (walProcessingOption) {
-      case WALFilter::WALProcessingOption::kContinueProcessing: {
-        fprintf(stderr, "Testing with complete WAL processing,"
-          " i.e. the default case\n");
-        //we expect all records to be processed
-        for (size_t i = 0; i < batchKeys.size(); i++) {
-          for (size_t j = 0; j < batchKeys[i].size(); j++) {
+    // Compute which keys we expect to be found
+    // and which we expect not to be found after recovery.
+    std::vector<Slice> keysMustExist;
+    std::vector<Slice> keysMustNotExist;
+    switch (WalProcessingOption) {
+      case WalFilter::WalProcessingOption::kCorruptedRecord:
+      case WalFilter::WalProcessingOption::kContinueProcessing: {
+        fprintf(stderr, "Testing with complete WAL processing\n");
+        //we expect all records to be processed
+        for (size_t i = 0; i < batchKeys.size(); i++) {
+          for (size_t j = 0; j < batchKeys[i].size(); j++) {
+            keysMustExist.push_back(Slice(batchKeys[i][j]));
+          }
+        }
+        break;
+      }
+      case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
+        fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
+          applyOptionForRecordIndex);
+        // We expect the record with applyOptionForRecordIndex to be not
+        // found.
+        for (size_t i = 0; i < batchKeys.size(); i++) {
+          for (size_t j = 0; j < batchKeys[i].size(); j++) {
+            if (i == applyOptionForRecordIndex) {
+              keysMustNotExist.push_back(Slice(batchKeys[i][j]));
+            }
+            else {
              keysMustExist.push_back(Slice(batchKeys[i][j]));
            }
          }
        }
-        break;
      }
-      case WALFilter::WALProcessingOption::kIgnoreCurrentRecord: {
-        fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
-          applyOptionForRecordIndex);
-        // We expect the record with applyOptionForRecordIndex to be not
-        // found.
-        for (size_t i = 0; i < batchKeys.size(); i++) {
-          for (size_t j = 0; j < batchKeys[i].size(); j++) {
-            if (i == applyOptionForRecordIndex) {
-              keysMustNotExist.push_back(Slice(batchKeys[i][j]));
-            }
-            else {
-              keysMustExist.push_back(Slice(batchKeys[i][j]));
-            }
+        break;
+      }
+      case WalFilter::WalProcessingOption::kStopReplay: {
+        fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n",
+          applyOptionForRecordIndex);
+        // We expect records beyond applyOptionForRecordIndex to be not
+        // found.
+        for (size_t i = 0; i < batchKeys.size(); i++) {
+          for (size_t j = 0; j < batchKeys[i].size(); j++) {
+            if (i >= applyOptionForRecordIndex) {
+              keysMustNotExist.push_back(Slice(batchKeys[i][j]));
+            }
+            else {
+              keysMustExist.push_back(Slice(batchKeys[i][j]));
            }
          }
-          break;
        }
-        case WALFilter::WALProcessingOption::kStopReplay: {
-          fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n",
-            applyOptionForRecordIndex);
-          // We expect records beyond applyOptionForRecordIndex to be not
-          // found.
-          for (size_t i = 0; i < batchKeys.size(); i++) {
-            for (size_t j = 0; j < batchKeys[i].size(); j++) {
-              if (i >= applyOptionForRecordIndex) {
-                keysMustNotExist.push_back(Slice(batchKeys[i][j]));
-              }
-              else {
-                keysMustExist.push_back(Slice(batchKeys[i][j]));
-              }
-            }
-          }
-          break;
-        }
-        default:
-          assert(false); //unhandled case
+        break;
+      }
+      default:
+        assert(false); //unhandled case
+    }
+
+    bool checkedAfterReopen = false;
+
+    while (true)
+    {
+      // Ensure that expected keys exist
+      // and that excluded keys don't exist after recovery
+      ValidateKeyExistence(db_, keysMustExist, keysMustNotExist);
+
+      if (checkedAfterReopen) {
+        break;
      }
-      bool checkedAfterReopen = false;
+      //reopen database again to make sure previous log(s) are not used
+      //(even if they were skipped)
+      //reopen database with option to use WAL filter
+      options = OptionsForLogIterTest();
+      ReopenWithColumnFamilies({ "default", "pikachu" }, options);
-      while (true)
-      {
-        // Ensure that expected keys exist after recovery
-        std::vector<std::string> values;
-        if (keysMustExist.size() > 0) {
-          std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(),
-            keysMustExist,
-            &values);
-          for (size_t i = 0; i < keysMustExist.size(); i++) {
-            ASSERT_OK(status_list[i]);
-          }
-        }
+      checkedAfterReopen = true;
+    }
+  }
+}
-        // Ensure that discarded keys don't exist after recovery
-        if (keysMustNotExist.size() > 0) {
-          std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(),
-            keysMustNotExist,
-            &values);
-          for (size_t i = 0; i < keysMustNotExist.size(); i++) {
-            ASSERT_TRUE(status_list[i].IsNotFound());
-          }
-        }
-
-        if (checkedAfterReopen) {
-          break;
-        }
-
-        //reopen database again to make sure previous log(s) are not used
-        //(even if they were skipped)
-        //reopn database with option to use WAL filter
-        options = OptionsForLogIterTest();
-        ReopenWithColumnFamilies({ "default", "pikachu" }, options);
-
-        checkedAfterReopen = true;
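Note (illustrative, not part of the patch): when a filter reports kCorruptedRecord and paranoid_checks is left at its default, the reopen above is expected to fail, and the test falls back to a relaxed reopen. A rough sketch of the same fallback in application code follows; the helper name and variables are hypothetical.

#include <string>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/wal_filter.h"

// Hypothetical helper: open a DB with a WAL filter installed, retrying with
// relaxed checks if the filter flags a corrupted record during recovery.
rocksdb::Status OpenWithWalFilter(const std::string& dbname,
                                  rocksdb::WalFilter* filter,
                                  rocksdb::DB** db) {
  rocksdb::Options options;
  options.wal_filter = filter;  // filter may return kCorruptedRecord
  rocksdb::Status s = rocksdb::DB::Open(options, dbname, db);
  if (!s.ok()) {
    // With paranoid_checks (the default) a record reported as corrupted
    // fails the open; relaxing the check lets recovery continue past it,
    // as the test above does.
    options.paranoid_checks = false;
    s = rocksdb::DB::Open(options, dbname, db);
  }
  return s;
}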
+TEST_F(DBTest, WalFilterTestWithChangeBatch) {
+  class ChangeBatchHandler : public WriteBatch::Handler {
+  private:
+    // Whether we have already added a key to new batch
+    size_t m_numKeysAdded;
+    // Batch to insert keys in
+    WriteBatch* newWriteBatch_;
+    // Number of keys to add in the new batch
+    size_t m_numKeysToAddInNewBatch;
+  public:
+    ChangeBatchHandler(WriteBatch* newWriteBatch,
+      size_t numKeysToAddInNewBatch) :
+      newWriteBatch_(newWriteBatch),
+      m_numKeysToAddInNewBatch(numKeysToAddInNewBatch),
+      m_numKeysAdded(0){ }
+    virtual void Put(const Slice& key, const Slice& value) override {
+      if (m_numKeysAdded < m_numKeysToAddInNewBatch) {
+        newWriteBatch_->Put(key, value);
+        ++m_numKeysAdded;
+      }
+    }
+  };
+
+  class TestWalFilterWithChangeBatch : public WalFilter {
+  private:
+    // Index at which to start changing records
+    size_t m_changeRecordsFromIndex;
+    // Number of keys to add in the new batch
+    size_t m_numKeysToAddInNewBatch;
+    // Current record index, incremented with each record encountered.
+    size_t currentRecordIndex_;
+  public:
+    TestWalFilterWithChangeBatch(
+      size_t changeRecordsFromIndex,
+      size_t numKeysToAddInNewBatch) :
+      m_changeRecordsFromIndex(changeRecordsFromIndex),
+      m_numKeysToAddInNewBatch(numKeysToAddInNewBatch),
+      currentRecordIndex_(0) { }
+
+    virtual WalProcessingOption LogRecord(const WriteBatch & batch,
+      WriteBatch* new_batch, bool* batch_changed) const override {
+
+      if (currentRecordIndex_ >= m_changeRecordsFromIndex) {
+        ChangeBatchHandler handler(new_batch, m_numKeysToAddInNewBatch);
+        batch.Iterate(&handler);
+        *batch_changed = true;
+      }
+
+      // Filter is passed as a const object for RocksDB to not modify the
+      // object, however we modify it for our own purpose here and hence
+      // cast the constness away.
+      (const_cast<TestWalFilterWithChangeBatch*>(this)->currentRecordIndex_)++;
+
+      return WalProcessingOption::kContinueProcessing;
+    }
+
+    virtual const char* Name() const override {
+      return "TestWalFilterWithChangeBatch";
+    }
+  };
+
+  std::vector<std::vector<std::string>> batchKeys(3);
+
+  batchKeys[0].push_back("key1");
+  batchKeys[0].push_back("key2");
+  batchKeys[1].push_back("key3");
+  batchKeys[1].push_back("key4");
+  batchKeys[2].push_back("key5");
+  batchKeys[2].push_back("key6");
+
+  Options options = OptionsForLogIterTest();
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({ "pikachu" }, options);
+
+  // Write given keys in given batches
+  for (size_t i = 0; i < batchKeys.size(); i++) {
+    WriteBatch batch;
+    for (size_t j = 0; j < batchKeys[i].size(); j++) {
+      batch.Put(handles_[0], batchKeys[i][j], DummyString(1024));
+    }
+    dbfull()->Write(WriteOptions(), &batch);
+  }
+
+  // Create a test filter that would apply WalProcessingOption at the first
+  // record
+  size_t changeRecordsFromIndex = 1;
+  size_t numKeysToAddInNewBatch = 1;
+  TestWalFilterWithChangeBatch testWalFilterWithChangeBatch(
+    changeRecordsFromIndex, numKeysToAddInNewBatch);
+
+  // Reopen database with option to use WAL filter
+  options = OptionsForLogIterTest();
+  options.wal_filter = &testWalFilterWithChangeBatch;
+  ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+
+  // Ensure that all keys exist before m_changeRecordsFromIndex
+  // and after that index only a single key exists,
+  // as our filter adds only a single key for each batch
+  std::vector<Slice> keysMustExist;
+  std::vector<Slice> keysMustNotExist;
+
+  for (size_t i = 0; i < batchKeys.size(); i++) {
+    for (size_t j = 0; j < batchKeys[i].size(); j++) {
+      if (i >= changeRecordsFromIndex && j >= numKeysToAddInNewBatch) {
+        keysMustNotExist.push_back(Slice(batchKeys[i][j]));
+      }
+      else {
+        keysMustExist.push_back(Slice(batchKeys[i][j]));
+      }
+    }
+  }
+
+  bool checkedAfterReopen = false;
+
+  while (true)
+  {
+    // Ensure that expected keys exist
+    // and that excluded keys don't exist after recovery
+    ValidateKeyExistence(db_, keysMustExist, keysMustNotExist);
+
+    if (checkedAfterReopen) {
+      break;
+    }
+
+    //reopen database again to make sure previous log(s) are not used
+    //(even if they were skipped)
+    //reopen database with option to use WAL filter
+    options = OptionsForLogIterTest();
+    ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+
+    checkedAfterReopen = true;
  }
}
 #endif  // ROCKSDB_LITE
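Note (illustrative, not part of the patch): ChangeBatchHandler above shows the intended pattern for producing a replacement batch -- iterate the original batch with a WriteBatch::Handler, re-insert only the entries that should survive, and set *batch_changed. The sketch below applies the same pattern inside a filter that drops Delete operations; the class and member names are made up.

#include "rocksdb/slice.h"
#include "rocksdb/wal_filter.h"
#include "rocksdb/write_batch.h"

class DropDeletesWalFilter : public rocksdb::WalFilter {
 private:
  // Handler that copies Puts into the rebuilt batch and silently drops Deletes.
  class KeepPutsHandler : public rocksdb::WriteBatch::Handler {
   public:
    explicit KeepPutsHandler(rocksdb::WriteBatch* rebuilt) : rebuilt_(rebuilt) {}
    virtual void Put(const rocksdb::Slice& key,
                     const rocksdb::Slice& value) override {
      rebuilt_->Put(key, value);
    }
    virtual void Delete(const rocksdb::Slice& /*key*/) override {
      // intentionally dropped
    }
   private:
    rocksdb::WriteBatch* rebuilt_;
  };

 public:
  virtual WalProcessingOption LogRecord(const rocksdb::WriteBatch& batch,
                                        rocksdb::WriteBatch* new_batch,
                                        bool* batch_changed) const override {
    KeepPutsHandler handler(new_batch);
    batch.Iterate(&handler);  // replay the original record into the handler
    *batch_changed = true;    // tell recovery to use new_batch instead
    return WalProcessingOption::kContinueProcessing;
  }

  virtual const char* Name() const override { return "DropDeletesWalFilter"; }
};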
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 88b2280bf..6ce3d3ea8 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -46,7 +46,7 @@ class Slice;
 class SliceTransform;
 class Statistics;
 class InternalKeyComparator;
-class WALFilter;
+class WalFilter;

 // DB contents are stored in a set of blocks, each of which holds a
 // sequence of key,value pairs.  Each block may be compressed before
@@ -1138,7 +1138,7 @@ struct DBOptions {
   // records, ignoring a particular record or skipping replay.
   // The filter is invoked at startup and is invoked from a single-thread
   // currently.
-  const WALFilter * wal_filter;
+  const WalFilter* wal_filter;
 #endif //ROCKSDB_LITE
 };

diff --git a/include/rocksdb/wal_filter.h b/include/rocksdb/wal_filter.h
index 0090c8b38..c30eb1277 100644
--- a/include/rocksdb/wal_filter.h
+++ b/include/rocksdb/wal_filter.h
@@ -2,14 +2,8 @@
 // This source code is licensed under the BSD-style license found in the
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
-// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file. See the AUTHORS file for names of contributors.

-#ifndef STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_
-#define STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_
-
-#ifndef ROCKSDB_LITE
+#pragma once

 namespace rocksdb {

@@ -18,9 +12,9 @@ class WriteBatch;
 // WALFilter allows an application to inspect write-ahead-log (WAL)
 // records or modify their processing on recovery.
 // Please see the details below.
-class WALFilter {
+class WalFilter {
 public:
-  enum class WALProcessingOption {
+  enum class WalProcessingOption {
     // Continue processing as usual
     kContinueProcessing = 0,
     // Ignore the current record but continue processing of log(s)
@@ -28,40 +22,30 @@ public:
     // Stop replay of logs and discard logs
     // Logs won't be replayed on subsequent recovery
     kStopReplay = 2,
+    // Corrupted record detected by filter
+    kCorruptedRecord = 3,
     // Marker for enum count
-    kWALProcessingOptionMax = 3
+    kWalProcessingOptionMax = 4
   };

-  virtual ~WALFilter() { };
+  virtual ~WalFilter() { };

   // LogRecord is invoked for each log record encountered for all the logs
   // during replay on logs on recovery. This method can be used to:
   // * inspect the record (using the batch parameter)
   // * ignoring current record
-  //   (by returning WALProcessingOption::kIgnoreCurrentRecord)
+  //   (by returning WalProcessingOption::kIgnoreCurrentRecord)
+  // * reporting corrupted record
+  //   (by returning WalProcessingOption::kCorruptedRecord)
   // * stop log replay
   //   (by returning kStop replay) - please note that this implies
   //   discarding the logs from current record onwards.
-  virtual WALProcessingOption LogRecord(const WriteBatch & batch) const = 0;
+  virtual WalProcessingOption LogRecord(const WriteBatch& batch,
+    WriteBatch* new_batch, bool* batch_changed) const = 0;

   // Returns a name that identifies this WAL filter.
   // The name will be printed to LOG file on start up for diagnosis.
   virtual const char* Name() const = 0;
 };

-// Default implementation of WALFilter that does not alter WAL processing
-class DefaultWALFilter : WALFilter {
-  virtual WALProcessingOption LogRecord(const WriteBatch & batch) const override {
-    return WALProcessingOption::kContinueProcessing;
-  }
-
-  virtual const char* Name() const override {
-    return "DefaultWALFilter";
-  }
-};
-
 } // namespace rocksdb
-
-#endif // ROCKSDB_LITE
-
-#endif // STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_
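Note (illustrative, not part of the patch): the hunk above also removes the public DefaultWALFilter helper. Code that depended on it can keep an equivalent no-op filter locally; a sketch against the new interface, with a made-up name:

#include "rocksdb/wal_filter.h"

class PassThroughWalFilter : public rocksdb::WalFilter {
 public:
  virtual WalProcessingOption LogRecord(const rocksdb::WriteBatch& /*batch*/,
                                        rocksdb::WriteBatch* /*new_batch*/,
                                        bool* /*batch_changed*/) const override {
    // Behave exactly as if no filter were installed.
    return WalProcessingOption::kContinueProcessing;
  }

  virtual const char* Name() const override { return "PassThroughWalFilter"; }
};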
diff --git a/util/options.cc b/util/options.cc
index 4384d011e..19acbc711 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -258,7 +258,7 @@ DBOptions::DBOptions()
       skip_stats_update_on_db_open(false),
       wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords)
 #ifndef ROCKSDB_LITE
-      ,wal_filter(nullptr)
+      , wal_filter(nullptr)
 #endif  // ROCKSDB_LITE
 {
 }
@@ -318,7 +318,7 @@ DBOptions::DBOptions(const Options& options)
       wal_recovery_mode(options.wal_recovery_mode),
       row_cache(options.row_cache)
 #ifndef ROCKSDB_LITE
-      ,wal_filter(options.wal_filter)
+      , wal_filter(options.wal_filter)
 #endif  // ROCKSDB_LITE
 {
 }