From 3897ce3125b8c71072d01c198496a2c47b2a8e3a Mon Sep 17 00:00:00 2001
From: Akanksha Mahajan
Date: Thu, 10 Jun 2021 12:55:29 -0700
Subject: [PATCH] Support for Merge in Integrated BlobDB with base values
 (#8292)

Summary:
This PR adds support for the Merge operation in Integrated BlobDB with base
values (i.e. DB::Put). Merged values can be retrieved through DB::Get,
DB::MultiGet, DB::GetMergeOperands and iterator operations.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8292

Test Plan: Add new unit tests

Reviewed By: ltamasi

Differential Revision: D28415896

Pulled By: akankshamahajan15

fbshipit-source-id: e9b3478bef51d2f214fb88c31ed3c8d2f4a531ff
---
 CMakeLists.txt                |   1 +
 TARGETS                       |   2 +
 db/blob/blob_fetcher.cc       |  22 ++++++
 db/blob/blob_fetcher.h        |  26 +++++++
 db/blob/db_blob_basic_test.cc |  57 ++++++++++++++
 db/blob/db_blob_index_test.cc | 114 ++++++++++++++++++++++++++-
 db/db_iter.cc                 | 130 ++++++++++++++++++++-----------
 db/db_iter.h                  |   2 +
 db/db_merge_operand_test.cc   | 132 ++++++++++++++++++++++++++-----
 db/version_set.cc             |   7 +-
 src.mk                        |   1 +
 table/get_context.cc          | 143 ++++++++++++++++++++--------------
 table/get_context.h           |  10 ++-
 13 files changed, 516 insertions(+), 131 deletions(-)
 create mode 100644 db/blob/blob_fetcher.cc
 create mode 100644 db/blob/blob_fetcher.h

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3a2dc7ed8..db802edb3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -628,6 +628,7 @@ set(SOURCES
         cache/lru_cache.cc
         cache/sharded_cache.cc
         db/arena_wrapped_db_iter.cc
+        db/blob/blob_fetcher.cc
         db/blob/blob_file_addition.cc
         db/blob/blob_file_builder.cc
         db/blob/blob_file_cache.cc
diff --git a/TARGETS b/TARGETS
index 75f53a733..248a810ba 100644
--- a/TARGETS
+++ b/TARGETS
@@ -137,6 +137,7 @@ cpp_library(
         "cache/lru_cache.cc",
         "cache/sharded_cache.cc",
         "db/arena_wrapped_db_iter.cc",
+        "db/blob/blob_fetcher.cc",
         "db/blob/blob_file_addition.cc",
         "db/blob/blob_file_builder.cc",
         "db/blob/blob_file_cache.cc",
@@ -448,6 +449,7 @@ cpp_library(
        "cache/lru_cache.cc",
         "cache/sharded_cache.cc",
         "db/arena_wrapped_db_iter.cc",
+        "db/blob/blob_fetcher.cc",
         "db/blob/blob_file_addition.cc",
         "db/blob/blob_file_builder.cc",
         "db/blob/blob_file_cache.cc",
diff --git a/db/blob/blob_fetcher.cc b/db/blob/blob_fetcher.cc
new file mode 100644
index 000000000..a42a4be5f
--- /dev/null
+++ b/db/blob/blob_fetcher.cc
@@ -0,0 +1,22 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/blob/blob_fetcher.h"
+
+#include "db/version_set.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+Status BlobFetcher::FetchBlob(const Slice& user_key, const Slice& blob_index,
+                              PinnableSlice* blob_value) {
+  Status s;
+  assert(version_);
+  constexpr uint64_t* bytes_read = nullptr;
+  s = version_->GetBlob(read_options_, user_key, blob_index, blob_value,
+                        bytes_read);
+  return s;
+}
+
+}  // namespace ROCKSDB_NAMESPACE
\ No newline at end of file
diff --git a/db/blob/blob_fetcher.h b/db/blob/blob_fetcher.h
new file mode 100644
index 000000000..747057f09
--- /dev/null
+++ b/db/blob/blob_fetcher.h
@@ -0,0 +1,26 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "rocksdb/options.h"
+#include "rocksdb/status.h"
+
+namespace ROCKSDB_NAMESPACE {
+class Version;
+
+class BlobFetcher {
+ public:
+  BlobFetcher(Version* version, const ReadOptions& read_options)
+      : version_(version), read_options_(read_options) {}
+
+  Status FetchBlob(const Slice& user_key, const Slice& blob_index,
+                   PinnableSlice* blob_value);
+
+ private:
+  Version* version_;
+  ReadOptions read_options_;
+};
+}  // namespace ROCKSDB_NAMESPACE
\ No newline at end of file
diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc
index 41510e052..feee834c5 100644
--- a/db/blob/db_blob_basic_test.cc
+++ b/db/blob/db_blob_basic_test.cc
@@ -310,6 +310,63 @@ TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) {
   ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), value);
 }
 
+TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
+  Options options = GetDefaultOptions();
+  options.merge_operator = MergeOperators::CreateStringAppendOperator();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  ASSERT_OK(Put("Key1", "v1"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("Key1", "v2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("Key1", "v3"));
+  ASSERT_OK(Flush());
+
+  std::string value;
+  ASSERT_OK(db_->Get(ReadOptions(), "Key1", &value));
+  ASSERT_EQ(Get("Key1"), "v1,v2,v3");
+}
+
+TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
+  constexpr size_t num_keys = 3;
+
+  Options options = GetDefaultOptions();
+  options.merge_operator = MergeOperators::CreateStringAppendOperator();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  ASSERT_OK(Put("Key0", "v0_0"));
+  ASSERT_OK(Put("Key1", "v1_0"));
+  ASSERT_OK(Put("Key2", "v2_0"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("Key0", "v0_1"));
+  ASSERT_OK(Merge("Key1", "v1_1"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("Key0", "v0_2"));
+  ASSERT_OK(Flush());
+
+  std::array<Slice, num_keys> keys{{"Key0", "Key1", "Key2"}};
+  std::array<PinnableSlice, num_keys> values;
+  std::array<Status, num_keys> statuses;
+
+  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
+                &values[0], &statuses[0]);
+
+  ASSERT_OK(statuses[0]);
+  ASSERT_EQ(values[0], "v0_0,v0_1,v0_2");
+
+  ASSERT_OK(statuses[1]);
+  ASSERT_EQ(values[1], "v1_0,v1_1");
+
+  ASSERT_OK(statuses[2]);
+  ASSERT_EQ(values[2], "v2_0");
+}
+
 class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
                                public testing::WithParamInterface<std::string> {
  protected:
diff --git a/db/blob/db_blob_index_test.cc b/db/blob/db_blob_index_test.cc
index 8e9da81be..34bcd9fb5 100644
--- a/db/blob/db_blob_index_test.cc
+++ b/db/blob/db_blob_index_test.cc
@@ -399,7 +399,7 @@ TEST_F(DBBlobIndexTest, Iterate) {
          create_normal_iterator);
   verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
          create_normal_iterator);
-  verify(11, Status::kNotSupported, "", "", create_normal_iterator);
+  verify(11, Status::kCorruption, "", "", create_normal_iterator);
   verify(13, Status::kOk,
          get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
          get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
@@ -418,7 +418,11 @@ TEST_F(DBBlobIndexTest, Iterate) {
          create_blob_iterator, check_is_blob(false));
   verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
          create_blob_iterator, check_is_blob(false));
-  verify(11, Status::kNotSupported, "", "", create_blob_iterator);
+  if (tier <= kImmutableMemtables) {
+    verify(11, Status::kNotSupported, "", "", create_blob_iterator);
+  } else {
+    verify(11, Status::kCorruption, "", "", create_blob_iterator);
+  }
   verify(13, Status::kOk,
          get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
          get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
@@ -440,7 +444,11 @@ TEST_F(DBBlobIndexTest, Iterate) {
          create_blob_iterator, check_is_blob(false));
   verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
          create_blob_iterator, check_is_blob(false));
-  verify(11, Status::kNotSupported, "", "", create_blob_iterator);
+  if (tier <= kImmutableMemtables) {
+    verify(11, Status::kNotSupported, "", "", create_blob_iterator);
+  } else {
+    verify(11, Status::kCorruption, "", "", create_blob_iterator);
+  }
   verify(13, Status::kOk,
          get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
          get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
@@ -455,6 +463,106 @@ TEST_F(DBBlobIndexTest, Iterate) {
   }
 }
 
+TEST_F(DBBlobIndexTest, IntegratedBlobIterate) {
+  const std::vector<std::vector<std::string>> data = {
+      /*00*/ {"Put"},
+      /*01*/ {"Put", "Merge", "Merge", "Merge"},
+      /*02*/ {"Put"}};
+
+  auto get_key = [](size_t index) { return ("key" + std::to_string(index)); };
+
+  auto get_value = [&](size_t index, size_t version) {
+    return get_key(index) + "_value" + ToString(version);
+  };
+
+  auto check_iterator = [&](Iterator* iterator, Status expected_status,
+                            const Slice& expected_value) {
+    ASSERT_EQ(expected_status, iterator->status());
+    if (expected_status.ok()) {
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ(expected_value, iterator->value());
+    } else {
+      ASSERT_FALSE(iterator->Valid());
+    }
+  };
+
+  auto verify = [&](size_t index, Status expected_status,
+                    const Slice& expected_value) {
+    // Seek
+    {
+      Iterator* iterator = db_->NewIterator(ReadOptions());
+      std::unique_ptr<Iterator> iterator_guard(iterator);
+      ASSERT_OK(iterator->status());
+      ASSERT_OK(iterator->Refresh());
+      iterator->Seek(get_key(index));
+      check_iterator(iterator, expected_status, expected_value);
+    }
+    // Next
+    {
+      Iterator* iterator = db_->NewIterator(ReadOptions());
+      std::unique_ptr<Iterator> iterator_guard(iterator);
+      ASSERT_OK(iterator->Refresh());
+      iterator->Seek(get_key(index - 1));
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_OK(iterator->status());
+      iterator->Next();
+      check_iterator(iterator, expected_status, expected_value);
+    }
+    // SeekForPrev
+    {
+      Iterator* iterator = db_->NewIterator(ReadOptions());
+      std::unique_ptr<Iterator> iterator_guard(iterator);
+      ASSERT_OK(iterator->status());
+      ASSERT_OK(iterator->Refresh());
+      iterator->SeekForPrev(get_key(index));
+      check_iterator(iterator, expected_status, expected_value);
+    }
+    // Prev
+    {
+      Iterator* iterator = db_->NewIterator(ReadOptions());
+      std::unique_ptr<Iterator> iterator_guard(iterator);
+      iterator->Seek(get_key(index + 1));
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_OK(iterator->status());
+      iterator->Prev();
+      check_iterator(iterator, expected_status, expected_value);
+    }
+  };
+
+  Options options = GetTestOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  DestroyAndReopen(options);
+
+  // fill data
+  for (size_t i = 0; i < data.size(); i++) {
+    for (size_t j = 0; j < data[i].size(); j++) {
+      std::string key = get_key(i);
+      std::string value = get_value(i, j);
+      if (data[i][j] == "Put") {
+        ASSERT_OK(Put(key, value));
+        ASSERT_OK(Flush());
+      } else if (data[i][j] == "Merge") {
+        ASSERT_OK(Merge(key, value));
+        ASSERT_OK(Flush());
+      }
+    }
+  }
+
+  std::string expected_value = get_value(1, 0) + "," + get_value(1, 1) + "," +
+                               get_value(1, 2) + "," + get_value(1, 3);
+  Status expected_status;
+  verify(1, expected_status, expected_value);
+
+#ifndef ROCKSDB_LITE
+  // Test DBIter::FindValueForCurrentKeyUsingSeek flow.
+  ASSERT_OK(dbfull()->SetOptions(cfh(),
+                                 {{"max_sequential_skip_in_iterations", "0"}}));
+  verify(1, expected_status, expected_value);
+#endif  // !ROCKSDB_LITE
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db/db_iter.cc b/db/db_iter.cc
index c302a2071..038f30053 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -576,12 +576,8 @@ bool DBIter::MergeValuesNewToOld() {
       // hit a put, merge the put value with operands and store the
       // final result in saved_value_. We are done!
       const Slice val = iter_.value();
-      Status s = MergeHelper::TimedFullMerge(
-          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
-          &saved_value_, logger_, statistics_, clock_, &pinned_value_, true);
+      Status s = Merge(&val, ikey.user_key);
       if (!s.ok()) {
-        valid_ = false;
-        status_ = s;
         return false;
       }
       // iter_ is positioned after put
@@ -598,9 +594,31 @@ bool DBIter::MergeValuesNewToOld() {
           iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
       PERF_COUNTER_ADD(internal_merge_count, 1);
     } else if (kTypeBlobIndex == ikey.type) {
-      status_ = Status::NotSupported("BlobDB does not support merge operator.");
-      valid_ = false;
-      return false;
+      if (expose_blob_index_) {
+        status_ =
+            Status::NotSupported("BlobDB does not support merge operator.");
+        valid_ = false;
+        return false;
+      }
+      // hit a put, merge the put value with operands and store the
+      // final result in saved_value_. We are done!
+      if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
+        return false;
+      }
+      valid_ = true;
+      const Slice blob_value = value();
+      Status s = Merge(&blob_value, ikey.user_key);
+      if (!s.ok()) {
+        return false;
+      }
+      is_blob_ = false;
+      // iter_ is positioned after put
+      iter_.Next();
+      if (!iter_.status().ok()) {
+        valid_ = false;
+        return false;
+      }
+      return true;
     } else {
       valid_ = false;
       status_ = Status::Corruption(
@@ -619,16 +637,10 @@ bool DBIter::MergeValuesNewToOld() {
   // a deletion marker.
   // feed null as the existing value to the merge operator, such that
   // client can differentiate this scenario and do things accordingly.
-  Status s = MergeHelper::TimedFullMerge(
-      merge_operator_, saved_key_.GetUserKey(), nullptr,
-      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, clock_,
-      &pinned_value_, true);
+  Status s = Merge(nullptr, saved_key_.GetUserKey());
   if (!s.ok()) {
-    valid_ = false;
-    status_ = s;
     return false;
   }
-  assert(status_.ok());
   return true;
 }
 
@@ -931,21 +943,36 @@ bool DBIter::FindValueForCurrentKey() {
       if (last_not_merge_type == kTypeDeletion ||
           last_not_merge_type == kTypeSingleDeletion ||
           last_not_merge_type == kTypeRangeDeletion) {
-        s = MergeHelper::TimedFullMerge(
-            merge_operator_, saved_key_.GetUserKey(), nullptr,
-            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
-            clock_, &pinned_value_, true);
+        s = Merge(nullptr, saved_key_.GetUserKey());
+        if (!s.ok()) {
+          return false;
+        }
+        return true;
       } else if (last_not_merge_type == kTypeBlobIndex) {
-        status_ =
-            Status::NotSupported("BlobDB does not support merge operator.");
-        valid_ = false;
-        return false;
+        if (expose_blob_index_) {
+          status_ =
+              Status::NotSupported("BlobDB does not support merge operator.");
+          valid_ = false;
+          return false;
+        }
+        if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
+          return false;
+        }
+        valid_ = true;
+        const Slice blob_value = value();
+        s = Merge(&blob_value, saved_key_.GetUserKey());
+        if (!s.ok()) {
+          return false;
+        }
+        is_blob_ = false;
+        return true;
       } else {
         assert(last_not_merge_type == kTypeValue);
-        s = MergeHelper::TimedFullMerge(
-            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
-            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
-            clock_, &pinned_value_, true);
+        s = Merge(&pinned_value_, saved_key_.GetUserKey());
+        if (!s.ok()) {
+          return false;
+        }
+        return true;
       }
       break;
     case kTypeValue:
@@ -955,7 +982,6 @@ bool DBIter::FindValueForCurrentKey() {
       if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
        return false;
      }
-
      break;
    default:
      valid_ = false;
@@ -1095,25 +1121,33 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
 
     if (ikey.type == kTypeValue) {
       const Slice val = iter_.value();
-      Status s = MergeHelper::TimedFullMerge(
-          merge_operator_, saved_key_.GetUserKey(), &val,
-          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
-          clock_, &pinned_value_, true);
+      Status s = Merge(&val, saved_key_.GetUserKey());
       if (!s.ok()) {
-        valid_ = false;
-        status_ = s;
         return false;
       }
-      valid_ = true;
       return true;
     } else if (ikey.type == kTypeMerge) {
       merge_context_.PushOperand(
           iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
       PERF_COUNTER_ADD(internal_merge_count, 1);
     } else if (ikey.type == kTypeBlobIndex) {
-      status_ = Status::NotSupported("BlobDB does not support merge operator.");
-      valid_ = false;
-      return false;
+      if (expose_blob_index_) {
+        status_ =
+            Status::NotSupported("BlobDB does not support merge operator.");
+        valid_ = false;
+        return false;
+      }
+      if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
+        return false;
+      }
+      valid_ = true;
+      const Slice blob_value = value();
+      Status s = Merge(&blob_value, saved_key_.GetUserKey());
+      if (!s.ok()) {
+        return false;
+      }
+      is_blob_ = false;
+      return true;
     } else {
       valid_ = false;
       status_ = Status::Corruption(
@@ -1123,13 +1157,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     }
   }
 
-  Status s = MergeHelper::TimedFullMerge(
-      merge_operator_, saved_key_.GetUserKey(), nullptr,
-      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, clock_,
-      &pinned_value_, true);
+  Status s = Merge(nullptr, saved_key_.GetUserKey());
   if (!s.ok()) {
-    valid_ = false;
-    status_ = s;
     return false;
   }
 
@@ -1152,6 +1181,19 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   return true;
 }
 
+Status DBIter::Merge(const Slice* val, const Slice& user_key) {
+  Status s = MergeHelper::TimedFullMerge(
+      merge_operator_, user_key, val, merge_context_.GetOperands(),
+      &saved_value_, logger_, statistics_, clock_, &pinned_value_, true);
+  if (!s.ok()) {
+    valid_ = false;
+    status_ = s;
+    return s;
+  }
+  valid_ = true;
+  return s;
+}
+
 // Move backwards until the key smaller than saved_key_.
 // Changes valid_ only if return value is false.
 bool DBIter::FindUserKeyBeforeSavedKey() {
diff --git a/db/db_iter.h b/db/db_iter.h
index 21eaf8bd7..9b296d08b 100644
--- a/db/db_iter.h
+++ b/db/db_iter.h
@@ -299,6 +299,8 @@ class DBIter final : public Iterator {
   // index when using the integrated BlobDB implementation.
   bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index);
 
+  Status Merge(const Slice* val, const Slice& user_key);
+
   const SliceTransform* prefix_extractor_;
   Env* const env_;
   SystemClock* clock_;
diff --git a/db/db_merge_operand_test.cc b/db/db_merge_operand_test.cc
index e7709c1f4..45bafb44c 100644
--- a/db/db_merge_operand_test.cc
+++ b/db/db_merge_operand_test.cc
@@ -19,6 +19,28 @@
 
 namespace ROCKSDB_NAMESPACE {
 
+namespace {
+class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
+ public:
+  LimitedStringAppendMergeOp(int limit, char delim)
+      : StringAppendTESTOperator(delim), limit_(limit) {}
+
+  const char* Name() const override {
+    return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
+  }
+
+  bool ShouldMerge(const std::vector<Slice>& operands) const override {
+    if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
+      return true;
+    }
+    return false;
+  }
+
+ private:
+  size_t limit_ = 0;
+};
+}  // namespace
+
 class DBMergeOperandTest : public DBTestBase {
  public:
   DBMergeOperandTest()
@@ -26,26 +48,6 @@ class DBMergeOperandTest : public DBTestBase {
 };
 
 TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
-  class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
-   public:
-    LimitedStringAppendMergeOp(int limit, char delim)
-        : StringAppendTESTOperator(delim), limit_(limit) {}
-
-    const char* Name() const override {
-      return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
-    }
-
-    bool ShouldMerge(const std::vector<Slice>& operands) const override {
-      if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
-        return true;
-      }
-      return false;
-    }
-
-   private:
-    size_t limit_ = 0;
-  };
-
   Options options;
   options.create_if_missing = true;
   // Use only the latest two merge operands.
@@ -214,7 +216,8 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
   ASSERT_EQ(values[2], "dc");
   ASSERT_EQ(values[3], "ed");
 
-  // First 3 k5 values are in SST and next 4 k5 values are in Immutable Memtable
+  // First 3 k5 values are in SST and next 4 k5 values are in Immutable
+  // Memtable
   ASSERT_OK(Merge("k5", "who"));
   ASSERT_OK(Merge("k5", "am"));
   ASSERT_OK(Merge("k5", "i"));
@@ -232,6 +235,93 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
   ASSERT_EQ(values[2], "am");
 }
 
+TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {
+  Options options;
+  options.create_if_missing = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  // Use only the latest two merge operands.
+  options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
+  options.env = env_;
+  Reopen(options);
+  int num_records = 4;
+  int number_of_operands = 0;
+  std::vector<PinnableSlice> values(num_records);
+  GetMergeOperandsOptions merge_operands_info;
+  merge_operands_info.expected_max_number_of_operands = num_records;
+
+  // All k1 values are in memtable.
+  ASSERT_OK(Put("k1", "x"));
+  ASSERT_OK(Merge("k1", "b"));
+  ASSERT_OK(Merge("k1", "c"));
+  ASSERT_OK(Merge("k1", "d"));
+  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+                                  "k1", values.data(), &merge_operands_info,
+                                  &number_of_operands));
+  ASSERT_EQ(values[0], "x");
+  ASSERT_EQ(values[1], "b");
+  ASSERT_EQ(values[2], "c");
+  ASSERT_EQ(values[3], "d");
+
+  // expected_max_number_of_operands is less than number of merge operands so
+  // status should be Incomplete.
+  merge_operands_info.expected_max_number_of_operands = num_records - 1;
+  Status status = db_->GetMergeOperands(
+      ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
+      &merge_operands_info, &number_of_operands);
+  ASSERT_EQ(status.IsIncomplete(), true);
+  merge_operands_info.expected_max_number_of_operands = num_records;
+
+  // All k2 values are flushed to L0 into a single file.
+  ASSERT_OK(Put("k2", "q"));
+  ASSERT_OK(Merge("k2", "w"));
+  ASSERT_OK(Merge("k2", "e"));
+  ASSERT_OK(Merge("k2", "r"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+                                  "k2", values.data(), &merge_operands_info,
+                                  &number_of_operands));
+  ASSERT_EQ(values[0], "q,w,e,r");
+
+  // Do some compaction that will make the following tests more predictable
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  // All k3 values are flushed and are in different files.
+  ASSERT_OK(Put("k3", "ab"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("k3", "bc"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("k3", "cd"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("k3", "de"));
+  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+                                  "k3", values.data(), &merge_operands_info,
+                                  &number_of_operands));
+  ASSERT_EQ(values[0], "ab");
+  ASSERT_EQ(values[1], "bc");
+  ASSERT_EQ(values[2], "cd");
+  ASSERT_EQ(values[3], "de");
+
+  // All K4 values are in different levels
+  ASSERT_OK(Put("k4", "ba"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(4);
+  ASSERT_OK(Merge("k4", "cb"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(3);
+  ASSERT_OK(Merge("k4", "dc"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_OK(Merge("k4", "ed"));
+  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+                                  "k4", values.data(), &merge_operands_info,
+                                  &number_of_operands));
+  ASSERT_EQ(values[0], "ba");
+  ASSERT_EQ(values[1], "cb");
+  ASSERT_EQ(values[2], "dc");
+  ASSERT_EQ(values[3], "ed");
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db/version_set.cc b/db/version_set.cc
index 8f0c86f21..407aa5edc 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -21,6 +21,7 @@
 #include
 
 #include "compaction/compaction.h"
+#include "db/blob/blob_fetcher.h"
 #include "db/blob/blob_file_cache.h"
 #include "db/blob/blob_file_reader.h"
 #include "db/blob/blob_index.h"
@@ -1875,6 +1876,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
   // need to provide it here.
   bool is_blob_index = false;
   bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
+  BlobFetcher blob_fetcher(this, read_options);
 
   GetContext get_context(
       user_comparator(), merge_operator_, info_log_, db_statistics_,
@@ -1882,7 +1884,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
       do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
       merge_context, do_merge, max_covering_tombstone_seq, clock_, seq,
       merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use,
-      tracing_get_id);
+      tracing_get_id, &blob_fetcher);
 
   // Pin blocks that we read to hold merge operands
   if (merge_operator_) {
@@ -2031,6 +2033,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
   // use autovector in order to avoid unnecessary construction of GetContext
   // objects, which is expensive
   autovector<GetContext, 16> get_ctx;
+  BlobFetcher blob_fetcher(this, read_options);
   for (auto iter = range->begin(); iter != range->end(); ++iter) {
     assert(iter->s->ok() || iter->s->IsMergeInProgress());
     get_ctx.emplace_back(
@@ -2039,7 +2042,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
         iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
         &(iter->merge_context), true, &iter->max_covering_tombstone_seq,
         clock_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
-        &iter->is_blob_index, tracing_mget_id);
+        &iter->is_blob_index, tracing_mget_id, &blob_fetcher);
     // MergeInProgress status, if set, has been transferred to the get_context
     // state, so we set status to ok here. From now on, the iter status will
     // be used for IO errors, and get_context state will be used for any
diff --git a/src.mk b/src.mk
index e419498bb..b088e8b70 100644
--- a/src.mk
+++ b/src.mk
@@ -6,6 +6,7 @@ LIB_SOURCES = \
   cache/lru_cache.cc \
   cache/sharded_cache.cc \
   db/arena_wrapped_db_iter.cc \
+  db/blob/blob_fetcher.cc \
   db/blob/blob_file_addition.cc \
   db/blob/blob_file_builder.cc \
   db/blob/blob_file_cache.cc \
diff --git a/table/get_context.cc b/table/get_context.cc
index 948c21b36..919ed4c34 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -39,14 +39,17 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
 
 }  // namespace
 
-GetContext::GetContext(
-    const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
-    Statistics* statistics, GetState init_state, const Slice& user_key,
-    PinnableSlice* pinnable_val, std::string* timestamp, bool* value_found,
-    MergeContext* merge_context, bool do_merge,
-    SequenceNumber* _max_covering_tombstone_seq, SystemClock* clock,
-    SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr,
-    ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id)
+GetContext::GetContext(const Comparator* ucmp,
+                       const MergeOperator* merge_operator, Logger* logger,
+                       Statistics* statistics, GetState init_state,
+                       const Slice& user_key, PinnableSlice* pinnable_val,
+                       std::string* timestamp, bool* value_found,
+                       MergeContext* merge_context, bool do_merge,
+                       SequenceNumber* _max_covering_tombstone_seq,
+                       SystemClock* clock, SequenceNumber* seq,
+                       PinnedIteratorsManager* _pinned_iters_mgr,
+                       ReadCallback* callback, bool* is_blob_index,
+                       uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
     : ucmp_(ucmp),
       merge_operator_(merge_operator),
       logger_(logger),
@@ -65,7 +68,8 @@ GetContext::GetContext(
       callback_(callback),
       do_merge_(do_merge),
       is_blob_index_(is_blob_index),
-      tracing_get_id_(tracing_get_id) {
+      tracing_get_id_(tracing_get_id),
+      blob_fetcher_(blob_fetcher) {
   if (seq_) {
     *seq_ = kMaxSequenceNumber;
   }
@@ -79,11 +83,11 @@ GetContext::GetContext(
     bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
     SystemClock* clock, SequenceNumber* seq,
     PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
-    bool* is_blob_index, uint64_t tracing_get_id)
+    bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
     : GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
                  pinnable_val, nullptr, value_found, merge_context, do_merge,
                  _max_covering_tombstone_seq, clock, seq, _pinned_iters_mgr,
-                 callback, is_blob_index, tracing_get_id) {}
+                 callback, is_blob_index, tracing_get_id, blob_fetcher) {}
 
 // Called from TableCache::Get and Table::Get when file/block in which
 // key may exist are not there in TableCache/BlockCache respectively. In this
@@ -250,6 +254,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
         state_ = kUnexpectedBlobIndex;
         return false;
       }
+      if (is_blob_index_ != nullptr) {
+        *is_blob_index_ = (type == kTypeBlobIndex);
+      }
       if (kNotFound == state_) {
         state_ = kFound;
         if (do_merge_) {
@@ -260,7 +267,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
           } else {
             TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf", this);
-
             // Otherwise copy the value
             pinnable_val_->PinSelf(value);
           }
@@ -269,27 +275,44 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
           // It means this function is called as part of DB GetMergeOperands
           // API and the current value should be part of
           // merge_context_->operand_list
-          push_operand(value, value_pinner);
+          if (is_blob_index_ != nullptr && *is_blob_index_) {
+            PinnableSlice pin_val;
+            if (GetBlobValue(value, &pin_val) == false) {
+              return false;
+            }
+            Slice blob_value(pin_val);
+            push_operand(blob_value, nullptr);
+          } else {
+            push_operand(value, value_pinner);
+          }
         }
       } else if (kMerge == state_) {
         assert(merge_operator_ != nullptr);
-        state_ = kFound;
-        if (do_merge_) {
-          if (LIKELY(pinnable_val_ != nullptr)) {
-            Status merge_status = MergeHelper::TimedFullMerge(
-                merge_operator_, user_key_, &value,
-                merge_context_->GetOperands(), pinnable_val_->GetSelf(),
-                logger_, statistics_, clock_);
-            pinnable_val_->PinSelf();
-            if (!merge_status.ok()) {
-              state_ = kCorrupt;
-            }
+        if (is_blob_index_ != nullptr && *is_blob_index_) {
+          PinnableSlice pin_val;
+          if (GetBlobValue(value, &pin_val) == false) {
+            return false;
+          }
+          Slice blob_value(pin_val);
+          state_ = kFound;
+          if (do_merge_) {
+            Merge(&blob_value);
+          } else {
+            // It means this function is called as part of DB GetMergeOperands
+            // API and the current value should be part of
+            // merge_context_->operand_list
+            push_operand(blob_value, nullptr);
           }
         } else {
-          // It means this function is called as part of DB GetMergeOperands
-          // API and the current value should be part of
-          // merge_context_->operand_list
-          push_operand(value, value_pinner);
+          state_ = kFound;
+          if (do_merge_) {
+            Merge(&value);
+          } else {
+            // It means this function is called as part of DB GetMergeOperands
+            // API and the current value should be part of
+            // merge_context_->operand_list
+            push_operand(value, value_pinner);
+          }
         }
       }
       if (state_ == kFound) {
@@ -299,9 +322,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
           timestamp_->assign(ts.data(), ts.size());
         }
       }
-      if (is_blob_index_ != nullptr) {
-        *is_blob_index_ = (type == kTypeBlobIndex);
-      }
       return false;
 
     case kTypeDeletion:
@@ -315,20 +335,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
         state_ = kDeleted;
       } else if (kMerge == state_) {
         state_ = kFound;
-        if (LIKELY(pinnable_val_ != nullptr)) {
-          if (do_merge_) {
-            Status merge_status = MergeHelper::TimedFullMerge(
-                merge_operator_, user_key_, nullptr,
-                merge_context_->GetOperands(), pinnable_val_->GetSelf(),
-                logger_, statistics_, clock_);
-            pinnable_val_->PinSelf();
-            if (!merge_status.ok()) {
-              state_ = kCorrupt;
-            }
-          }
-          // If do_merge_ = false then the current value shouldn't be part of
-          // merge_context_->operand_list
-        }
+        Merge(nullptr);
+        // If do_merge_ = false then the current value shouldn't be part of
+        // merge_context_->operand_list
       }
       return false;

@@ -341,20 +350,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
           merge_operator_->ShouldMerge(
               merge_context_->GetOperandsDirectionBackward())) {
         state_ = kFound;
-        if (LIKELY(pinnable_val_ != nullptr)) {
-          // do_merge_ = true this is the case where this function is called
-          // as part of DB Get API hence merge operators should be merged.
-          if (do_merge_) {
-            Status merge_status = MergeHelper::TimedFullMerge(
-                merge_operator_, user_key_, nullptr,
-                merge_context_->GetOperands(), pinnable_val_->GetSelf(),
-                logger_, statistics_, clock_);
-            pinnable_val_->PinSelf();
-            if (!merge_status.ok()) {
-              state_ = kCorrupt;
-            }
-          }
-        }
+        Merge(nullptr);
         return false;
       }
       return true;
@@ -369,6 +365,35 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
   return false;
 }
 
+void GetContext::Merge(const Slice* value) {
+  if (LIKELY(pinnable_val_ != nullptr)) {
+    if (do_merge_) {
+      Status merge_status = MergeHelper::TimedFullMerge(
+          merge_operator_, user_key_, value, merge_context_->GetOperands(),
+          pinnable_val_->GetSelf(), logger_, statistics_, clock_);
+      pinnable_val_->PinSelf();
+      if (!merge_status.ok()) {
+        state_ = kCorrupt;
+      }
+    }
+  }
+}
+
+bool GetContext::GetBlobValue(const Slice& blob_index,
+                              PinnableSlice* blob_value) {
+  Status status = blob_fetcher_->FetchBlob(user_key_, blob_index, blob_value);
+  if (!status.ok()) {
+    if (status.IsIncomplete()) {
+      MarkKeyMayExist();
+      return false;
+    }
+    state_ = kCorrupt;
+    return false;
+  }
+  *is_blob_index_ = false;
+  return true;
+}
+
 void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
   if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
       value_pinner != nullptr) {
diff --git a/table/get_context.h b/table/get_context.h
index 97ee9d751..9b2f67807 100644
--- a/table/get_context.h
+++ b/table/get_context.h
@@ -5,6 +5,8 @@
 #pragma once
 
 #include
+
+#include "db/blob/blob_fetcher.h"
 #include "db/dbformat.h"
 #include "db/merge_context.h"
 #include "db/read_callback.h"
@@ -103,7 +105,7 @@ class GetContext {
              SequenceNumber* seq = nullptr,
              PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
             ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
-             uint64_t tracing_get_id = 0);
+             uint64_t tracing_get_id = 0, BlobFetcher* blob_fetcher = nullptr);
   GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
              Logger* logger, Statistics* statistics, GetState init_state,
              const Slice& user_key, PinnableSlice* value,
@@ -113,7 +115,7 @@ class GetContext {
             SequenceNumber* seq = nullptr,
             PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
             ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
-             uint64_t tracing_get_id = 0);
+             uint64_t tracing_get_id = 0, BlobFetcher* blob_fetcher = nullptr);
 
   GetContext() = delete;
 
@@ -170,6 +172,9 @@ class GetContext {
   void push_operand(const Slice& value, Cleanable* value_pinner);
 
  private:
+  void Merge(const Slice* value);
+  bool GetBlobValue(const Slice& blob_index, PinnableSlice* blob_value);
+
   const Comparator* ucmp_;
   const MergeOperator* merge_operator_;
   // the merge operations encountered;
@@ -200,6 +205,7 @@ class GetContext {
   // Used for block cache tracing only. A tracing get id uniquely identifies a
   // Get or a MultiGet.
   const uint64_t tracing_get_id_;
+  BlobFetcher* blob_fetcher_;
 };
 
 // Call this to replay a log and bring the get_context up to date. The replay
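
For readers who want to see the user-visible behavior this patch enables from the public API, the standalone program below is a minimal sketch: it writes a base value with DB::Put, flushes it into a blob file, applies Merge operations on top, and reads the merged result back. It is not part of the diff above; the CommaAppendOperator class and the /tmp/blobdb_merge_demo path are illustrative choices, while the Options fields (enable_blob_files, min_blob_size, merge_operator) and the AssociativeMergeOperator interface are the public RocksDB API exercised by the tests in this patch.

// Minimal sketch (not part of this patch): Merge on top of a blob-resident
// base value. CommaAppendOperator and the DB path are illustrative only.
#include <cassert>
#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"

// Illustrative associative operator that joins operands with commas, similar
// in spirit to the string-append operators used by the tests above.
class CommaAppendOperator : public rocksdb::AssociativeMergeOperator {
 public:
  bool Merge(const rocksdb::Slice& /*key*/,
             const rocksdb::Slice* existing_value, const rocksdb::Slice& value,
             std::string* new_value,
             rocksdb::Logger* /*logger*/) const override {
    if (existing_value != nullptr) {
      *new_value = existing_value->ToString() + "," + value.ToString();
    } else {
      *new_value = value.ToString();
    }
    return true;
  }

  const char* Name() const override { return "CommaAppendOperator"; }
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_blob_files = true;  // integrated BlobDB
  options.min_blob_size = 0;         // store every value in a blob file
  options.merge_operator = std::make_shared<CommaAppendOperator>();

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/blobdb_merge_demo", &db);
  assert(s.ok());

  // Base value; the flush moves it into a blob file.
  s = db->Put(rocksdb::WriteOptions(), "Key1", "v1");
  assert(s.ok());
  s = db->Flush(rocksdb::FlushOptions());
  assert(s.ok());

  // Merges on top of the blob-resident base value. Before this patch, reads
  // of such keys failed with Status::NotSupported.
  s = db->Merge(rocksdb::WriteOptions(), "Key1", "v2");
  assert(s.ok());
  s = db->Merge(rocksdb::WriteOptions(), "Key1", "v3");
  assert(s.ok());

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "Key1", &value);
  assert(s.ok());
  std::cout << value << std::endl;  // expected: v1,v2,v3

  delete db;
  return 0;
}

The same configuration also makes the merged key readable through DB::MultiGet, DB::GetMergeOperands, and iterators, as demonstrated by the unit tests added in this patch.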