Lock free MultiGet (#4754)
Summary: Avoid locking the DB mutex in order to reference SuperVersions. Instead, we get the thread local cached SuperVersion for each column family in the list. It depends on finding a sequence number that overlaps with all the open memtables. We start with the latest published sequence number, and if any of the memtables is sealed before we can get all the SuperVersions, the process is repeated. After a few times, give up and lock the DB mutex. Tests: 1. Unit tests 2. make check 3. db_bench - TEST_TMPDIR=/dev/shm ./db_bench -use_existing_db=true -benchmarks=readrandom -write_buffer_size=4194304 -target_file_size_base=4194304 -max_bytes_for_level_base=16777216 -num=5000000 -reads=1000000 -threads=32 -compression_type=none -cache_size=1048576000 -batch_size=1 -bloom_bits=1 readrandom : 0.167 micros/op 5983920 ops/sec; 426.2 MB/s (1000000 of 1000000 found) Multireadrandom with batch size 1: multireadrandom : 0.176 micros/op 5684033 ops/sec; (1000000 of 1000000 found) Pull Request resolved: https://github.com/facebook/rocksdb/pull/4754 Differential Revision: D13363550 Pulled By: anand1976 fbshipit-source-id: 6243e8de7dbd9c8bb490a8eca385da0c855b1dd4
This commit is contained in:
parent
7d65bd5ce4
commit
b9d6eccac1
@ -12,6 +12,7 @@
|
||||
* Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls.
|
||||
* Fix a memory leak when files with range tombstones are read in mmap mode and block cache is enabled
|
||||
* Fix handling of corrupt range tombstone blocks such that corruptions cannot cause deleted keys to reappear
|
||||
* Lock free MultiGet
|
||||
|
||||
## 5.18.0 (11/30/2018)
|
||||
### New Features
|
||||
|
@ -383,6 +383,8 @@ class ColumnFamilyData {
|
||||
|
||||
Directory* GetDataDir(size_t path_id) const;
|
||||
|
||||
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
|
||||
|
||||
private:
|
||||
friend class ColumnFamilySet;
|
||||
ColumnFamilyData(uint32_t id, const std::string& name,
|
||||
|
@ -6,6 +6,7 @@
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
// #include <iostream>
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/perf_context.h"
|
||||
@ -956,6 +957,184 @@ TEST_F(DBBasicTest, DBCloseFlushError) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, MultiGetMultiCF) {
|
||||
Options options = CurrentOptions();
|
||||
CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
|
||||
"alyosha", "popovich"},
|
||||
options);
|
||||
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
|
||||
"cf" + std::to_string(i) + "_val"));
|
||||
}
|
||||
|
||||
int get_sv_count = 0;
|
||||
rocksdb::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
|
||||
if (++get_sv_count == 2) {
|
||||
// After MultiGet refs a couple of CFs, flush all CFs so MultiGet
|
||||
// is forced to repeat the process
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
ASSERT_OK(Flush(i));
|
||||
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
|
||||
"cf" + std::to_string(i) + "_val2"));
|
||||
}
|
||||
}
|
||||
if (get_sv_count == 11) {
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
|
||||
db->GetColumnFamilyHandle(i))
|
||||
->cfd();
|
||||
ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
|
||||
}
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
std::vector<int> cfs;
|
||||
std::vector<std::string> keys;
|
||||
std::vector<std::string> values;
|
||||
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
cfs.push_back(i);
|
||||
keys.push_back("cf" + std::to_string(i) + "_key");
|
||||
}
|
||||
|
||||
values = MultiGet(cfs, keys);
|
||||
ASSERT_EQ(values.size(), 8);
|
||||
for (unsigned int j = 0; j < values.size(); ++j) {
|
||||
ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2");
|
||||
}
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
|
||||
reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
|
||||
->cfd();
|
||||
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
|
||||
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, MultiGetMultiCFMutex) {
|
||||
Options options = CurrentOptions();
|
||||
CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
|
||||
"alyosha", "popovich"},
|
||||
options);
|
||||
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
|
||||
"cf" + std::to_string(i) + "_val"));
|
||||
}
|
||||
|
||||
int get_sv_count = 0;
|
||||
int retries = 0;
|
||||
bool last_try = false;
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
|
||||
last_try = true;
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
|
||||
if (last_try) {
|
||||
return;
|
||||
}
|
||||
if (++get_sv_count == 2) {
|
||||
++retries;
|
||||
get_sv_count = 0;
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
ASSERT_OK(Flush(i));
|
||||
ASSERT_OK(Put(
|
||||
i, "cf" + std::to_string(i) + "_key",
|
||||
"cf" + std::to_string(i) + "_val" + std::to_string(retries)));
|
||||
}
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
std::vector<int> cfs;
|
||||
std::vector<std::string> keys;
|
||||
std::vector<std::string> values;
|
||||
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
cfs.push_back(i);
|
||||
keys.push_back("cf" + std::to_string(i) + "_key");
|
||||
}
|
||||
|
||||
values = MultiGet(cfs, keys);
|
||||
ASSERT_TRUE(last_try);
|
||||
ASSERT_EQ(values.size(), 8);
|
||||
for (unsigned int j = 0; j < values.size(); ++j) {
|
||||
ASSERT_EQ(values[j],
|
||||
"cf" + std::to_string(j) + "_val" + std::to_string(retries));
|
||||
}
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
|
||||
reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
|
||||
->cfd();
|
||||
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) {
|
||||
Options options = CurrentOptions();
|
||||
CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
|
||||
"alyosha", "popovich"},
|
||||
options);
|
||||
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
|
||||
"cf" + std::to_string(i) + "_val"));
|
||||
}
|
||||
|
||||
int get_sv_count = 0;
|
||||
rocksdb::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
|
||||
if (++get_sv_count == 2) {
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
ASSERT_OK(Flush(i));
|
||||
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
|
||||
"cf" + std::to_string(i) + "_val2"));
|
||||
}
|
||||
}
|
||||
if (get_sv_count == 8) {
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
|
||||
db->GetColumnFamilyHandle(i))
|
||||
->cfd();
|
||||
ASSERT_TRUE(
|
||||
(cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
|
||||
(cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
|
||||
}
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
std::vector<int> cfs;
|
||||
std::vector<std::string> keys;
|
||||
std::vector<std::string> values;
|
||||
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
cfs.push_back(i);
|
||||
keys.push_back("cf" + std::to_string(i) + "_key");
|
||||
}
|
||||
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
values = MultiGet(cfs, keys, snapshot);
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
ASSERT_EQ(values.size(), 8);
|
||||
for (unsigned int j = 0; j < values.size(); ++j) {
|
||||
ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
|
||||
}
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
|
||||
reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
|
||||
->cfd();
|
||||
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
107
db/db_impl.cc
107
db/db_impl.cc
@ -1340,33 +1340,96 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
struct MultiGetColumnFamilyData {
|
||||
ColumnFamilyData* cfd;
|
||||
SuperVersion* super_version;
|
||||
MultiGetColumnFamilyData(ColumnFamilyData* cf, SuperVersion* sv)
|
||||
: cfd(cf), super_version(sv) {}
|
||||
};
|
||||
std::unordered_map<uint32_t, MultiGetColumnFamilyData*> multiget_cf_data;
|
||||
// fill up and allocate outside of mutex
|
||||
std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
|
||||
column_family.size());
|
||||
for (auto cf : column_family) {
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
|
||||
auto cfd = cfh->cfd();
|
||||
if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
|
||||
auto mgcfd = new MultiGetColumnFamilyData();
|
||||
mgcfd->cfd = cfd;
|
||||
multiget_cf_data.insert({cfd->GetID(), mgcfd});
|
||||
multiget_cf_data.emplace(cfd->GetID(),
|
||||
MultiGetColumnFamilyData(cfd, nullptr));
|
||||
}
|
||||
}
|
||||
|
||||
bool last_try = false;
|
||||
{
|
||||
// If we end up with the same issue of memtable geting sealed during 2
|
||||
// consecutive retries, it means the write rate is very high. In that case
|
||||
// its probably ok to take the mutex on the 3rd try so we can succeed for
|
||||
// sure
|
||||
static const int num_retries = 3;
|
||||
for (auto i = 0; i < num_retries; ++i) {
|
||||
last_try = (i == num_retries - 1);
|
||||
bool retry = false;
|
||||
|
||||
if (i > 0) {
|
||||
for (auto mgd_iter = multiget_cf_data.begin();
|
||||
mgd_iter != multiget_cf_data.end(); ++mgd_iter) {
|
||||
auto super_version = mgd_iter->second.super_version;
|
||||
auto cfd = mgd_iter->second.cfd;
|
||||
if (super_version != nullptr) {
|
||||
ReturnAndCleanupSuperVersion(cfd, super_version);
|
||||
}
|
||||
mgd_iter->second.super_version = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (read_options.snapshot == nullptr) {
|
||||
if (last_try) {
|
||||
TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
|
||||
// We're close to max number of retries. For the last retry,
|
||||
// acquire the lock so we're sure to succeed
|
||||
mutex_.Lock();
|
||||
if (read_options.snapshot != nullptr) {
|
||||
snapshot =
|
||||
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
|
||||
} else {
|
||||
}
|
||||
snapshot = last_seq_same_as_publish_seq_
|
||||
? versions_->LastSequence()
|
||||
: versions_->LastPublishedSequence();
|
||||
} else {
|
||||
snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
|
||||
->number_;
|
||||
}
|
||||
for (auto mgd_iter : multiget_cf_data) {
|
||||
mgd_iter.second->super_version =
|
||||
mgd_iter.second->cfd->GetSuperVersion()->Ref();
|
||||
|
||||
for (auto mgd_iter = multiget_cf_data.begin();
|
||||
mgd_iter != multiget_cf_data.end(); ++mgd_iter) {
|
||||
if (!last_try) {
|
||||
mgd_iter->second.super_version =
|
||||
GetAndRefSuperVersion(mgd_iter->second.cfd);
|
||||
} else {
|
||||
mgd_iter->second.super_version =
|
||||
mgd_iter->second.cfd->GetSuperVersion()->Ref();
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
|
||||
if (read_options.snapshot != nullptr || last_try) {
|
||||
// If user passed a snapshot, then we don't care if a memtable is
|
||||
// sealed or compaction happens because the snapshot would ensure
|
||||
// that older key versions are kept around. If this is the last
|
||||
// retry, then we have the lock so nothing bad can happen
|
||||
continue;
|
||||
}
|
||||
// We could get the earliest sequence number for the whole list of
|
||||
// memtables, which will include immutable memtables as well, but that
|
||||
// might be tricky to maintain in case we decide, in future, to do
|
||||
// memtable compaction.
|
||||
if (!last_try) {
|
||||
auto seq =
|
||||
mgd_iter->second.super_version->mem->GetEarliestSequenceNumber();
|
||||
if (seq > snapshot) {
|
||||
retry = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!retry) {
|
||||
if (last_try) {
|
||||
mutex_.Unlock();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Contain a list of merge operations if merge occurs.
|
||||
MergeContext merge_context;
|
||||
@ -1396,7 +1459,7 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
|
||||
assert(mgd_iter != multiget_cf_data.end());
|
||||
auto mgd = mgd_iter->second;
|
||||
auto super_version = mgd->super_version;
|
||||
auto super_version = mgd.super_version;
|
||||
bool skip_memtable =
|
||||
(read_options.read_tier == kPersistedTier &&
|
||||
has_unpersisted_data_.load(std::memory_order_relaxed));
|
||||
@ -1432,24 +1495,14 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
PERF_TIMER_GUARD(get_post_process_time);
|
||||
autovector<SuperVersion*> superversions_to_delete;
|
||||
|
||||
// TODO(icanadi) do we need lock here or just around Cleanup()?
|
||||
mutex_.Lock();
|
||||
for (auto mgd_iter : multiget_cf_data) {
|
||||
auto mgd = mgd_iter.second;
|
||||
if (mgd->super_version->Unref()) {
|
||||
mgd->super_version->Cleanup();
|
||||
superversions_to_delete.push_back(mgd->super_version);
|
||||
if (!last_try) {
|
||||
ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
|
||||
} else {
|
||||
mgd.cfd->GetSuperVersion()->Unref();
|
||||
}
|
||||
}
|
||||
mutex_.Unlock();
|
||||
|
||||
for (auto td : superversions_to_delete) {
|
||||
delete td;
|
||||
}
|
||||
for (auto mgd : multiget_cf_data) {
|
||||
delete mgd.second;
|
||||
}
|
||||
|
||||
RecordTick(stats_, NUMBER_MULTIGET_CALLS);
|
||||
RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
|
||||
RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
|
||||
|
@ -763,6 +763,31 @@ std::string DBTestBase::Get(int cf, const std::string& k,
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
|
||||
const std::vector<std::string>& k,
|
||||
const Snapshot* snapshot) {
|
||||
ReadOptions options;
|
||||
options.verify_checksums = true;
|
||||
options.snapshot = snapshot;
|
||||
std::vector<ColumnFamilyHandle*> handles;
|
||||
std::vector<Slice> keys;
|
||||
std::vector<std::string> result;
|
||||
|
||||
for (unsigned int i = 0; i < cfs.size(); ++i) {
|
||||
handles.push_back(handles_[cfs[i]]);
|
||||
keys.push_back(k[i]);
|
||||
}
|
||||
std::vector<Status> s = db_->MultiGet(options, handles, keys, &result);
|
||||
for (unsigned int i = 0; i < s.size(); ++i) {
|
||||
if (s[i].IsNotFound()) {
|
||||
result[i] = "NOT_FOUND";
|
||||
} else if (!s[i].ok()) {
|
||||
result[i] = s[i].ToString();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
|
||||
ReadOptions options;
|
||||
options.verify_checksums = true;
|
||||
|
@ -840,6 +840,10 @@ class DBTestBase : public testing::Test {
|
||||
|
||||
Status Get(const std::string& k, PinnableSlice* v);
|
||||
|
||||
std::vector<std::string> MultiGet(std::vector<int> cfs,
|
||||
const std::vector<std::string>& k,
|
||||
const Snapshot* snapshot = nullptr);
|
||||
|
||||
uint64_t GetNumSnapshots();
|
||||
|
||||
uint64_t GetTimeOldestSnapshots();
|
||||
|
Loading…
Reference in New Issue
Block a user