From 6c7b1a0cc79463842d7b66e9a627fd25e9bfa9d7 Mon Sep 17 00:00:00 2001 From: anand76 Date: Tue, 12 Nov 2019 13:51:18 -0800 Subject: [PATCH] Batched MultiGet API for multiple column families (#5816) Summary: Add a new API that allows a user to call MultiGet specifying multiple keys belonging to different column families. This is mainly useful for users who want to do a consistent read of keys across column families, with the added performance benefits of batching and returning values using PinnableSlice. As part of this change, the code in the original multi-column family MultiGet for acquiring the super versions has been refactored into a separate function that can be used by both, the batching and the non-batching versions of MultiGet. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5816 Test Plan: make check make asan_check asan_crash_test Differential Revision: D18408676 Pulled By: anand1976 fbshipit-source-id: 933e7bec91dd70e7b633be4ff623a1116cc28c8d --- HISTORY.md | 1 + db/db_basic_test.cc | 86 ++- db/db_impl/db_impl.cc | 529 +++++++++++------- db/db_impl/db_impl.h | 87 ++- db/db_test_util.cc | 33 +- db/db_test_util.h | 3 +- include/rocksdb/db.h | 41 ++ table/multiget_context.h | 17 +- .../write_batch_with_index.cc | 10 +- 9 files changed, 568 insertions(+), 239 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 010e2635a..b5019d406 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ ### New Features * Universal compaction to support options.periodic_compaction_seconds. A full compaction will be triggered if any file is over the threshold. * `GetLiveFilesMetaData` and `GetColumnFamilyMetaData` now expose the file number of SST files as well as the oldest blob file referenced by each SST. +* A batched MultiGet API (DB::MultiGet()) that supports retrieving keys from multiple column families. ### Performance Improvements * For 64-bit hashing, RocksDB is standardizing on a slightly modified preview version of XXH3. This function is now used for many non-persisted hashes, along with fastrange64() in place of the modulus operator, and some benchmarks show a slight improvement. diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index c316e3dc8..84111aec4 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1019,15 +1019,27 @@ TEST_F(DBBasicTest, DBCloseFlushError) { Destroy(options); } -TEST_F(DBBasicTest, MultiGetMultiCF) { +class DBMultiGetTestWithParam : public DBBasicTest, + public testing::WithParamInterface {}; + +TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, options); + // tuples + std::vector> cf_kv_vec; + static const int num_keys = 24; + cf_kv_vec.reserve(num_keys); - for (int i = 0; i < 8; ++i) { - ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", - "cf" + std::to_string(i) + "_val")); + for (int i = 0; i < num_keys; ++i) { + int cf = i / 3; + int cf_key = 1 % 3; + cf_kv_vec.emplace_back(std::make_tuple( + cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key), + "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key))); + ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]), + std::get<2>(cf_kv_vec[i]))); } int get_sv_count = 0; @@ -1037,10 +1049,14 @@ TEST_F(DBBasicTest, MultiGetMultiCF) { if (++get_sv_count == 2) { // After MultiGet refs a couple of CFs, flush all CFs so MultiGet // is forced to repeat the process - for (int i = 0; i < 8; ++i) { - ASSERT_OK(Flush(i)); - ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", - "cf" + std::to_string(i) + "_val2")); + for (int i = 0; i < num_keys; ++i) { + int cf = i / 3; + int cf_key = i % 8; + if (cf_key == 0) { + ASSERT_OK(Flush(cf)); + } + ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]), + std::get<2>(cf_kv_vec[i]) + "_2")); } } if (get_sv_count == 11) { @@ -1058,26 +1074,53 @@ TEST_F(DBBasicTest, MultiGetMultiCF) { std::vector keys; std::vector values; - for (int i = 0; i < 8; ++i) { - cfs.push_back(i); - keys.push_back("cf" + std::to_string(i) + "_key"); + for (int i = 0; i < num_keys; ++i) { + cfs.push_back(std::get<0>(cf_kv_vec[i])); + keys.push_back(std::get<1>(cf_kv_vec[i])); } - values = MultiGet(cfs, keys, nullptr); - ASSERT_EQ(values.size(), 8); + values = MultiGet(cfs, keys, nullptr, GetParam()); + ASSERT_EQ(values.size(), num_keys); for (unsigned int j = 0; j < values.size(); ++j) { - ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2"); + ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2"); } - for (int i = 0; i < 8; ++i) { + + keys.clear(); + cfs.clear(); + cfs.push_back(std::get<0>(cf_kv_vec[0])); + keys.push_back(std::get<1>(cf_kv_vec[0])); + cfs.push_back(std::get<0>(cf_kv_vec[3])); + keys.push_back(std::get<1>(cf_kv_vec[3])); + cfs.push_back(std::get<0>(cf_kv_vec[4])); + keys.push_back(std::get<1>(cf_kv_vec[4])); + values = MultiGet(cfs, keys, nullptr, GetParam()); + ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2"); + ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2"); + ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2"); + + keys.clear(); + cfs.clear(); + cfs.push_back(std::get<0>(cf_kv_vec[7])); + keys.push_back(std::get<1>(cf_kv_vec[7])); + cfs.push_back(std::get<0>(cf_kv_vec[6])); + keys.push_back(std::get<1>(cf_kv_vec[6])); + cfs.push_back(std::get<0>(cf_kv_vec[1])); + keys.push_back(std::get<1>(cf_kv_vec[1])); + values = MultiGet(cfs, keys, nullptr, GetParam()); + ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2"); + ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2"); + ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2"); + + for (int cf = 0; cf < 8; ++cf) { auto* cfd = reinterpret_cast( - reinterpret_cast(db_)->GetColumnFamilyHandle(i)) + reinterpret_cast(db_)->GetColumnFamilyHandle(cf)) ->cfd(); ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); } } -TEST_F(DBBasicTest, MultiGetMultiCFMutex) { +TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, @@ -1123,7 +1166,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFMutex) { keys.push_back("cf" + std::to_string(i) + "_key"); } - values = MultiGet(cfs, keys, nullptr); + values = MultiGet(cfs, keys, nullptr, GetParam()); ASSERT_TRUE(last_try); ASSERT_EQ(values.size(), 8); for (unsigned int j = 0; j < values.size(); ++j) { @@ -1138,7 +1181,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFMutex) { } } -TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) { +TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, @@ -1183,7 +1226,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) { } const Snapshot* snapshot = db_->GetSnapshot(); - values = MultiGet(cfs, keys, snapshot); + values = MultiGet(cfs, keys, snapshot, GetParam()); db_->ReleaseSnapshot(snapshot); ASSERT_EQ(values.size(), 8); for (unsigned int j = 0; j < values.size(); ++j) { @@ -1197,6 +1240,9 @@ TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) { } } +INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam, + testing::Bool()); + TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 14d9523f7..17b314978 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1645,14 +1645,9 @@ std::vector DBImpl::MultiGet( StopWatch sw(env_, stats_, DB_MULTIGET); PERF_TIMER_GUARD(get_snapshot_time); - SequenceNumber snapshot; + SequenceNumber consistent_seqnum; + ; - struct MultiGetColumnFamilyData { - ColumnFamilyData* cfd; - SuperVersion* super_version; - MultiGetColumnFamilyData(ColumnFamilyData* cf, SuperVersion* sv) - : cfd(cf), super_version(sv) {} - }; std::unordered_map multiget_cf_data( column_family.size()); for (auto cf : column_family) { @@ -1660,86 +1655,20 @@ std::vector DBImpl::MultiGet( auto cfd = cfh->cfd(); if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) { multiget_cf_data.emplace(cfd->GetID(), - MultiGetColumnFamilyData(cfd, nullptr)); + MultiGetColumnFamilyData(cfh, nullptr)); } } - bool last_try = false; - { - // If we end up with the same issue of memtable geting sealed during 2 - // consecutive retries, it means the write rate is very high. In that case - // its probably ok to take the mutex on the 3rd try so we can succeed for - // sure - static const int num_retries = 3; - for (auto i = 0; i < num_retries; ++i) { - last_try = (i == num_retries - 1); - bool retry = false; + std::function::iterator&)> + iter_deref_lambda = + [](std::unordered_map::iterator& + cf_iter) { return &cf_iter->second; }; - if (i > 0) { - for (auto mgd_iter = multiget_cf_data.begin(); - mgd_iter != multiget_cf_data.end(); ++mgd_iter) { - auto super_version = mgd_iter->second.super_version; - auto cfd = mgd_iter->second.cfd; - if (super_version != nullptr) { - ReturnAndCleanupSuperVersion(cfd, super_version); - } - mgd_iter->second.super_version = nullptr; - } - } - - if (read_options.snapshot == nullptr) { - if (last_try) { - TEST_SYNC_POINT("DBImpl::MultiGet::LastTry"); - // We're close to max number of retries. For the last retry, - // acquire the lock so we're sure to succeed - mutex_.Lock(); - } - snapshot = last_seq_same_as_publish_seq_ - ? versions_->LastSequence() - : versions_->LastPublishedSequence(); - } else { - snapshot = reinterpret_cast(read_options.snapshot) - ->number_; - } - - for (auto mgd_iter = multiget_cf_data.begin(); - mgd_iter != multiget_cf_data.end(); ++mgd_iter) { - if (!last_try) { - mgd_iter->second.super_version = - GetAndRefSuperVersion(mgd_iter->second.cfd); - } else { - mgd_iter->second.super_version = - mgd_iter->second.cfd->GetSuperVersion()->Ref(); - } - TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV"); - if (read_options.snapshot != nullptr || last_try) { - // If user passed a snapshot, then we don't care if a memtable is - // sealed or compaction happens because the snapshot would ensure - // that older key versions are kept around. If this is the last - // retry, then we have the lock so nothing bad can happen - continue; - } - // We could get the earliest sequence number for the whole list of - // memtables, which will include immutable memtables as well, but that - // might be tricky to maintain in case we decide, in future, to do - // memtable compaction. - if (!last_try) { - auto seq = - mgd_iter->second.super_version->mem->GetEarliestSequenceNumber(); - if (seq > snapshot) { - retry = true; - break; - } - } - } - if (!retry) { - if (last_try) { - mutex_.Unlock(); - } - break; - } - } - } + bool unref_only = + MultiCFSnapshot>( + read_options, nullptr, iter_deref_lambda, &multiget_cf_data, + &consistent_seqnum); // Contain a list of merge operations if merge occurs. MergeContext merge_context; @@ -1763,7 +1692,7 @@ std::vector DBImpl::MultiGet( Status& s = stat_list[i]; std::string* value = &(*values)[i]; - LookupKey lkey(keys[i], snapshot); + LookupKey lkey(keys[i], consistent_seqnum); auto cfh = reinterpret_cast(column_family[i]); SequenceNumber max_covering_tombstone_seq = 0; auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); @@ -1807,7 +1736,7 @@ std::vector DBImpl::MultiGet( for (auto mgd_iter : multiget_cf_data) { auto mgd = mgd_iter.second; - if (!last_try) { + if (!unref_only) { ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version); } else { mgd.cfd->GetSuperVersion()->Unref(); @@ -1824,125 +1753,330 @@ std::vector DBImpl::MultiGet( return stat_list; } -// Order keys by CF ID, followed by key contents -struct CompareKeyContext { - inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) { - const Comparator* comparator = cfd->user_comparator(); - int cmp = comparator->Compare(*(lhs->key), *(rhs->key)); - if (cmp < 0) { - return true; - } - return false; - } - const ColumnFamilyData* cfd; -}; - -void DBImpl::MultiGet(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, const size_t num_keys, - const Slice* keys, PinnableSlice* values, - Status* statuses, const bool sorted_input) { - autovector key_context; - for (size_t i = 0; i < num_keys; ++i) { - key_context.emplace_back(keys[i], &values[i], &statuses[i]); - } - - MultiGetImpl(read_options, column_family, key_context, sorted_input, nullptr, - nullptr); -} - -void DBImpl::MultiGetImpl( - const ReadOptions& read_options, ColumnFamilyHandle* column_family, - autovector& key_context, - bool sorted_input, ReadCallback* callback, bool* is_blob_index) { - PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); - StopWatch sw(env_, stats_, DB_MULTIGET); - size_t num_keys = key_context.size(); - +template +bool DBImpl::MultiCFSnapshot( + const ReadOptions& read_options, ReadCallback* callback, + std::function& + iter_deref_func, + T* cf_list, SequenceNumber* snapshot) { PERF_TIMER_GUARD(get_snapshot_time); - ColumnFamilyHandleImpl* cfh = - reinterpret_cast(column_family); - ColumnFamilyData* cfd = cfh->cfd(); - - autovector sorted_keys; - sorted_keys.resize(num_keys); - { - size_t index = 0; - for (KeyContext& key : key_context) { -#ifndef NDEBUG - if (index > 0 && sorted_input) { - KeyContext* lhs = &key_context[index-1]; - KeyContext* rhs = &key_context[index]; - const Comparator* comparator = cfd->user_comparator(); - int cmp = comparator->Compare(*(lhs->key), *(rhs->key)); - assert(cmp <= 0); + bool last_try = false; + if (cf_list->size() == 1) { + // Fast path for a single column family. We can simply get the thread loca + // super version + auto cf_iter = cf_list->begin(); + auto node = iter_deref_func(cf_iter); + node->super_version = GetAndRefSuperVersion(node->cfd); + if (read_options.snapshot != nullptr) { + // Note: In WritePrepared txns this is not necessary but not harmful + // either. Because prep_seq > snapshot => commit_seq > snapshot so if + // a snapshot is specified we should be fine with skipping seq numbers + // that are greater than that. + // + // In WriteUnprepared, we cannot set snapshot in the lookup key because we + // may skip uncommitted data that should be visible to the transaction for + // reading own writes. + *snapshot = + static_cast(read_options.snapshot)->number_; + if (callback) { + *snapshot = std::max(*snapshot, callback->max_visible_seq()); } -#endif - - sorted_keys[index] = &key; - index++; + } else { + // Since we get and reference the super version before getting + // the snapshot number, without a mutex protection, it is possible + // that a memtable switch happened in the middle and not all the + // data for this snapshot is available. But it will contain all + // the data available in the super version we have, which is also + // a valid snapshot to read from. + // We shouldn't get snapshot before finding and referencing the super + // version because a flush happening in between may compact away data for + // the snapshot, but the snapshot is earlier than the data overwriting it, + // so users may see wrong results. + *snapshot = last_seq_same_as_publish_seq_ + ? versions_->LastSequence() + : versions_->LastPublishedSequence(); } - if (!sorted_input) { - CompareKeyContext sort_comparator; - sort_comparator.cfd = cfd; - std::sort(sorted_keys.begin(), sorted_keys.begin() + index, - sort_comparator); + } else { + // If we end up with the same issue of memtable geting sealed during 2 + // consecutive retries, it means the write rate is very high. In that case + // its probably ok to take the mutex on the 3rd try so we can succeed for + // sure + static const int num_retries = 3; + for (int i = 0; i < num_retries; ++i) { + last_try = (i == num_retries - 1); + bool retry = false; + + if (i > 0) { + for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end(); + ++cf_iter) { + auto node = iter_deref_func(cf_iter); + SuperVersion* super_version = node->super_version; + ColumnFamilyData* cfd = node->cfd; + if (super_version != nullptr) { + ReturnAndCleanupSuperVersion(cfd, super_version); + } + node->super_version = nullptr; + } + } + if (read_options.snapshot == nullptr) { + if (last_try) { + TEST_SYNC_POINT("DBImpl::MultiGet::LastTry"); + // We're close to max number of retries. For the last retry, + // acquire the lock so we're sure to succeed + mutex_.Lock(); + } + *snapshot = last_seq_same_as_publish_seq_ + ? versions_->LastSequence() + : versions_->LastPublishedSequence(); + } else { + *snapshot = reinterpret_cast(read_options.snapshot) + ->number_; + } + for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end(); + ++cf_iter) { + auto node = iter_deref_func(cf_iter); + if (!last_try) { + node->super_version = GetAndRefSuperVersion(node->cfd); + } else { + node->super_version = node->cfd->GetSuperVersion()->Ref(); + } + TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV"); + if (read_options.snapshot != nullptr || last_try) { + // If user passed a snapshot, then we don't care if a memtable is + // sealed or compaction happens because the snapshot would ensure + // that older key versions are kept around. If this is the last + // retry, then we have the lock so nothing bad can happen + continue; + } + // We could get the earliest sequence number for the whole list of + // memtables, which will include immutable memtables as well, but that + // might be tricky to maintain in case we decide, in future, to do + // memtable compaction. + if (!last_try) { + SequenceNumber seq = + node->super_version->mem->GetEarliestSequenceNumber(); + if (seq > *snapshot) { + retry = true; + break; + } + } + } + if (!retry) { + if (last_try) { + mutex_.Unlock(); + } + break; + } } } // Keep track of bytes that we read for statistics-recording later PERF_TIMER_STOP(get_snapshot_time); - // Acquire SuperVersion - SuperVersion* super_version = GetAndRefSuperVersion(cfd); - SequenceNumber snapshot; - if (read_options.snapshot != nullptr) { - // Note: In WritePrepared txns this is not necessary but not harmful - // either. Because prep_seq > snapshot => commit_seq > snapshot so if - // a snapshot is specified we should be fine with skipping seq numbers - // that are greater than that. - // - // In WriteUnprepared, we cannot set snapshot in the lookup key because we - // may skip uncommitted data that should be visible to the transaction for - // reading own writes. - snapshot = - reinterpret_cast(read_options.snapshot)->number_; - if (callback) { - snapshot = std::max(snapshot, callback->max_visible_seq()); - } - } else { - // Since we get and reference the super version before getting - // the snapshot number, without a mutex protection, it is possible - // that a memtable switch happened in the middle and not all the - // data for this snapshot is available. But it will contain all - // the data available in the super version we have, which is also - // a valid snapshot to read from. - // We shouldn't get snapshot before finding and referencing the super - // version because a flush happening in between may compact away data for - // the snapshot, but the snapshot is earlier than the data overwriting it, - // so users may see wrong results. - snapshot = last_seq_same_as_publish_seq_ - ? versions_->LastSequence() - : versions_->LastPublishedSequence(); - if (callback) { - // The unprep_seqs are not published for write unprepared, so it could be - // that max_visible_seq is larger. Seek to the std::max of the two. - // However, we still want our callback to contain the actual snapshot so - // that it can do the correct visibility filtering. - callback->Refresh(snapshot); + return last_try; +} - // Internally, WriteUnpreparedTxnReadCallback::Refresh would set - // max_visible_seq = max(max_visible_seq, snapshot) - // - // Currently, the commented out assert is broken by - // InvalidSnapshotReadCallback, but if write unprepared recovery followed - // the regular transaction flow, then this special read callback would not - // be needed. - // - // assert(callback->max_visible_seq() >= snapshot); - snapshot = callback->max_visible_seq(); +void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, Status* statuses, + const bool sorted_input) { + autovector key_context; + autovector sorted_keys; + sorted_keys.resize(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + key_context.emplace_back(column_families[i], keys[i], &values[i], + &statuses[i]); + } + for (size_t i = 0; i < num_keys; ++i) { + sorted_keys[i] = &key_context[i]; + } + PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys); + + autovector + multiget_cf_data; + size_t cf_start = 0; + ColumnFamilyHandle* cf = sorted_keys[0]->column_family; + for (size_t i = 0; i < num_keys; ++i) { + KeyContext* key_ctx = sorted_keys[i]; + if (key_ctx->column_family != cf) { + multiget_cf_data.emplace_back( + MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr)); + cf_start = i; + cf = key_ctx->column_family; } } + { + // multiget_cf_data.emplace_back( + // MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start, nullptr)); + multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr); + } + std::function::iterator&)> + iter_deref_lambda = + [](autovector::iterator& cf_iter) { + return &(*cf_iter); + }; + + SequenceNumber consistent_seqnum; + bool unref_only = MultiCFSnapshot< + autovector>( + read_options, nullptr, iter_deref_lambda, &multiget_cf_data, + &consistent_seqnum); + + for (auto cf_iter = multiget_cf_data.begin(); + cf_iter != multiget_cf_data.end(); ++cf_iter) { + MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys, + cf_iter->super_version, consistent_seqnum, nullptr, nullptr); + if (!unref_only) { + ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version); + } else { + cf_iter->cfd->GetSuperVersion()->Unref(); + } + } +} + +namespace { +// Order keys by CF ID, followed by key contents +struct CompareKeyContext { + inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) { + ColumnFamilyHandleImpl* cfh = + static_cast(lhs->column_family); + uint32_t cfd_id1 = cfh->cfd()->GetID(); + const Comparator* comparator = cfh->cfd()->user_comparator(); + cfh = static_cast(lhs->column_family); + uint32_t cfd_id2 = cfh->cfd()->GetID(); + + if (cfd_id1 < cfd_id2) { + return true; + } else if (cfd_id1 > cfd_id2) { + return false; + } + + // Both keys are from the same column family + int cmp = comparator->Compare(*(lhs->key), *(rhs->key)); + if (cmp < 0) { + return true; + } + return false; + } +}; + +} // anonymous namespace + +void DBImpl::PrepareMultiGetKeys( + size_t num_keys, bool sorted_input, + autovector* sorted_keys) { +#ifndef NDEBUG + if (sorted_input) { + for (size_t index = 0; index < sorted_keys->size(); ++index) { + if (index > 0) { + KeyContext* lhs = (*sorted_keys)[index - 1]; + KeyContext* rhs = (*sorted_keys)[index]; + ColumnFamilyHandleImpl* cfh = + reinterpret_cast(lhs->column_family); + uint32_t cfd_id1 = cfh->cfd()->GetID(); + const Comparator* comparator = cfh->cfd()->user_comparator(); + cfh = reinterpret_cast(lhs->column_family); + uint32_t cfd_id2 = cfh->cfd()->GetID(); + + assert(cfd_id1 <= cfd_id2); + if (cfd_id1 < cfd_id2) { + continue; + } + + // Both keys are from the same column family + int cmp = comparator->Compare(*(lhs->key), *(rhs->key)); + assert(cmp <= 0); + } + index++; + } + } +#endif + if (!sorted_input) { + CompareKeyContext sort_comparator; + std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys, + sort_comparator); + } +} + +void DBImpl::MultiGet(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const size_t num_keys, + const Slice* keys, PinnableSlice* values, + Status* statuses, const bool sorted_input) { + autovector key_context; + autovector sorted_keys; + sorted_keys.resize(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]); + } + for (size_t i = 0; i < num_keys; ++i) { + sorted_keys[i] = &key_context[i]; + } + PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys); + MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys); +} + +void DBImpl::MultiGetWithCallback( + const ReadOptions& read_options, ColumnFamilyHandle* column_family, + ReadCallback* callback, + autovector* sorted_keys) { + std::array multiget_cf_data; + multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr); + std::function::iterator&)> + iter_deref_lambda = + [](std::array::iterator& cf_iter) { + return &(*cf_iter); + }; + + size_t num_keys = sorted_keys->size(); + SequenceNumber consistent_seqnum; + bool unref_only = MultiCFSnapshot>( + read_options, callback, iter_deref_lambda, &multiget_cf_data, + &consistent_seqnum); +#ifndef NDEBUG + assert(!unref_only); +#else + // Silence unused variable warning + (void)unref_only; +#endif // NDEBUG + + if (callback && read_options.snapshot == nullptr) { + // The unprep_seqs are not published for write unprepared, so it could be + // that max_visible_seq is larger. Seek to the std::max of the two. + // However, we still want our callback to contain the actual snapshot so + // that it can do the correct visibility filtering. + callback->Refresh(consistent_seqnum); + + // Internally, WriteUnpreparedTxnReadCallback::Refresh would set + // max_visible_seq = max(max_visible_seq, snapshot) + // + // Currently, the commented out assert is broken by + // InvalidSnapshotReadCallback, but if write unprepared recovery followed + // the regular transaction flow, then this special read callback would not + // be needed. + // + // assert(callback->max_visible_seq() >= snapshot); + consistent_seqnum = callback->max_visible_seq(); + } + + MultiGetImpl(read_options, 0, num_keys, sorted_keys, + multiget_cf_data[0].super_version, consistent_seqnum, nullptr, + nullptr); + ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd, + multiget_cf_data[0].super_version); +} + +void DBImpl::MultiGetImpl( + const ReadOptions& read_options, size_t start_key, size_t num_keys, + autovector* sorted_keys, + SuperVersion* super_version, SequenceNumber snapshot, + ReadCallback* callback, bool* is_blob_index) { + PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); + StopWatch sw(env_, stats_, DB_MULTIGET); // For each of the given keys, apply the entire "get" process as follows: // First look in the memtable, then in the immutable memtable (if any). @@ -1953,8 +2087,8 @@ void DBImpl::MultiGetImpl( size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE) ? MultiGetContext::MAX_BATCH_SIZE : keys_left; - MultiGetContext ctx(&sorted_keys[num_keys - keys_left], batch_size, - snapshot); + MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left, + batch_size, snapshot); MultiGetRange range = ctx.GetMultiGetRange(); bool lookup_current = false; @@ -1992,15 +2126,14 @@ void DBImpl::MultiGetImpl( PERF_TIMER_GUARD(get_post_process_time); size_t num_found = 0; uint64_t bytes_read = 0; - for (KeyContext& key : key_context) { - if (key.s->ok()) { - bytes_read += key.value->size(); + for (size_t i = start_key; i < start_key + num_keys; ++i) { + KeyContext* key = (*sorted_keys)[i]; + if (key->s->ok()) { + bytes_read += key->value->size(); num_found++; } } - ReturnAndCleanupSuperVersion(cfd, super_version); - RecordTick(stats_, NUMBER_MULTIGET_CALLS); RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys); RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 8f1b6e88a..fe97e08be 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -199,11 +199,15 @@ class DBImpl : public DB { PinnableSlice* values, Status* statuses, const bool sorted_input = false) override; - void MultiGetImpl( + virtual void MultiGet(const ReadOptions& options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, Status* statuses, + const bool sorted_input = false) override; + + virtual void MultiGetWithCallback( const ReadOptions& options, ColumnFamilyHandle* column_family, - autovector& key_context, - bool sorted_input, ReadCallback* callback = nullptr, - bool* is_blob_index = nullptr); + ReadCallback* callback, + autovector* sorted_keys); virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family, @@ -1640,6 +1644,81 @@ class DBImpl : public DB { const DBOptions& db_options, const std::vector& column_families); + // Utility function to do some debug validation and sort the given vector + // of MultiGet keys + void PrepareMultiGetKeys( + const size_t num_keys, bool sorted, + autovector* key_ptrs); + + // A structure to hold the information required to process MultiGet of keys + // belonging to one column family. For a multi column family MultiGet, there + // will be a container of these objects. + struct MultiGetColumnFamilyData { + ColumnFamilyHandle* cf; + ColumnFamilyData* cfd; + + // For the batched MultiGet which relies on sorted keys, start specifies + // the index of first key belonging to this column family in the sorted + // list. + size_t start; + + // For the batched MultiGet case, num_keys specifies the number of keys + // belonging to this column family in the sorted list + size_t num_keys; + + // SuperVersion for the column family obtained in a manner that ensures a + // consistent view across all column families in the DB + SuperVersion* super_version; + MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, + SuperVersion* sv) + : cf(column_family), + cfd(static_cast(cf)->cfd()), + start(0), + num_keys(0), + super_version(sv) {} + + MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, size_t first, + size_t count, SuperVersion* sv) + : cf(column_family), + cfd(static_cast(cf)->cfd()), + start(first), + num_keys(count), + super_version(sv) {} + + MultiGetColumnFamilyData() = default; + }; + + // A common function to obtain a consistent snapshot, which can be implicit + // if the user doesn't specify a snapshot in read_options, across + // multiple column families for MultiGet. It will attempt to get an implicit + // snapshot without acquiring the db_mutes, but will give up after a few + // tries and acquire the mutex if a memtable flush happens. The template + // allows both the batched and non-batched MultiGet to call this with + // either an std::unordered_map or autovector of column families. + // + // If callback is non-null, the callback is refreshed with the snapshot + // sequence number + // + // A return value of true indicates that the SuperVersions were obtained + // from the ColumnFamilyData, whereas false indicates they are thread + // local + template + bool MultiCFSnapshot( + const ReadOptions& read_options, ReadCallback* callback, + std::function& + iter_deref_func, + T* cf_list, SequenceNumber* snapshot); + + // The actual implementation of the batching MultiGet. The caller is expected + // to have acquired the SuperVersion and pass in a snapshot sequence number + // in order to construct the LookupKeys. The start_key and num_keys specify + // the range of keys in the sorted_keys vector for a single column family. + void MultiGetImpl( + const ReadOptions& read_options, size_t start_key, size_t num_keys, + autovector* sorted_keys, + SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback, + bool* is_blob_index); + // table_cache_ provides its own synchronization std::shared_ptr table_cache_; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 41e4c5105..88f57275f 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -777,7 +777,8 @@ std::string DBTestBase::Get(int cf, const std::string& k, std::vector DBTestBase::MultiGet(std::vector cfs, const std::vector& k, - const Snapshot* snapshot) { + const Snapshot* snapshot, + const bool batched) { ReadOptions options; options.verify_checksums = true; options.snapshot = snapshot; @@ -789,12 +790,30 @@ std::vector DBTestBase::MultiGet(std::vector cfs, handles.push_back(handles_[cfs[i]]); keys.push_back(k[i]); } - std::vector s = db_->MultiGet(options, handles, keys, &result); - for (unsigned int i = 0; i < s.size(); ++i) { - if (s[i].IsNotFound()) { - result[i] = "NOT_FOUND"; - } else if (!s[i].ok()) { - result[i] = s[i].ToString(); + std::vector s; + if (!batched) { + s = db_->MultiGet(options, handles, keys, &result); + for (unsigned int i = 0; i < s.size(); ++i) { + if (s[i].IsNotFound()) { + result[i] = "NOT_FOUND"; + } else if (!s[i].ok()) { + result[i] = s[i].ToString(); + } + } + } else { + std::vector pin_values(cfs.size()); + result.resize(cfs.size()); + s.resize(cfs.size()); + db_->MultiGet(options, cfs.size(), handles.data(), keys.data(), + pin_values.data(), s.data()); + for (unsigned int i = 0; i < s.size(); ++i) { + if (s[i].IsNotFound()) { + result[i] = "NOT_FOUND"; + } else if (!s[i].ok()) { + result[i] = s[i].ToString(); + } else { + result[i].assign(pin_values[i].data(), pin_values[i].size()); + } } } return result; diff --git a/db/db_test_util.h b/db/db_test_util.h index 667371487..f8a01ad15 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -850,7 +850,8 @@ class DBTestBase : public testing::Test { std::vector MultiGet(std::vector cfs, const std::vector& k, - const Snapshot* snapshot = nullptr); + const Snapshot* snapshot, + const bool batched); std::vector MultiGet(const std::vector& k, const Snapshot* snapshot = nullptr); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index e73ae9c20..bc93aeda1 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -490,6 +490,47 @@ class DB { values++; } } + + // Overloaded MultiGet API that improves performance by batching operations + // in the read path for greater efficiency. Currently, only the block based + // table format with full filters are supported. Other table formats such + // as plain table, block based table with block based filters and + // partitioned indexes will still work, but will not get any performance + // benefits. + // Parameters - + // options - ReadOptions + // column_family - ColumnFamilyHandle* that the keys belong to. All the keys + // passed to the API are restricted to a single column family + // num_keys - Number of keys to lookup + // keys - Pointer to C style array of key Slices with num_keys elements + // values - Pointer to C style array of PinnableSlices with num_keys elements + // statuses - Pointer to C style array of Status with num_keys elements + // sorted_input - If true, it means the input keys are already sorted by key + // order, so the MultiGet() API doesn't have to sort them + // again. If false, the keys will be copied and sorted + // internally by the API - the input array will not be + // modified + virtual void MultiGet(const ReadOptions& options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, Status* statuses, + const bool /*sorted_input*/ = false) { + std::vector cf; + std::vector user_keys; + std::vector status; + std::vector vals; + + for (size_t i = 0; i < num_keys; ++i) { + cf.emplace_back(column_families[i]); + user_keys.emplace_back(keys[i]); + } + status = MultiGet(options, cf, user_keys, &vals); + std::copy(status.begin(), status.end(), statuses); + for (auto& value : vals) { + values->PinSelf(value); + values++; + } + } + // If the key definitely does not exist in the database, then this method // returns false, else true. If the caller wants to obtain value when the key // is found in memory, a bool for 'value_found' must be passed. 'value_found' diff --git a/table/multiget_context.h b/table/multiget_context.h index fe6bbc3bf..8b5b607b3 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include #include "db/lookup_key.h" #include "db/merge_context.h" @@ -21,6 +22,7 @@ struct KeyContext { LookupKey* lkey; Slice ukey; Slice ikey; + ColumnFamilyHandle* column_family; Status* s; MergeContext merge_context; SequenceNumber max_covering_tombstone_seq; @@ -29,9 +31,11 @@ struct KeyContext { PinnableSlice* value; GetContext* get_context; - KeyContext(const Slice& user_key, PinnableSlice* val, Status* stat) + KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key, + PinnableSlice* val, Status* stat) : key(&user_key), lkey(nullptr), + column_family(col_family), s(stat), max_covering_tombstone_seq(0), key_exists(false), @@ -85,10 +89,9 @@ class MultiGetContext { // htat need to be performed static const int MAX_BATCH_SIZE = 32; - MultiGetContext(KeyContext** sorted_keys, size_t num_keys, - SequenceNumber snapshot) - : sorted_keys_(sorted_keys), - num_keys_(num_keys), + MultiGetContext(autovector* sorted_keys, + size_t begin, size_t num_keys, SequenceNumber snapshot) + : num_keys_(num_keys), value_mask_(0), lookup_key_ptr_(reinterpret_cast(lookup_key_stack_buf)) { int index = 0; @@ -100,6 +103,8 @@ class MultiGetContext { } for (size_t iter = 0; iter != num_keys_; ++iter) { + // autovector may not be contiguous storage, so make a copy + sorted_keys_[iter] = (*sorted_keys)[begin + iter]; sorted_keys_[iter]->lkey = new (&lookup_key_ptr_[index]) LookupKey(*sorted_keys_[iter]->key, snapshot); sorted_keys_[iter]->ukey = sorted_keys_[iter]->lkey->user_key(); @@ -118,7 +123,7 @@ class MultiGetContext { static const int MAX_LOOKUP_KEYS_ON_STACK = 16; alignas(alignof(LookupKey)) char lookup_key_stack_buf[sizeof(LookupKey) * MAX_LOOKUP_KEYS_ON_STACK]; - KeyContext** sorted_keys_; + std::array sorted_keys_; size_t num_keys_; uint64_t value_mask_; std::unique_ptr lookup_key_heap_buf; diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 4cc6b9bca..fe99e43e6 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -963,6 +963,7 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( ->immutable_db_options(); autovector key_context; + autovector sorted_keys; // To hold merges from the write batch autovector, MultiGetContext::MAX_BATCH_SIZE> @@ -1002,14 +1003,17 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || result == WriteBatchWithIndexInternal::Result::kNotFound); - key_context.emplace_back(keys[i], &values[i], &statuses[i]); + key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]); + sorted_keys.emplace_back(&key_context.back()); merges.emplace_back(result, std::move(merge_context)); } // Did not find key in batch OR could not resolve Merges. Try DB. static_cast_with_check(db->GetRootDB()) - ->MultiGetImpl(read_options, column_family, key_context, sorted_input, - callback); + ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys); + static_cast_with_check(db->GetRootDB()) + ->MultiGetWithCallback(read_options, column_family, callback, + &sorted_keys); ColumnFamilyHandleImpl* cfh = reinterpret_cast(column_family);