diff --git a/db/db_impl.cc b/db/db_impl.cc index cfb2e7310..01cf750e0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1161,10 +1161,10 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, WriteBatch new_batch; bool batch_changed = false; - WalFilter::WalProcessingOption walProcessingOption = + WalFilter::WalProcessingOption wal_processing_option = db_options_.wal_filter->LogRecord(batch, &new_batch, &batch_changed); - switch (walProcessingOption) { + switch (wal_processing_option) { case WalFilter::WalProcessingOption::kContinueProcessing: //do nothing, proceeed normally break; @@ -1206,13 +1206,15 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, int new_count = WriteBatchInternal::Count(&new_batch); int original_count = WriteBatchInternal::Count(&batch); if (new_count > original_count) { - // Question: should this be treated as an error ?? - // Would it cause problems if #num records > diff in seq#? - Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + Log(InfoLogLevel::FATAL_LEVEL, db_options_.info_log, "Recovering log #%" PRIu64 " mode %d log filter %s returned " - "more records (%d) than original (%d)", log_number, - db_options_.wal_recovery_mode, db_options_.wal_filter->Name(), - new_count, original_count); + "more records (%d) than original (%d) which is not allowed. " + "Aborting recovery.", + log_number, db_options_.wal_recovery_mode, + db_options_.wal_filter->Name(), new_count, original_count); + status = Status::NotSupported("More than original # of records " + "returned by Wal Filter ", db_options_.wal_filter->Name()); + return status; } // Set the same sequence number in the new_batch // as the original batch. diff --git a/db/db_test.cc b/db/db_test.cc index 105fca55d..df5b3c69b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -9892,25 +9892,25 @@ TEST_F(DBTest, PauseBackgroundWorkTest) { #ifndef ROCKSDB_LITE namespace { void ValidateKeyExistence(DB* db, - const std::vector& keysMustExist, - const std::vector& keysMustNotExist) { + const std::vector& keys_must_exist, + const std::vector& keys_must_not_exist) { // Ensure that expected keys exist std::vector values; - if (keysMustExist.size() > 0) { + if (keys_must_exist.size() > 0) { std::vector status_list = db->MultiGet(ReadOptions(), - keysMustExist, + keys_must_exist, &values); - for (size_t i = 0; i < keysMustExist.size(); i++) { + for (size_t i = 0; i < keys_must_exist.size(); i++) { ASSERT_OK(status_list[i]); } } // Ensure that given keys don't exist - if (keysMustNotExist.size() > 0) { + if (keys_must_not_exist.size() > 0) { std::vector status_list = db->MultiGet(ReadOptions(), - keysMustNotExist, + keys_must_not_exist, &values); - for (size_t i = 0; i < keysMustNotExist.size(); i++) { + for (size_t i = 0; i < keys_must_not_exist.size(); i++) { ASSERT_TRUE(status_list[i].IsNotFound()); } } @@ -9922,37 +9922,37 @@ TEST_F(DBTest, WalFilterTest) { class TestWalFilter : public WalFilter { private: // Processing option that is requested to be applied at the given index - WalFilter::WalProcessingOption WalProcessingOption_; - // Index at which to apply WalProcessingOption_ - // At other indexes default WalProcessingOption::kContinueProcessing is + WalFilter::WalProcessingOption wal_processing_option_; + // Index at which to apply wal_processing_option_ + // At other indexes default wal_processing_option::kContinueProcessing is // returned. - size_t applyOptionAtRecordIndex_; + size_t apply_option_at_record_index_; // Current record index, incremented with each record encountered. - size_t currentRecordIndex_; + size_t current_record_index_; public: - TestWalFilter(WalFilter::WalProcessingOption WalProcessingOption, - size_t applyOptionForRecordIndex) : - WalProcessingOption_(WalProcessingOption), - applyOptionAtRecordIndex_(applyOptionForRecordIndex), - currentRecordIndex_(0) { } + TestWalFilter(WalFilter::WalProcessingOption wal_processing_option, + size_t apply_option_for_record_index) : + wal_processing_option_(wal_processing_option), + apply_option_at_record_index_(apply_option_for_record_index), + current_record_index_(0) { } virtual WalProcessingOption LogRecord(const WriteBatch & batch, WriteBatch* new_batch, bool* batch_changed) const override { - WalFilter::WalProcessingOption optionToReturn; + WalFilter::WalProcessingOption option_to_return; - if (currentRecordIndex_ == applyOptionAtRecordIndex_) { - optionToReturn = WalProcessingOption_; + if (current_record_index_ == apply_option_at_record_index_) { + option_to_return = wal_processing_option_; } else { - optionToReturn = WalProcessingOption::kContinueProcessing; + option_to_return = WalProcessingOption::kContinueProcessing; } // Filter is passed as a const object for RocksDB to not modify the // object, however we modify it for our own purpose here and hence // cast the constness away. - (const_cast(this)->currentRecordIndex_)++; + (const_cast(this)->current_record_index_)++; - return optionToReturn; + return option_to_return; } virtual const char* Name() const override { @@ -9961,14 +9961,14 @@ TEST_F(DBTest, WalFilterTest) { }; // Create 3 batches with two keys each - std::vector> batchKeys(3); + std::vector> batch_keys(3); - batchKeys[0].push_back("key1"); - batchKeys[0].push_back("key2"); - batchKeys[1].push_back("key3"); - batchKeys[1].push_back("key4"); - batchKeys[2].push_back("key5"); - batchKeys[2].push_back("key6"); + batch_keys[0].push_back("key1"); + batch_keys[0].push_back("key2"); + batch_keys[1].push_back("key3"); + batch_keys[1].push_back("key4"); + batch_keys[2].push_back("key5"); + batch_keys[2].push_back("key6"); // Test with all WAL processing options for (int option = 0; @@ -9979,29 +9979,29 @@ TEST_F(DBTest, WalFilterTest) { CreateAndReopenWithCF({ "pikachu" }, options); // Write given keys in given batches - for (size_t i = 0; i < batchKeys.size(); i++) { + for (size_t i = 0; i < batch_keys.size(); i++) { WriteBatch batch; - for (size_t j = 0; j < batchKeys[i].size(); j++) { - batch.Put(handles_[0], batchKeys[i][j], DummyString(1024)); + for (size_t j = 0; j < batch_keys[i].size(); j++) { + batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); } dbfull()->Write(WriteOptions(), &batch); } - WalFilter::WalProcessingOption WalProcessingOption = + WalFilter::WalProcessingOption wal_processing_option = static_cast(option); - // Create a test filter that would apply WalProcessingOption at the first + // Create a test filter that would apply wal_processing_option at the first // record - size_t applyOptionForRecordIndex = 1; - TestWalFilter testWalFilter(WalProcessingOption, - applyOptionForRecordIndex); + size_t apply_option_for_record_index = 1; + TestWalFilter test_wal_filter(wal_processing_option, + apply_option_for_record_index); // Reopen database with option to use WAL filter options = OptionsForLogIterTest(); - options.wal_filter = &testWalFilter; + options.wal_filter = &test_wal_filter; Status status = TryReopenWithColumnFamilies({ "default", "pikachu" }, options); - if (WalProcessingOption == + if (wal_processing_option == WalFilter::WalProcessingOption::kCorruptedRecord) { assert(!status.ok()); // In case of corruption we can turn off paranoid_checks to reopen @@ -10014,32 +10014,32 @@ TEST_F(DBTest, WalFilterTest) { // Compute which keys we expect to be found // and which we expect not to be found after recovery. - std::vector keysMustExist; - std::vector keysMustNotExist; - switch (WalProcessingOption) { + std::vector keys_must_exist; + std::vector keys_must_not_exist; + switch (wal_processing_option) { case WalFilter::WalProcessingOption::kCorruptedRecord: case WalFilter::WalProcessingOption::kContinueProcessing: { fprintf(stderr, "Testing with complete WAL processing\n"); //we expect all records to be processed - for (size_t i = 0; i < batchKeys.size(); i++) { - for (size_t j = 0; j < batchKeys[i].size(); j++) { - keysMustExist.push_back(Slice(batchKeys[i][j])); + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + keys_must_exist.push_back(Slice(batch_keys[i][j])); } } break; } case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: { fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n", - applyOptionForRecordIndex); - // We expect the record with applyOptionForRecordIndex to be not + apply_option_for_record_index); + // We expect the record with apply_option_for_record_index to be not // found. - for (size_t i = 0; i < batchKeys.size(); i++) { - for (size_t j = 0; j < batchKeys[i].size(); j++) { - if (i == applyOptionForRecordIndex) { - keysMustNotExist.push_back(Slice(batchKeys[i][j])); + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + if (i == apply_option_for_record_index) { + keys_must_not_exist.push_back(Slice(batch_keys[i][j])); } else { - keysMustExist.push_back(Slice(batchKeys[i][j])); + keys_must_exist.push_back(Slice(batch_keys[i][j])); } } } @@ -10047,16 +10047,16 @@ TEST_F(DBTest, WalFilterTest) { } case WalFilter::WalProcessingOption::kStopReplay: { fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n", - applyOptionForRecordIndex); - // We expect records beyond applyOptionForRecordIndex to be not + apply_option_for_record_index); + // We expect records beyond apply_option_for_record_index to be not // found. - for (size_t i = 0; i < batchKeys.size(); i++) { - for (size_t j = 0; j < batchKeys[i].size(); j++) { - if (i >= applyOptionForRecordIndex) { - keysMustNotExist.push_back(Slice(batchKeys[i][j])); + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + if (i >= apply_option_for_record_index) { + keys_must_not_exist.push_back(Slice(batch_keys[i][j])); } else { - keysMustExist.push_back(Slice(batchKeys[i][j])); + keys_must_exist.push_back(Slice(batch_keys[i][j])); } } } @@ -10066,15 +10066,14 @@ TEST_F(DBTest, WalFilterTest) { assert(false); //unhandled case } - bool checkedAfterReopen = false; + bool checked_after_reopen = false; - while (true) - { + while (true) { // Ensure that expected keys exists // and not expected keys don't exist after recovery - ValidateKeyExistence(db_, keysMustExist, keysMustNotExist); + ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); - if (checkedAfterReopen) { + if (checked_after_reopen) { break; } @@ -10084,7 +10083,7 @@ TEST_F(DBTest, WalFilterTest) { options = OptionsForLogIterTest(); ReopenWithColumnFamilies({ "default", "pikachu" }, options); - checkedAfterReopen = true; + checked_after_reopen = true; } } } @@ -10093,21 +10092,21 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) { class ChangeBatchHandler : public WriteBatch::Handler { private: // Batch to insert keys in - WriteBatch* newWriteBatch_; + WriteBatch* new_write_batch_; // Number of keys to add in the new batch - size_t m_numKeysToAddInNewBatch; + size_t num_keys_to_add_in_new_batch_; // Number of keys added to new batch - size_t m_numKeysAdded; + size_t num_keys_added_; public: - ChangeBatchHandler(WriteBatch* newWriteBatch, - size_t numKeysToAddInNewBatch) : - newWriteBatch_(newWriteBatch), - m_numKeysToAddInNewBatch(numKeysToAddInNewBatch), - m_numKeysAdded(0){ } + ChangeBatchHandler(WriteBatch* new_write_batch, + size_t num_keys_to_add_in_new_batch) : + new_write_batch_(new_write_batch), + num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch), + num_keys_added_(0){ } virtual void Put(const Slice& key, const Slice& value) override { - if (m_numKeysAdded < m_numKeysToAddInNewBatch) { - newWriteBatch_->Put(key, value); - ++m_numKeysAdded; + if (num_keys_added_ < num_keys_to_add_in_new_batch_) { + new_write_batch_->Put(key, value); + ++num_keys_added_; } } }; @@ -10115,24 +10114,24 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) { class TestWalFilterWithChangeBatch : public WalFilter { private: // Index at which to start changing records - size_t m_changeRecordsFromIndex; + size_t change_records_from_index_; // Number of keys to add in the new batch - size_t m_numKeysToAddInNewBatch; + size_t num_keys_to_add_in_new_batch_; // Current record index, incremented with each record encountered. - size_t currentRecordIndex_; + size_t current_record_index_; public: TestWalFilterWithChangeBatch( - size_t changeRecordsFromIndex, - size_t numKeysToAddInNewBatch) : - m_changeRecordsFromIndex(changeRecordsFromIndex), - m_numKeysToAddInNewBatch(numKeysToAddInNewBatch), - currentRecordIndex_(0) { } + size_t change_records_from_index, + size_t num_keys_to_add_in_new_batch) : + change_records_from_index_(change_records_from_index), + num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch), + current_record_index_(0) { } virtual WalProcessingOption LogRecord(const WriteBatch & batch, WriteBatch* new_batch, bool* batch_changed) const override { - if (currentRecordIndex_ >= m_changeRecordsFromIndex) { - ChangeBatchHandler handler(new_batch, m_numKeysToAddInNewBatch); + if (current_record_index_ >= change_records_from_index_) { + ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_); batch.Iterate(&handler); *batch_changed = true; } @@ -10140,7 +10139,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) { // Filter is passed as a const object for RocksDB to not modify the // object, however we modify it for our own purpose here and hence // cast the constness away. - (const_cast(this)->currentRecordIndex_)++; + (const_cast(this)->current_record_index_)++; return WalProcessingOption::kContinueProcessing; } @@ -10150,66 +10149,65 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) { } }; - std::vector> batchKeys(3); + std::vector> batch_keys(3); - batchKeys[0].push_back("key1"); - batchKeys[0].push_back("key2"); - batchKeys[1].push_back("key3"); - batchKeys[1].push_back("key4"); - batchKeys[2].push_back("key5"); - batchKeys[2].push_back("key6"); + batch_keys[0].push_back("key1"); + batch_keys[0].push_back("key2"); + batch_keys[1].push_back("key3"); + batch_keys[1].push_back("key4"); + batch_keys[2].push_back("key5"); + batch_keys[2].push_back("key6"); Options options = OptionsForLogIterTest(); DestroyAndReopen(options); CreateAndReopenWithCF({ "pikachu" }, options); // Write given keys in given batches - for (size_t i = 0; i < batchKeys.size(); i++) { + for (size_t i = 0; i < batch_keys.size(); i++) { WriteBatch batch; - for (size_t j = 0; j < batchKeys[i].size(); j++) { - batch.Put(handles_[0], batchKeys[i][j], DummyString(1024)); + for (size_t j = 0; j < batch_keys[i].size(); j++) { + batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); } dbfull()->Write(WriteOptions(), &batch); } - // Create a test filter that would apply WalProcessingOption at the first + // Create a test filter that would apply wal_processing_option at the first // record - size_t changeRecordsFromIndex = 1; - size_t numKeysToAddInNewBatch = 1; - TestWalFilterWithChangeBatch testWalFilterWithChangeBatch( - changeRecordsFromIndex, numKeysToAddInNewBatch); + size_t change_records_from_index = 1; + size_t num_keys_to_add_in_new_batch = 1; + TestWalFilterWithChangeBatch test_wal_filter_with_change_batch( + change_records_from_index, num_keys_to_add_in_new_batch); // Reopen database with option to use WAL filter options = OptionsForLogIterTest(); - options.wal_filter = &testWalFilterWithChangeBatch; + options.wal_filter = &test_wal_filter_with_change_batch; ReopenWithColumnFamilies({ "default", "pikachu" }, options); - // Ensure that all keys exist before m_changeRecordsFromIndex + // Ensure that all keys exist before change_records_from_index_ // And after that index only single key exists // as our filter adds only single key for each batch - std::vector keysMustExist; - std::vector keysMustNotExist; + std::vector keys_must_exist; + std::vector keys_must_not_exist; - for (size_t i = 0; i < batchKeys.size(); i++) { - for (size_t j = 0; j < batchKeys[i].size(); j++) { - if (i >= changeRecordsFromIndex && j >= numKeysToAddInNewBatch) { - keysMustNotExist.push_back(Slice(batchKeys[i][j])); + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) { + keys_must_not_exist.push_back(Slice(batch_keys[i][j])); } else { - keysMustExist.push_back(Slice(batchKeys[i][j])); + keys_must_exist.push_back(Slice(batch_keys[i][j])); } } } - bool checkedAfterReopen = false; + bool checked_after_reopen = false; - while (true) - { + while (true) { // Ensure that expected keys exists // and not expected keys don't exist after recovery - ValidateKeyExistence(db_, keysMustExist, keysMustNotExist); + ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); - if (checkedAfterReopen) { + if (checked_after_reopen) { break; } @@ -10219,9 +10217,75 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) { options = OptionsForLogIterTest(); ReopenWithColumnFamilies({ "default", "pikachu" }, options); - checkedAfterReopen = true; + checked_after_reopen = true; } } + +TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) { + class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter { + public: + virtual WalProcessingOption LogRecord(const WriteBatch & batch, + WriteBatch* new_batch, bool* batch_changed) const override { + *new_batch = batch; + new_batch->Put("key_extra", "value_extra"); + *batch_changed = true; + return WalProcessingOption::kContinueProcessing; + } + + virtual const char* Name() const override { + return "WalFilterTestWithChangeBatchExtraKeys"; + } + }; + + std::vector> batch_keys(3); + + batch_keys[0].push_back("key1"); + batch_keys[0].push_back("key2"); + batch_keys[1].push_back("key3"); + batch_keys[1].push_back("key4"); + batch_keys[2].push_back("key5"); + batch_keys[2].push_back("key6"); + + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({ "pikachu" }, options); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys[i].size(); j++) { + batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + // Create a test filter that would add extra keys + TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys; + + // Reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + options.wal_filter = &test_wal_filter_extra_keys; + Status status = + TryReopenWithColumnFamilies({ "default", "pikachu" }, options); + ASSERT_TRUE(status.IsNotSupported()); + + // Reopen without filter, now reopen should succeed - previous + // attempt to open must not have altered the db. + options = OptionsForLogIterTest(); + ReopenWithColumnFamilies({ "default", "pikachu" }, options); + + std::vector keys_must_exist; + std::vector keys_must_not_exist; //empty vector + + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + keys_must_exist.push_back(Slice(batch_keys[i][j])); + } + } + + ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); +} + #endif // ROCKSDB_LITE #ifndef ROCKSDB_LITE diff --git a/include/rocksdb/wal_filter.h b/include/rocksdb/wal_filter.h index c30eb1277..81ffa3998 100644 --- a/include/rocksdb/wal_filter.h +++ b/include/rocksdb/wal_filter.h @@ -40,6 +40,20 @@ public: // * stop log replay // (by returning kStop replay) - please note that this implies // discarding the logs from current record onwards. + // + // @params batch batch encountered in the log during recovery + // @params new_batch new_batch to populate if filter wants to change + // the batch (for example to filter some records out, + // or alter some records). + // Please note that the new batch MUST NOT contain + // more records than original, else recovery would + // be failed. + // @params batch_changed Whether batch was changed by the filter. + // It must be set to true if new_batch was populated, + // else new_batch has no effect. + // @returns Processing option for the current record. + // Please see WalProcessingOption enum above for + // details. virtual WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch, bool* batch_changed) const = 0;