diff --git a/CMakeLists.txt b/CMakeLists.txt index 91a025268..460d13a38 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -502,6 +502,7 @@ set(TESTS db/db_iter_test.cc db/db_log_iter_test.cc db/db_memtable_test.cc + db/db_merge_operator_test.cc db/db_options_test.cc db/db_properties_test.cc db/db_table_properties_test.cc diff --git a/Makefile b/Makefile index ed0917c37..223d1f02b 100644 --- a/Makefile +++ b/Makefile @@ -309,6 +309,7 @@ TESTS = \ db_inplace_update_test \ db_iterator_test \ db_memtable_test \ + db_merge_operator_test \ db_options_test \ db_range_del_test \ db_sst_test \ @@ -996,6 +997,9 @@ db_iterator_test: db/db_iterator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHA db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_iter.cc b/db/db_iter.cc index d2d49aac1..64409c060 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -518,6 +518,7 @@ void DBIter::MergeValuesNewToOld() { iter_->IsValuePinned() /* operand_pinned */); ParsedInternalKey ikey; + Status s; for (iter_->Next(); iter_->Valid(); iter_->Next()) { if (!ParseKey(&ikey)) { // skip corrupted key @@ -538,9 +539,12 @@ void DBIter::MergeValuesNewToOld() { // final result in saved_value_. We are done! // ignore corruption if there is any. const Slice val = iter_->value(); - MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val, - merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_, &pinned_value_); + s = MergeHelper::TimedFullMerge( + merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(), + &saved_value_, logger_, statistics_, env_, &pinned_value_); + if (!s.ok()) { + status_ = s; + } // iter_ is positioned after put iter_->Next(); return; @@ -559,9 +563,12 @@ void DBIter::MergeValuesNewToOld() { // a deletion marker. // feed null as the existing value to the merge operator, such that // client can differentiate this scenario and do things accordingly. - MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr, - merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_, &pinned_value_); + s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr, + merge_context_.GetOperands(), &saved_value_, + logger_, statistics_, env_, &pinned_value_); + if (!s.ok()) { + status_ = s; + } } void DBIter::Prev() { @@ -742,6 +749,7 @@ bool DBIter::FindValueForCurrentKey() { FindParseableKey(&ikey, kReverse); } + Status s; switch (last_key_entry_type) { case kTypeDeletion: case kTypeSingleDeletion: @@ -753,16 +761,16 @@ bool DBIter::FindValueForCurrentKey() { if (last_not_merge_type == kTypeDeletion || last_not_merge_type == kTypeSingleDeletion || last_not_merge_type == kTypeRangeDeletion) { - MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), - nullptr, merge_context_.GetOperands(), - &saved_value_, logger_, statistics_, env_, - &pinned_value_); + s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), + nullptr, merge_context_.GetOperands(), + &saved_value_, logger_, statistics_, + env_, &pinned_value_); } else { assert(last_not_merge_type == kTypeValue); - MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), - &pinned_value_, - merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_, &pinned_value_); + s = MergeHelper::TimedFullMerge( + merge_operator_, saved_key_.GetKey(), &pinned_value_, + merge_context_.GetOperands(), &saved_value_, logger_, statistics_, + env_, &pinned_value_); } break; case kTypeValue: @@ -773,6 +781,9 @@ bool DBIter::FindValueForCurrentKey() { break; } valid_ = true; + if (!s.ok()) { + status_ = s; + } return true; } @@ -818,13 +829,15 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { FindParseableKey(&ikey, kForward); } + Status s; if (!iter_->Valid() || !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) || ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || range_del_agg_.ShouldDelete(ikey)) { - MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr, - merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_, &pinned_value_); + s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), + nullptr, merge_context_.GetOperands(), + &saved_value_, logger_, statistics_, env_, + &pinned_value_); // Make iter_ valid and point to saved_key_ if (!iter_->Valid() || !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) { @@ -832,14 +845,20 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); } valid_ = true; + if (!s.ok()) { + status_ = s; + } return true; } const Slice& val = iter_->value(); - MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val, - merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_, &pinned_value_); + s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val, + merge_context_.GetOperands(), &saved_value_, + logger_, statistics_, env_, &pinned_value_); valid_ = true; + if (!s.ok()) { + status_ = s; + } return true; } diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc new file mode 100644 index 000000000..189639ce9 --- /dev/null +++ b/db/db_merge_operator_test.cc @@ -0,0 +1,98 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#include +#include + +#include "db/db_test_util.h" +#include "db/forward_iterator.h" +#include "port/stack_trace.h" +#include "utilities/merge_operators.h" + +namespace rocksdb { + +// Test merge operator functionality. +class DBMergeOperatorTest : public DBTestBase { + public: + DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {} +}; + +// A test merge operator mimics put but also fails if one of merge operands is +// "corrupted". +class TestPutOperator : public MergeOperator { + public: + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + if (merge_in.existing_value != nullptr && + *(merge_in.existing_value) == "corrupted") { + return false; + } + for (auto value : merge_in.operand_list) { + if (value == "corrupted") { + return false; + } + } + merge_out->existing_operand = merge_in.operand_list.back(); + return true; + } + + virtual const char* Name() const override { return "TestPutOperator"; } +}; + +TEST_F(DBMergeOperatorTest, MergeErrorOnRead) { + Options options; + options.create_if_missing = true; + options.merge_operator.reset(new TestPutOperator()); + Reopen(options); + ASSERT_OK(Merge("k1", "v1")); + ASSERT_OK(Merge("k1", "corrupted")); + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption()); + VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}}); +} + +TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) { + Options options; + options.create_if_missing = true; + options.merge_operator.reset(new TestPutOperator()); + options.max_successive_merges = 3; + Reopen(options); + ASSERT_OK(Merge("k1", "v1")); + ASSERT_OK(Merge("k1", "v2")); + // Will trigger a merge when hitting max_successive_merges and the merge + // will fail. The delta will be inserted nevertheless. + ASSERT_OK(Merge("k1", "corrupted")); + // Data should stay unmerged after the error. + VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}}); +} + +TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) { + Options options; + options.create_if_missing = true; + options.merge_operator.reset(new TestPutOperator()); + + DestroyAndReopen(options); + ASSERT_OK(Merge("k1", "v1")); + ASSERT_OK(Merge("k1", "corrupted")); + ASSERT_OK(Put("k2", "v2")); + VerifyDBFromMap({{"k1", ""}, {"k2", "v2"}}, nullptr, false, + {{"k1", Status::Corruption()}}); + VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}}); + + DestroyAndReopen(options); + ASSERT_OK(Merge("k1", "v1")); + ASSERT_OK(Put("k2", "v2")); + ASSERT_OK(Merge("k2", "corrupted")); + VerifyDBFromMap({{"k1", "v1"}, {"k2", ""}}, nullptr, false, + {{"k2", Status::Corruption()}}); + VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}}); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 5508d6a13..53b2d4489 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_test_util.h" +#include "db/forward_iterator.h" namespace rocksdb { @@ -501,6 +502,15 @@ Status DBTestBase::Put(int cf, const Slice& k, const Slice& v, } } +Status DBTestBase::Merge(const Slice& k, const Slice& v, WriteOptions wo) { + return db_->Merge(wo, k, v); +} + +Status DBTestBase::Merge(int cf, const Slice& k, const Slice& v, + WriteOptions wo) { + return db_->Merge(wo, handles_[cf], k, v); +} + Status DBTestBase::Delete(const std::string& k) { return db_->Delete(WriteOptions(), k); } @@ -1089,11 +1099,18 @@ std::vector DBTestBase::ListTableFiles(Env* env, } void DBTestBase::VerifyDBFromMap(std::map true_data, - size_t* total_reads_res, bool tailing_iter) { + size_t* total_reads_res, bool tailing_iter, + std::map status) { size_t total_reads = 0; for (auto& kv : true_data) { - ASSERT_EQ(Get(kv.first), kv.second); + Status s = status[kv.first]; + if (s.ok()) { + ASSERT_EQ(Get(kv.first), kv.second); + } else { + std::string value; + ASSERT_EQ(s, db_->Get(ReadOptions(), kv.first, &value)); + } total_reads++; } @@ -1106,21 +1123,40 @@ void DBTestBase::VerifyDBFromMap(std::map true_data, // Verify Iterator::Next() iter_cnt = 0; auto data_iter = true_data.begin(); + Status s; for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { ASSERT_EQ(iter->key().ToString(), data_iter->first); - ASSERT_EQ(iter->value().ToString(), data_iter->second); + Status current_status = status[data_iter->first]; + if (!current_status.ok()) { + s = current_status; + } + ASSERT_EQ(iter->status(), s); + if (current_status.ok()) { + ASSERT_EQ(iter->value().ToString(), data_iter->second); + } iter_cnt++; total_reads++; } ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / " << true_data.size(); + delete iter; // Verify Iterator::Prev() + // Use a new iterator to make sure its status is clean. + iter = db_->NewIterator(ro); iter_cnt = 0; + s = Status::OK(); auto data_rev = true_data.rbegin(); for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { ASSERT_EQ(iter->key().ToString(), data_rev->first); - ASSERT_EQ(iter->value().ToString(), data_rev->second); + Status current_status = status[data_rev->first]; + if (!current_status.ok()) { + s = current_status; + } + ASSERT_EQ(iter->status(), s); + if (current_status.ok()) { + ASSERT_EQ(iter->value().ToString(), data_rev->second); + } iter_cnt++; total_reads++; } @@ -1134,7 +1170,6 @@ void DBTestBase::VerifyDBFromMap(std::map true_data, ASSERT_EQ(kv.second, iter->value().ToString()); total_reads++; } - delete iter; } @@ -1176,6 +1211,25 @@ void DBTestBase::VerifyDBFromMap(std::map true_data, } } +void DBTestBase::VerifyDBInternal( + std::vector> true_data) { + Arena arena; + InternalKeyComparator icmp(last_options_.comparator); + RangeDelAggregator range_del_agg(icmp, {}); + auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg); + iter->SeekToFirst(); + for (auto p : true_data) { + ASSERT_TRUE(iter->Valid()); + ParsedInternalKey ikey; + ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey)); + ASSERT_EQ(p.first, ikey.user_key); + ASSERT_EQ(p.second, iter->value()); + iter->Next(); + }; + ASSERT_FALSE(iter->Valid()); + iter->~InternalIterator(); +} + #ifndef ROCKSDB_LITE uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily( diff --git a/db/db_test_util.h b/db/db_test_util.h index c02a79a1e..bf58d27ba 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -699,6 +699,12 @@ class DBTestBase : public testing::Test { Status Put(int cf, const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()); + Status Merge(const Slice& k, const Slice& v, + WriteOptions wo = WriteOptions()); + + Status Merge(int cf, const Slice& k, const Slice& v, + WriteOptions wo = WriteOptions()); + Status Delete(const std::string& k); Status Delete(int cf, const std::string& k); @@ -827,7 +833,11 @@ class DBTestBase : public testing::Test { void VerifyDBFromMap(std::map true_data, size_t* total_reads_res = nullptr, - bool tailing_iter = false); + bool tailing_iter = false, + std::map status = {}); + + void VerifyDBInternal( + std::vector> true_data); #ifndef ROCKSDB_LITE uint64_t GetNumberOfSstFilesForColumnFamily(DB* db, diff --git a/src.mk b/src.mk index 1539b2e1e..2a4d48b8c 100644 --- a/src.mk +++ b/src.mk @@ -96,7 +96,7 @@ LIB_SOURCES = \ util/compaction_job_stats_impl.cc \ util/concurrent_arena.cc \ util/crc32c.cc \ - util/db_options.cc \ + util/db_options.cc \ util/delete_scheduler.cc \ util/dynamic_bloom.cc \ util/env.cc \ @@ -126,7 +126,7 @@ LIB_SOURCES = \ util/perf_level.cc \ util/random.cc \ util/rate_limiter.cc \ - util/sharded_cache.cc \ + util/sharded_cache.cc \ util/slice.cc \ util/sst_file_manager_impl.cc \ util/statistics.cc \ @@ -223,18 +223,20 @@ MAIN_SOURCES = \ db/dbformat_test.cc \ db/db_iter_test.cc \ db/db_test.cc \ - db/db_block_cache_test.cc \ + db/db_block_cache_test.cc \ db/db_io_failure_test.cc \ db/db_bloom_filter_test.cc \ db/db_compaction_filter_test.cc \ db/db_compaction_test.cc \ db/db_dynamic_level_test.cc \ - db/db_flush_test.cc \ + db/db_flush_test.cc \ db/db_inplace_update_test.cc \ - db/db_iterator_test.cc \ + db/db_iterator_test.cc \ db/db_log_iter_test.cc \ + db/db_memtable_test.cc \ + db/db_merge_operator_test.cc \ db/db_options_test.cc \ - db/db_range_del_test.cc \ + db/db_range_del_test.cc \ db/db_sst_test.cc \ db/external_sst_file_test.cc \ db/db_tailing_iter_test.cc \ @@ -314,7 +316,7 @@ MAIN_SOURCES = \ utilities/write_batch_with_index/write_batch_with_index_test.cc \ utilities/column_aware_encoding_test.cc \ utilities/lua/rocks_lua_test.cc \ - util/iostats_context_test.cc \ + util/iostats_context_test.cc \ util/log_write_bench.cc \ util/mock_env_test.cc \ util/options_test.cc \