From c293472908d3b0452848483e86f99d0086eb86ea Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 3 Oct 2017 09:08:07 -0700 Subject: [PATCH] Add ValueType::kTypeBlobIndex Summary: Add kTypeBlobIndex value type, which will be used by blob db only, to insert a (key, blob_offset) KV pair. The purpose is to 1. Make it possible to open existing rocksdb instance as blob db. Existing value will be of kTypeIndex type, while value inserted by blob db will be of kTypeBlobIndex. 2. Make rocksdb able to detect if the db contains value written by blob db, if so return error. 3. Make it possible to have blob db optionally store value in SST file (with kTypeValue type) or as a blob value (with kTypeBlobIndex type). The root db (DBImpl) basically pretended kTypeBlobIndex are normal value on write. On Get if is_blob is provided, return whether the value read is of kTypeBlobIndex type, or return Status::NotSupported() status if is_blob is not provided. On scan allow_blob flag is pass and if the flag is true, return wether the value is of kTypeBlobIndex type via iter->IsBlob(). Changes on blob db side will be in a separate patch. Closes https://github.com/facebook/rocksdb/pull/2886 Differential Revision: D5838431 Pulled By: yiwu-arbug fbshipit-source-id: 3c5306c62bc13bb11abc03422ec5cbcea1203cca --- CMakeLists.txt | 1 + Makefile | 4 + TARGETS | 1 + db/compaction_iterator.cc | 2 + db/db_blob_index_test.cc | 407 ++++++++++++++++++++++++++++++ db/db_impl.cc | 158 ++++++------ db/db_impl.h | 13 +- db/db_impl_debug.cc | 9 + db/db_iter.cc | 99 ++++++-- db/db_iter.h | 15 +- db/dbformat.cc | 2 +- db/dbformat.h | 4 +- db/memtable.cc | 24 +- db/memtable.h | 8 +- db/memtable_list.cc | 18 +- db/memtable_list.h | 10 +- db/version_set.cc | 10 +- db/version_set.h | 3 +- db/write_batch.cc | 64 ++++- db/write_batch_internal.h | 3 + include/rocksdb/listener.h | 1 + include/rocksdb/write_batch.h | 6 + table/get_context.cc | 28 +- table/get_context.h | 7 +- utilities/blob_db/blob_db_test.cc | 2 +- 25 files changed, 753 insertions(+), 146 deletions(-) create mode 100644 db/db_blob_index_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 31ab5abb8..ab2177b88 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -713,6 +713,7 @@ if(WITH_TESTS) db/corruption_test.cc db/cuckoo_table_db_test.cc db/db_basic_test.cc + db/db_blob_index_test.cc db/db_block_cache_test.cc db/db_bloom_filter_test.cc db/db_compaction_filter_test.cc diff --git a/Makefile b/Makefile index a657fad72..5a89f6bf7 100644 --- a/Makefile +++ b/Makefile @@ -360,6 +360,7 @@ TESTS = \ db_wal_test \ db_block_cache_test \ db_test \ + db_blob_index_test \ db_bloom_filter_test \ db_iter_test \ db_log_iter_test \ @@ -1063,6 +1064,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_blob_index_test: db/db_blob_index_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 3fac4a737..ac85eab93 100644 --- a/TARGETS +++ b/TARGETS @@ -367,6 +367,7 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'], ['cuckoo_table_reader_test', 'table/cuckoo_table_reader_test.cc', 'serial'], ['date_tiered_test', 'utilities/date_tiered/date_tiered_test.cc', 'serial'], ['db_basic_test', 'db/db_basic_test.cc', 'serial'], + ['db_blob_index_test', 'db/db_blob_index_test.cc', 'serial'], ['db_block_cache_test', 'db/db_block_cache_test.cc', 'serial'], ['db_bloom_filter_test', 'db/db_bloom_filter_test.cc', 'serial'], ['db_compaction_filter_test', 'db/db_compaction_filter_test.cc', 'parallel'], diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 547260292..8eac637c4 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -25,6 +25,8 @@ CompactionEventListener::CompactionListenerValueType fromInternalValueType( kSingleDelete; case kTypeRangeDeletion: return CompactionEventListener::CompactionListenerValueType::kRangeDelete; + case kTypeBlobIndex: + return CompactionEventListener::CompactionListenerValueType::kBlobIndex; default: assert(false); return CompactionEventListener::CompactionListenerValueType::kInvalid; diff --git a/db/db_blob_index_test.cc b/db/db_blob_index_test.cc new file mode 100644 index 000000000..bfc95760c --- /dev/null +++ b/db/db_blob_index_test.cc @@ -0,0 +1,407 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_iter.h" +#include "db/db_test_util.h" +#include "db/dbformat.h" +#include "db/write_batch_internal.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "util/string_util.h" +#include "utilities/merge_operators.h" + +namespace rocksdb { + +// kTypeBlobIndex is a value type used by BlobDB only. The base rocksdb +// should accept the value type on write, and report not supported value +// for reads, unless caller request for it explicitly. The base rocksdb +// doesn't understand format of actual blob index (the value). +class DBBlobIndexTest : public DBTestBase { + public: + enum Tier { + kMemtable = 0, + kImmutableMemtables = 1, + kL0SstFile = 2, + kLnSstFile = 3, + }; + const std::vector kAllTiers = {Tier::kMemtable, + Tier::kImmutableMemtables, + Tier::kL0SstFile, Tier::kLnSstFile}; + + DBBlobIndexTest() : DBTestBase("/db_blob_index_test") {} + + ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); } + + ColumnFamilyData* cfd() { + return reinterpret_cast(cfh())->cfd(); + } + + Status PutBlobIndex(WriteBatch* batch, const Slice& key, + const Slice& blob_index) { + return WriteBatchInternal::PutBlobIndex(batch, cfd()->GetID(), key, + blob_index); + } + + Status Write(WriteBatch* batch) { + return dbfull()->Write(WriteOptions(), batch); + } + + std::string GetImpl(const Slice& key, bool* is_blob_index = nullptr, + const Snapshot* snapshot = nullptr) { + ReadOptions read_options; + read_options.snapshot = snapshot; + PinnableSlice value; + auto s = dbfull()->GetImpl(read_options, cfh(), key, &value, + nullptr /*value_found*/, is_blob_index); + if (s.IsNotFound()) { + return "NOT_FOUND"; + } + if (s.IsNotSupported()) { + return "NOT_SUPPORTED"; + } + if (!s.ok()) { + return s.ToString(); + } + return value.ToString(); + } + + std::string GetBlobIndex(const Slice& key, + const Snapshot* snapshot = nullptr) { + bool is_blob_index = false; + std::string value = GetImpl(key, &is_blob_index, snapshot); + if (!is_blob_index) { + return "NOT_BLOB"; + } + return value; + } + + ArenaWrappedDBIter* GetBlobIterator() { + return dbfull()->NewIteratorImpl(ReadOptions(), cfd(), + dbfull()->GetLatestSequenceNumber(), + true /*allow_blob*/); + } + + Options GetTestOptions() { + Options options; + options.create_if_missing = true; + options.num_levels = 2; + options.disable_auto_compactions = true; + // Disable auto flushes. + options.max_write_buffer_number = 10; + options.min_write_buffer_number_to_merge = 10; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + return options; + } + + void MoveDataTo(Tier tier) { + switch (tier) { + case Tier::kMemtable: + break; + case Tier::kImmutableMemtables: + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + break; + case Tier::kL0SstFile: + ASSERT_OK(Flush()); + break; + case Tier::kLnSstFile: + ASSERT_OK(Flush()); + ASSERT_OK(Put("a", "dummy")); + ASSERT_OK(Put("z", "dummy")); + ASSERT_OK(Flush()); + ASSERT_OK( + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,1", FilesPerLevel()); + break; + } + } +}; + +// Should be able to write kTypeBlobIndex to memtables and SST files. +TEST_F(DBBlobIndexTest, Write) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + for (int i = 1; i <= 5; i++) { + std::string index = ToString(i); + WriteBatch batch; + ASSERT_OK(PutBlobIndex(&batch, "key" + index, "blob" + index)); + ASSERT_OK(Write(&batch)); + } + MoveDataTo(tier); + for (int i = 1; i <= 5; i++) { + std::string index = ToString(i); + ASSERT_EQ("blob" + index, GetBlobIndex("key" + index)); + } + } +} + +// Get should be able to return blob index if is_blob_index is provided, +// otherwise return Status::NotSupported status. +TEST_F(DBBlobIndexTest, Get) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + WriteBatch batch; + ASSERT_OK(batch.Put("key", "value")); + ASSERT_OK(PutBlobIndex(&batch, "blob_key", "blob_index")); + ASSERT_OK(Write(&batch)); + MoveDataTo(tier); + // Verify normal value + bool is_blob_index = false; + PinnableSlice value; + ASSERT_EQ("value", Get("key")); + ASSERT_EQ("value", GetImpl("key")); + ASSERT_EQ("value", GetImpl("key", &is_blob_index)); + ASSERT_FALSE(is_blob_index); + // Verify blob index + ASSERT_TRUE(Get("blob_key", &value).IsNotSupported()); + ASSERT_EQ("NOT_SUPPORTED", GetImpl("blob_key")); + ASSERT_EQ("blob_index", GetImpl("blob_key", &is_blob_index)); + ASSERT_TRUE(is_blob_index); + } +} + +// Get should NOT return Status::NotSupported if blob index is updated with +// a normal value. +TEST_F(DBBlobIndexTest, Updated) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + WriteBatch batch; + for (int i = 0; i < 10; i++) { + ASSERT_OK(PutBlobIndex(&batch, "key" + ToString(i), "blob_index")); + } + ASSERT_OK(Write(&batch)); + // Avoid blob values from being purged. + const Snapshot* snapshot = dbfull()->GetSnapshot(); + ASSERT_OK(Put("key1", "new_value")); + ASSERT_OK(Merge("key2", "a")); + ASSERT_OK(Merge("key2", "b")); + ASSERT_OK(Merge("key2", "c")); + ASSERT_OK(Delete("key3")); + ASSERT_OK(SingleDelete("key4")); + ASSERT_OK(Delete("key5")); + ASSERT_OK(Merge("key5", "a")); + ASSERT_OK(Merge("key5", "b")); + ASSERT_OK(Merge("key5", "c")); + ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), cfh(), "key6", "key9")); + MoveDataTo(tier); + for (int i = 0; i < 10; i++) { + ASSERT_EQ("blob_index", GetBlobIndex("key" + ToString(i), snapshot)); + } + ASSERT_EQ("new_value", Get("key1")); + ASSERT_EQ("NOT_SUPPORTED", GetImpl("key2")); + ASSERT_EQ("NOT_FOUND", Get("key3")); + ASSERT_EQ("NOT_FOUND", Get("key4")); + ASSERT_EQ("a,b,c", GetImpl("key5")); + for (int i = 6; i < 9; i++) { + ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i))); + } + ASSERT_EQ("blob_index", GetBlobIndex("key9")); + dbfull()->ReleaseSnapshot(snapshot); + } +} + +// Iterator should get blob value if allow_blob flag is set, +// otherwise return Status::NotSupported status. +TEST_F(DBBlobIndexTest, Iterate) { + const std::vector> data = { + /*00*/ {kTypeValue}, + /*01*/ {kTypeBlobIndex}, + /*02*/ {kTypeValue}, + /*03*/ {kTypeBlobIndex, kTypeValue}, + /*04*/ {kTypeValue}, + /*05*/ {kTypeValue, kTypeBlobIndex}, + /*06*/ {kTypeValue}, + /*07*/ {kTypeDeletion, kTypeBlobIndex}, + /*08*/ {kTypeValue}, + /*09*/ {kTypeSingleDeletion, kTypeBlobIndex}, + /*10*/ {kTypeValue}, + /*11*/ {kTypeMerge, kTypeMerge, kTypeMerge, kTypeBlobIndex}, + /*12*/ {kTypeValue}, + /*13*/ + {kTypeMerge, kTypeMerge, kTypeMerge, kTypeDeletion, kTypeBlobIndex}, + /*14*/ {kTypeValue}, + /*15*/ {kTypeBlobIndex}, + /*16*/ {kTypeValue}, + }; + + auto get_key = [](int index) { + char buf[20]; + snprintf(buf, sizeof(buf), "%02d", index); + return "key" + std::string(buf); + }; + + auto get_value = [&](int index, int version) { + return get_key(index) + "_value" + ToString(version); + }; + + auto check_iterator = [&](Iterator* iterator, Status::Code expected_status, + const Slice& expected_value) { + ASSERT_EQ(expected_status, iterator->status().code()); + if (expected_status == Status::kOk) { + ASSERT_TRUE(iterator->Valid()); + ASSERT_EQ(expected_value, iterator->value()); + } else { + ASSERT_FALSE(iterator->Valid()); + } + }; + + auto create_normal_iterator = [&]() -> Iterator* { + return dbfull()->NewIterator(ReadOptions()); + }; + + auto create_blob_iterator = [&]() -> Iterator* { return GetBlobIterator(); }; + + auto check_is_blob = [&](bool is_blob) { + return [is_blob](Iterator* iterator) { + ASSERT_EQ(is_blob, + reinterpret_cast(iterator)->IsBlob()); + }; + }; + + auto verify = [&](int index, Status::Code expected_status, + const Slice& forward_value, const Slice& backward_value, + std::function create_iterator, + std::function extra_check = nullptr) { + // Seek + auto* iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->Seek(get_key(index)); + check_iterator(iterator, expected_status, forward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + + // Next + iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->Seek(get_key(index - 1)); + ASSERT_TRUE(iterator->Valid()); + iterator->Next(); + check_iterator(iterator, expected_status, forward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + + // SeekForPrev + iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->SeekForPrev(get_key(index)); + check_iterator(iterator, expected_status, backward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + + // Prev + iterator = create_iterator(); + iterator->Seek(get_key(index + 1)); + ASSERT_TRUE(iterator->Valid()); + iterator->Prev(); + check_iterator(iterator, expected_status, backward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + }; + + for (auto tier : {Tier::kMemtable} /*kAllTiers*/) { + // Avoid values from being purged. + std::vector snapshots; + DestroyAndReopen(GetTestOptions()); + + // fill data + for (int i = 0; i < static_cast(data.size()); i++) { + for (int j = static_cast(data[i].size()) - 1; j >= 0; j--) { + std::string key = get_key(i); + std::string value = get_value(i, j); + WriteBatch batch; + switch (data[i][j]) { + case kTypeValue: + ASSERT_OK(Put(key, value)); + break; + case kTypeDeletion: + ASSERT_OK(Delete(key)); + break; + case kTypeSingleDeletion: + ASSERT_OK(SingleDelete(key)); + break; + case kTypeMerge: + ASSERT_OK(Merge(key, value)); + break; + case kTypeBlobIndex: + ASSERT_OK(PutBlobIndex(&batch, key, value)); + ASSERT_OK(Write(&batch)); + break; + default: + assert(false); + }; + } + snapshots.push_back(dbfull()->GetSnapshot()); + } + ASSERT_OK( + dbfull()->DeleteRange(WriteOptions(), cfh(), get_key(15), get_key(16))); + snapshots.push_back(dbfull()->GetSnapshot()); + MoveDataTo(tier); + + // Normal iterator + verify(1, Status::kNotSupported, "", "", create_normal_iterator); + verify(3, Status::kNotSupported, "", "", create_normal_iterator); + verify(5, Status::kOk, get_value(5, 0), get_value(5, 0), + create_normal_iterator); + verify(7, Status::kOk, get_value(8, 0), get_value(6, 0), + create_normal_iterator); + verify(9, Status::kOk, get_value(10, 0), get_value(8, 0), + create_normal_iterator); + verify(11, Status::kNotSupported, "", "", create_normal_iterator); + verify(13, Status::kOk, + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + create_normal_iterator); + verify(15, Status::kOk, get_value(16, 0), get_value(14, 0), + create_normal_iterator); + + // Iterator with blob support + verify(1, Status::kOk, get_value(1, 0), get_value(1, 0), + create_blob_iterator, check_is_blob(true)); + verify(3, Status::kOk, get_value(3, 0), get_value(3, 0), + create_blob_iterator, check_is_blob(true)); + verify(5, Status::kOk, get_value(5, 0), get_value(5, 0), + create_blob_iterator, check_is_blob(false)); + verify(7, Status::kOk, get_value(8, 0), get_value(6, 0), + create_blob_iterator, check_is_blob(false)); + verify(9, Status::kOk, get_value(10, 0), get_value(8, 0), + create_blob_iterator, check_is_blob(false)); + verify(11, Status::kNotSupported, "", "", create_blob_iterator); + verify(13, Status::kOk, + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + create_blob_iterator, check_is_blob(false)); + verify(15, Status::kOk, get_value(16, 0), get_value(14, 0), + create_blob_iterator, check_is_blob(false)); + + for (auto* snapshot : snapshots) { + dbfull()->ReleaseSnapshot(snapshot); + } + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_impl.cc b/db/db_impl.cc index 4aba14c60..688bc51fa 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -909,7 +909,8 @@ Status DBImpl::Get(const ReadOptions& read_options, Status DBImpl::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - PinnableSlice* pinnable_val, bool* value_found) { + PinnableSlice* pinnable_val, bool* value_found, + bool* is_blob_index) { assert(pinnable_val != nullptr); StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); @@ -959,13 +960,13 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, bool done = false; if (!skip_memtable) { if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options)) { + &range_del_agg, read_options, is_blob_index)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } else if ((s.ok() || s.IsMergeInProgress()) && sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options)) { + &range_del_agg, read_options, is_blob_index)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); @@ -977,7 +978,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, - &range_del_agg, value_found); + &range_del_agg, value_found, nullptr, nullptr, + is_blob_index); RecordTick(stats_, MEMTABLE_MISS); } @@ -1417,73 +1419,79 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); - auto snapshot = read_options.snapshot != nullptr - ? reinterpret_cast( - read_options.snapshot)->number_ + ? reinterpret_cast(read_options.snapshot) + ->number_ : latest_snapshot; - - // Try to generate a DB iterator tree in continuous memory area to be - // cache friendly. Here is an example of result: - // +-------------------------------+ - // | | - // | ArenaWrappedDBIter | - // | + | - // | +---> Inner Iterator ------------+ - // | | | | - // | | +-- -- -- -- -- -- -- --+ | - // | +--- | Arena | | - // | | | | - // | Allocated Memory: | | - // | | +-------------------+ | - // | | | DBIter | <---+ - // | | + | - // | | | +-> iter_ ------------+ - // | | | | | - // | | +-------------------+ | - // | | | MergingIterator | <---+ - // | | + | - // | | | +->child iter1 ------------+ - // | | | | | | - // | | +->child iter2 ----------+ | - // | | | | | | | - // | | | +->child iter3 --------+ | | - // | | | | | | - // | | +-------------------+ | | | - // | | | Iterator1 | <--------+ - // | | +-------------------+ | | - // | | | Iterator2 | <------+ - // | | +-------------------+ | - // | | | Iterator3 | <----+ - // | | +-------------------+ - // | | | - // +-------+-----------------------+ - // - // ArenaWrappedDBIter inlines an arena area where all the iterators in - // the iterator tree are allocated in the order of being accessed when - // querying. - // Laying out the iterators in the order of being accessed makes it more - // likely that any iterator pointer is close to the iterator it points to so - // that they are likely to be in the same cache line and/or page. - ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, read_options, *cfd->ioptions(), snapshot, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, - ((read_options.snapshot != nullptr) ? nullptr : this), cfd); - - InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator()); - db_iter->SetIterUnderDBIter(internal_iter); - - return db_iter; + return NewIteratorImpl(read_options, cfd, snapshot); } // To stop compiler from complaining return nullptr; } +ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, + ColumnFamilyData* cfd, + SequenceNumber snapshot, + bool allow_blob) { + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + + // Try to generate a DB iterator tree in continuous memory area to be + // cache friendly. Here is an example of result: + // +-------------------------------+ + // | | + // | ArenaWrappedDBIter | + // | + | + // | +---> Inner Iterator ------------+ + // | | | | + // | | +-- -- -- -- -- -- -- --+ | + // | +--- | Arena | | + // | | | | + // | Allocated Memory: | | + // | | +-------------------+ | + // | | | DBIter | <---+ + // | | + | + // | | | +-> iter_ ------------+ + // | | | | | + // | | +-------------------+ | + // | | | MergingIterator | <---+ + // | | + | + // | | | +->child iter1 ------------+ + // | | | | | | + // | | +->child iter2 ----------+ | + // | | | | | | | + // | | | +->child iter3 --------+ | | + // | | | | | | + // | | +-------------------+ | | | + // | | | Iterator1 | <--------+ + // | | +-------------------+ | | + // | | | Iterator2 | <------+ + // | | +-------------------+ | + // | | | Iterator3 | <----+ + // | | +-------------------+ + // | | | + // +-------+-----------------------+ + // + // ArenaWrappedDBIter inlines an arena area where all the iterators in + // the iterator tree are allocated in the order of being accessed when + // querying. + // Laying out the iterators in the order of being accessed makes it more + // likely that any iterator pointer is close to the iterator it points to so + // that they are likely to be in the same cache line and/or page. + ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, read_options, *cfd->ioptions(), snapshot, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + sv->version_number, ((read_options.snapshot != nullptr) ? nullptr : this), + cfd, allow_blob); + + InternalIterator* internal_iter = + NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); + db_iter->SetIterUnderDBIter(internal_iter); + + return db_iter; +} + Status DBImpl::NewIterators( const ReadOptions& read_options, const std::vector& column_families, @@ -1527,28 +1535,16 @@ Status DBImpl::NewIterators( #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); + auto snapshot = + read_options.snapshot != nullptr + ? reinterpret_cast(read_options.snapshot) + ->number_ + : latest_snapshot; for (size_t i = 0; i < column_families.size(); ++i) { auto* cfd = reinterpret_cast( column_families[i])->cfd(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); - - auto snapshot = - read_options.snapshot != nullptr - ? reinterpret_cast( - read_options.snapshot)->number_ - : latest_snapshot; - - ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, read_options, *cfd->ioptions(), snapshot, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, - ((read_options.snapshot != nullptr) ? nullptr : this), cfd); - InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator()); - db_iter->SetIterUnderDBIter(internal_iter); - iterators->push_back(db_iter); + iterators->push_back(NewIteratorImpl(read_options, cfd, snapshot)); } } diff --git a/db/db_impl.h b/db/db_impl.h index 39c1d6103..76b52b8b8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -52,6 +52,7 @@ namespace rocksdb { +class ArenaWrappedDBIter; class MemTable; class TableCache; class Version; @@ -123,6 +124,7 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found = nullptr) override; + using DB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override; @@ -130,6 +132,11 @@ class DBImpl : public DB { const ReadOptions& options, const std::vector& column_families, std::vector* iterators) override; + ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, + ColumnFamilyData* cfd, + SequenceNumber snapshot, + bool allow_blob = false); + virtual const Snapshot* GetSnapshot() override; virtual void ReleaseSnapshot(const Snapshot* snapshot) override; using DB::GetProperty; @@ -341,6 +348,8 @@ class DBImpl : public DB { return alive_log_files_.begin()->getting_flushed; } + Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr); + // Force current memtable contents to be flushed. Status TEST_FlushMemTable(bool wait = true, ColumnFamilyHandle* cfh = nullptr); @@ -644,7 +653,9 @@ class DBImpl : public DB { friend struct SuperVersion; friend class CompactedDBImpl; #ifndef NDEBUG + friend class DBTest2_ReadCallbackTest_Test; friend class XFTransactionWriteHandler; + friend class DBBlobIndexTest; #endif struct CompactionState; @@ -1245,7 +1256,7 @@ class DBImpl : public DB { // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value, - bool* value_found = nullptr); + bool* value_found = nullptr, bool* is_blob_index = nullptr); bool GetIntPropertyInternal(ColumnFamilyData* cfd, const DBPropertyInfo& property_info, diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index de5b66f2a..a4b378020 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -80,6 +80,15 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, disallow_trivial_move); } +Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) { + WriteContext write_context; + InstrumentedMutexLock l(&mutex_); + if (cfd == nullptr) { + cfd = default_cf_handle_->cfd(); + } + return SwitchMemtable(cfd, &write_context); +} + Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) { FlushOptions fo; fo.wait = wait; diff --git a/db/db_iter.cc b/db/db_iter.cc index 33f926ce0..e4a6c92a7 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -8,8 +8,6 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_iter.h" -#include -#include #include #include @@ -18,7 +16,6 @@ #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" #include "monitoring/perf_context_imp.h" -#include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" @@ -105,7 +102,7 @@ class DBIter: public Iterator { DBIter(Env* _env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, - uint64_t max_sequential_skip_in_iterations) + uint64_t max_sequential_skip_in_iterations, bool allow_blob) : arena_mode_(arena_mode), env_(_env), logger_(cf_options.info_log), @@ -122,7 +119,8 @@ class DBIter: public Iterator { pin_thru_lifetime_(read_options.pin_data), total_order_seek_(read_options.total_order_seek), range_del_agg_(cf_options.internal_comparator, s, - true /* collapse_deletions */) { + true /* collapse_deletions */), + allow_blob_(allow_blob) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = cf_options.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; @@ -180,6 +178,10 @@ class DBIter: public Iterator { return status_; } } + bool IsBlob() const { + assert(valid_ && (allow_blob_ || !is_blob_)); + return is_blob_; + } virtual Status GetProperty(std::string prop_name, std::string* prop) override { @@ -287,6 +289,8 @@ class DBIter: public Iterator { RangeDelAggregator range_del_agg_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; + bool allow_blob_; + bool is_blob_; // No copying allowed DBIter(const DBIter&); @@ -376,6 +380,8 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // - none of the above : saved_key_ can contain anything, it doesn't matter. uint64_t num_skipped = 0; + is_blob_ = false; + do { ParsedInternalKey ikey; @@ -420,6 +426,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeValue: + case kTypeBlobIndex: saved_key_.SetUserKey( ikey.user_key, !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); @@ -431,6 +438,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { skipping = true; num_skipped = 0; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } else if (ikey.type == kTypeBlobIndex) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + } else { + is_blob_ = true; + valid_ = true; + } + return; } else { valid_ = true; return; @@ -572,6 +591,18 @@ void DBIter::MergeValuesNewToOld() { merge_context_.PushOperand(iter_->value(), iter_->IsValuePinned() /* operand_pinned */); PERF_COUNTER_ADD(internal_merge_count, 1); + } else if (kTypeBlobIndex == ikey.type) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else { + status_ = + Status::NotSupported("Blob DB does not support merge operator."); + } + valid_ = false; + return; } else { assert(false); } @@ -678,7 +709,6 @@ void DBIter::PrevInternal() { !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); if (FindValueForCurrentKey()) { - valid_ = true; if (!iter_->Valid()) { return; } @@ -745,6 +775,7 @@ bool DBIter::FindValueForCurrentKey() { last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: + case kTypeBlobIndex: if (range_del_agg_.ShouldDelete( ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { @@ -790,6 +821,7 @@ bool DBIter::FindValueForCurrentKey() { } Status s; + is_blob_ = false; switch (last_key_entry_type) { case kTypeDeletion: case kTypeSingleDeletion: @@ -805,6 +837,18 @@ bool DBIter::FindValueForCurrentKey() { merge_operator_, saved_key_.GetUserKey(), nullptr, merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_, &pinned_value_, true); + } else if (last_not_merge_type == kTypeBlobIndex) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else { + status_ = + Status::NotSupported("Blob DB does not support merge operator."); + } + valid_ = false; + return true; } else { assert(last_not_merge_type == kTypeValue); s = MergeHelper::TimedFullMerge( @@ -816,6 +860,17 @@ bool DBIter::FindValueForCurrentKey() { case kTypeValue: // do nothing - we've already has value in saved_value_ break; + case kTypeBlobIndex: + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + return true; + } + is_blob_ = true; + break; default: assert(false); break; @@ -849,7 +904,15 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { valid_ = false; return false; } - if (ikey.type == kTypeValue) { + if (ikey.type == kTypeBlobIndex && !allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + return true; + } + if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { assert(iter_->IsValuePinned()); pinned_value_ = iter_->value(); valid_ = true; @@ -1160,10 +1223,11 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, const Comparator* user_key_comparator, InternalIterator* internal_iter, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations) { - DBIter* db_iter = new DBIter(env, read_options, cf_options, - user_key_comparator, internal_iter, sequence, - false, max_sequential_skip_in_iterations); + uint64_t max_sequential_skip_in_iterations, + bool allow_blob) { + DBIter* db_iter = new DBIter( + env, read_options, cf_options, user_key_comparator, internal_iter, + sequence, false, max_sequential_skip_in_iterations, allow_blob); return db_iter; } @@ -1191,6 +1255,7 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } +bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); } inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name, std::string* prop) { if (prop_name == "rocksdb.iterator.super-version-number") { @@ -1207,11 +1272,11 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, - uint64_t version_number) { + uint64_t version_number, bool allow_blob) { auto mem = arena_.AllocateAligned(sizeof(DBIter)); db_iter_ = new (mem) DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr, - sequence, true, max_sequential_skip_in_iteration); + sequence, true, max_sequential_skip_in_iteration, allow_blob); sv_number_ = version_number; } @@ -1231,7 +1296,7 @@ Status ArenaWrappedDBIter::Refresh() { SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); Init(env, read_options_, *(cfd_->ioptions()), latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - cur_sv_number); + cur_sv_number, allow_blob_); InternalIterator* internal_iter = db_impl_->NewInternalIterator( read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); @@ -1247,12 +1312,12 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - DBImpl* db_impl, ColumnFamilyData* cfd) { + DBImpl* db_impl, ColumnFamilyData* cfd, bool allow_blob) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); iter->Init(env, read_options, cf_options, sequence, - max_sequential_skip_in_iterations, version_number); + max_sequential_skip_in_iterations, version_number, allow_blob); if (db_impl != nullptr && cfd != nullptr) { - iter->StoreRefreshInfo(read_options, db_impl, cfd); + iter->StoreRefreshInfo(read_options, db_impl, cfd, allow_blob); } return iter; diff --git a/db/db_iter.h b/db/db_iter.h index ea98ff433..26fcd44cb 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -33,7 +33,8 @@ extern Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, const Comparator* user_key_comparator, InternalIterator* internal_iter, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations); + uint64_t max_sequential_skip_in_iterations, + bool allow_blob = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -63,20 +64,22 @@ class ArenaWrappedDBIter : public Iterator { virtual Slice value() const override; virtual Status status() const override; virtual Status Refresh() override; + bool IsBlob() const; virtual Status GetProperty(std::string prop_name, std::string* prop) override; void Init(Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, - uint64_t version_number); + uint64_t max_sequential_skip_in_iterations, uint64_t version_number, + bool allow_blob); void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, - ColumnFamilyData* cfd) { + ColumnFamilyData* cfd, bool allow_blob) { read_options_ = read_options; db_impl_ = db_impl; cfd_ = cfd; + allow_blob_ = allow_blob; } private: @@ -86,6 +89,7 @@ class ArenaWrappedDBIter : public Iterator { ColumnFamilyData* cfd_ = nullptr; DBImpl* db_impl_ = nullptr; ReadOptions read_options_; + bool allow_blob_ = false; }; // Generate the arena wrapped iterator class. @@ -95,6 +99,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr); + DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr, + bool allow_blob = false); } // namespace rocksdb diff --git a/db/dbformat.cc b/db/dbformat.cc index 20c54495a..f287ae9f4 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -27,7 +27,7 @@ namespace rocksdb { // and the value type is embedded as the low 8 bits in the sequence // number in internal keys, we need to use the highest-numbered // ValueType, not the lowest). -const ValueType kValueTypeForSeek = kTypeSingleDeletion; +const ValueType kValueTypeForSeek = kTypeBlobIndex; const ValueType kValueTypeForSeekForPrev = kTypeDeletion; uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { diff --git a/db/dbformat.h b/db/dbformat.h index d9fd5f399..c58b8363a 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -47,6 +47,8 @@ enum ValueType : unsigned char { kTypeNoop = 0xD, // WAL only. kTypeColumnFamilyRangeDeletion = 0xE, // WAL only. kTypeRangeDeletion = 0xF, // meta block + kTypeColumnFamilyBlobIndex = 0x10, // Blob DB only + kTypeBlobIndex = 0x11, // Blob DB only kMaxValue = 0x7F // Not used for storing records. }; @@ -57,7 +59,7 @@ extern const ValueType kValueTypeForSeekForPrev; // Checks whether a type is an inline value type // (i.e. a type used in memtable skiplist and sst file datablock). inline bool IsValueType(ValueType t) { - return t <= kTypeMerge || t == kTypeSingleDeletion; + return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex; } // Checks whether a type is from user operation diff --git a/db/memtable.cc b/db/memtable.cc index 9f2fd20bb..22b4125e0 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -555,6 +555,7 @@ struct Saver { Statistics* statistics; bool inplace_update_support; Env* env_; + bool* is_blob_index; }; } // namespace @@ -584,11 +585,26 @@ static bool SaveValue(void* arg, const char* entry) { ValueType type; UnPackSequenceAndType(tag, &s->seq, &type); - if ((type == kTypeValue || type == kTypeMerge) && + if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) { type = kTypeRangeDeletion; } switch (type) { + case kTypeBlobIndex: + if (s->is_blob_index == nullptr) { + ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index."); + *(s->status) = Status::NotSupported( + "Encounter unsupported blob value. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else if (*(s->merge_in_progress)) { + *(s->status) = + Status::NotSupported("Blob DB does not support merge operator."); + } + if (!s->status->ok()) { + *(s->found_final_value) = true; + return false; + } + // intentional fallthrough case kTypeValue: { if (s->inplace_update_support) { s->mem->GetLock(s->key->user_key())->ReadLock(); @@ -607,6 +623,9 @@ static bool SaveValue(void* arg, const char* entry) { s->mem->GetLock(s->key->user_key())->ReadUnlock(); } *(s->found_final_value) = true; + if (s->is_blob_index != nullptr) { + *(s->is_blob_index) = (type == kTypeBlobIndex); + } return false; } case kTypeDeletion: @@ -653,7 +672,7 @@ static bool SaveValue(void* arg, const char* entry) { bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, - const ReadOptions& read_opts) { + const ReadOptions& read_opts, bool* is_blob_index) { // The sequence number is updated synchronously in version_set.h if (IsEmpty()) { // Avoiding recording stats for speed. @@ -699,6 +718,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.inplace_update_support = moptions_.inplace_update_support; saver.statistics = moptions_.statistics; saver.env_ = env_; + saver.is_blob_index = is_blob_index; table_->Get(key, &saver, SaveValue); *seq = saver.seq; diff --git a/db/memtable.h b/db/memtable.h index 9669a2157..896792484 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -187,13 +187,15 @@ class MemTable { // status returned indicates a corruption or other unexpected error. bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - SequenceNumber* seq, const ReadOptions& read_opts); + SequenceNumber* seq, const ReadOptions& read_opts, + bool* is_blob_index = nullptr); bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts) { + const ReadOptions& read_opts, bool* is_blob_index = nullptr) { SequenceNumber seq; - return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); + return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, + is_blob_index); } // Attempts to update the new_value inplace, else does normal Add diff --git a/db/memtable_list.cc b/db/memtable_list.cc index c9a927c06..a9d9e1c02 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -104,10 +104,10 @@ int MemTableList::NumFlushed() const { bool MemTableListVersion::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - SequenceNumber* seq, - const ReadOptions& read_opts) { + SequenceNumber* seq, const ReadOptions& read_opts, + bool* is_blob_index) { return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, - seq, read_opts); + seq, read_opts, is_blob_index); } bool MemTableListVersion::GetFromHistory(const LookupKey& key, @@ -120,19 +120,17 @@ bool MemTableListVersion::GetFromHistory(const LookupKey& key, range_del_agg, seq, read_opts); } -bool MemTableListVersion::GetFromList(std::list* list, - const LookupKey& key, std::string* value, - Status* s, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, - SequenceNumber* seq, - const ReadOptions& read_opts) { +bool MemTableListVersion::GetFromList( + std::list* list, const LookupKey& key, std::string* value, + Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, + SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) { *seq = kMaxSequenceNumber; for (auto& memtable : *list) { SequenceNumber current_seq = kMaxSequenceNumber; bool done = memtable->Get(key, value, s, merge_context, range_del_agg, - ¤t_seq, read_opts); + ¤t_seq, read_opts, is_blob_index); if (*seq == kMaxSequenceNumber) { // Store the most recent sequence number of any operation on this key. // Since we only care about the most recent change, we only need to diff --git a/db/memtable_list.h b/db/memtable_list.h index 628ab544b..23b5bbe55 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -54,13 +54,15 @@ class MemTableListVersion { // returned). Otherwise, *seq will be set to kMaxSequenceNumber. bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - SequenceNumber* seq, const ReadOptions& read_opts); + SequenceNumber* seq, const ReadOptions& read_opts, + bool* is_blob_index = nullptr); bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts) { + const ReadOptions& read_opts, bool* is_blob_index = nullptr) { SequenceNumber seq; - return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); + return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, + is_blob_index); } // Similar to Get(), but searches the Memtable history of memtables that @@ -117,7 +119,7 @@ class MemTableListVersion { bool GetFromList(std::list* list, const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, - const ReadOptions& read_opts); + const ReadOptions& read_opts, bool* is_blob_index = nullptr); void AddMemTable(MemTable* m); diff --git a/db/version_set.cc b/db/version_set.cc index 2ff425d20..782ebc263 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -965,7 +965,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, PinnableSlice* value, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found, - bool* key_exists, SequenceNumber* seq) { + bool* key_exists, SequenceNumber* seq, bool* is_blob) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); @@ -981,7 +981,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, value, value_found, merge_context, range_del_agg, this->env_, seq, - merge_operator_ ? &pinned_iters_mgr : nullptr); + merge_operator_ ? &pinned_iters_mgr : nullptr, is_blob); // Pin blocks that we read to hold merge operands if (merge_operator_) { @@ -1030,6 +1030,12 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, return; case GetContext::kMerge: break; + case GetContext::kBlobIndex: + ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); + *status = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + return; } f = fp.GetNextFile(); } diff --git a/db/version_set.h b/db/version_set.h index 9fb000c05..5862dea33 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -485,7 +485,8 @@ class Version { void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found = nullptr, - bool* key_exists = nullptr, SequenceNumber* seq = nullptr); + bool* key_exists = nullptr, SequenceNumber* seq = nullptr, + bool* is_blob = nullptr); // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. diff --git a/db/write_batch.cc b/db/write_batch.cc index 43639ac23..89ae044c8 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -67,6 +67,7 @@ enum ContentFlags : uint32_t { HAS_COMMIT = 1 << 7, HAS_ROLLBACK = 1 << 8, HAS_DELETE_RANGE = 1 << 9, + HAS_BLOB_INDEX = 1 << 10, }; struct BatchContentClassifier : public WriteBatch::Handler { @@ -97,6 +98,11 @@ struct BatchContentClassifier : public WriteBatch::Handler { return Status::OK(); } + Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_BLOB_INDEX; + return Status::OK(); + } + Status MarkBeginPrepare() override { content_flags |= ContentFlags::HAS_BEGIN_PREPARE; return Status::OK(); @@ -328,6 +334,17 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, return Status::Corruption("bad WriteBatch Merge"); } break; + case kTypeColumnFamilyBlobIndex: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch BlobIndex"); + } + // intentional fallthrough + case kTypeBlobIndex: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch BlobIndex"); + } + break; case kTypeLogData: assert(blob != nullptr); if (!GetLengthPrefixedSlice(input, blob)) { @@ -414,6 +431,13 @@ Status WriteBatch::Iterate(Handler* handler) const { s = handler->MergeCF(column_family, key, value); found++; break; + case kTypeColumnFamilyBlobIndex: + case kTypeBlobIndex: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX)); + s = handler->PutBlobIndexCF(column_family, key, value); + found++; + break; case kTypeLogData: handler->LogData(blob); break; @@ -759,6 +783,25 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family, value); } +Status WriteBatchInternal::PutBlobIndex(WriteBatch* b, + uint32_t column_family_id, + const Slice& key, const Slice& value) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast(kTypeBlobIndex)); + } else { + b->rep_.push_back(static_cast(kTypeColumnFamilyBlobIndex)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_BLOB_INDEX, + std::memory_order_relaxed); + return save.commit(); +} + Status WriteBatch::PutLogData(const Slice& blob) { LocalSavePoint save(this); rep_.push_back(static_cast(kTypeLogData)); @@ -935,8 +978,8 @@ public: return true; } - virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { + Status PutCFImpl(uint32_t column_family_id, const Slice& key, + const Slice& value, ValueType value_type) { if (rebuilding_trx_ != nullptr) { WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); return Status::OK(); @@ -951,7 +994,7 @@ public: MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { - mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_, + mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_, get_post_process_info(mem)); } else if (moptions->inplace_callback == nullptr) { assert(!concurrent_memtable_writes_); @@ -986,11 +1029,11 @@ public: value, &merged_value); if (status == UpdateStatus::UPDATED_INPLACE) { // prev_value is updated in-place with final value. - mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); + mem->Add(sequence_, value_type, key, Slice(prev_buffer, prev_size)); RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); } else if (status == UpdateStatus::UPDATED) { // merged_value contains the final value. - mem->Add(sequence_, kTypeValue, key, Slice(merged_value)); + mem->Add(sequence_, value_type, key, Slice(merged_value)); RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); } } @@ -1003,6 +1046,11 @@ public: return Status::OK(); } + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + return PutCFImpl(column_family_id, key, value, kTypeValue); + } + Status DeleteImpl(uint32_t column_family_id, const Slice& key, const Slice& value, ValueType delete_type) { MemTable* mem = cf_mems_->GetMemTable(); @@ -1159,6 +1207,12 @@ public: return Status::OK(); } + virtual Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + // Same as PutCF except for value type. + return PutCFImpl(column_family_id, key, value, kTypeBlobIndex); + } + void CheckMemtableFull() { if (flush_scheduler_ != nullptr) { auto* cfd = cf_mems_->current(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 48a417ce8..2408686f1 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -99,6 +99,9 @@ class WriteBatchInternal { static Status Merge(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid); static Status MarkRollback(WriteBatch* batch, const Slice& xid); diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 40d318e09..e132033db 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -206,6 +206,7 @@ class CompactionEventListener { kDelete, kSingleDelete, kRangeDelete, + kBlobIndex, kInvalid, }; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 8bd93d36c..336391ead 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -233,6 +233,12 @@ class WriteBatch : public WriteBatchBase { } virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {} + virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/, + const Slice& /*key*/, + const Slice& /*value*/) { + return Status::InvalidArgument("PutBlobIndexCF not implemented"); + } + // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); diff --git a/table/get_context.cc b/table/get_context.cc index 0d688fe46..258891ec4 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -33,14 +33,12 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { } // namespace -GetContext::GetContext(const Comparator* ucmp, - const MergeOperator* merge_operator, Logger* logger, - Statistics* statistics, GetState init_state, - const Slice& user_key, PinnableSlice* pinnable_val, - bool* value_found, MergeContext* merge_context, - RangeDelAggregator* _range_del_agg, Env* env, - SequenceNumber* seq, - PinnedIteratorsManager* _pinned_iters_mgr) +GetContext::GetContext( + const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, + Statistics* statistics, GetState init_state, const Slice& user_key, + PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, + RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq, + PinnedIteratorsManager* _pinned_iters_mgr, bool* is_blob_index) : ucmp_(ucmp), merge_operator_(merge_operator), logger_(logger), @@ -54,7 +52,8 @@ GetContext::GetContext(const Comparator* ucmp, env_(env), seq_(seq), replay_log_(nullptr), - pinned_iters_mgr_(_pinned_iters_mgr) { + pinned_iters_mgr_(_pinned_iters_mgr), + is_blob_index_(is_blob_index) { if (seq_) { *seq_ = kMaxSequenceNumber; } @@ -99,13 +98,19 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, auto type = parsed_key.type; // Key matches. Process it - if ((type == kTypeValue || type == kTypeMerge) && + if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && range_del_agg_ != nullptr && range_del_agg_->ShouldDelete(parsed_key)) { type = kTypeRangeDeletion; } switch (type) { case kTypeValue: + case kTypeBlobIndex: assert(state_ == kNotFound || state_ == kMerge); + if (type == kTypeBlobIndex && is_blob_index_ == nullptr) { + // Blob value not supported. Stop. + state_ = kBlobIndex; + return false; + } if (kNotFound == state_) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { @@ -131,6 +136,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } } } + if (is_blob_index_ != nullptr) { + *is_blob_index_ = (type == kTypeBlobIndex); + } return false; case kTypeDeletion: diff --git a/table/get_context.h b/table/get_context.h index ac50680b6..a708f6be7 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -22,7 +22,8 @@ class GetContext { kFound, kDeleted, kCorrupt, - kMerge // saver contains the current merge result (the operands) + kMerge, // saver contains the current merge result (the operands) + kBlobIndex, }; GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, @@ -30,7 +31,8 @@ class GetContext { const Slice& user_key, PinnableSlice* value, bool* value_found, MergeContext* merge_context, RangeDelAggregator* range_del_agg, Env* env, SequenceNumber* seq = nullptr, - PinnedIteratorsManager* _pinned_iters_mgr = nullptr); + PinnedIteratorsManager* _pinned_iters_mgr = nullptr, + bool* is_blob_index = nullptr); void MarkKeyMayExist(); @@ -83,6 +85,7 @@ class GetContext { // Used to temporarily pin blocks when state_ == GetContext::kMerge PinnedIteratorsManager* pinned_iters_mgr_; bool sample_; + bool* is_blob_index_; }; void replayGetContextLog(const Slice& replay_log, const Slice& user_key, diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 6e8c9af4b..3918941cf 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -806,7 +806,7 @@ TEST_F(BlobDBTest, ReadWhileGC) { TEST_F(BlobDBTest, ColumnFamilyNotSupported) { Options options; options.env = mock_env_.get(); - mock_env_->set_now_micros(0); + mock_env_->set_current_time(0); Open(BlobDBOptions(), options); ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily(); ColumnFamilyHandle *handle = nullptr;