rocksdb/memtable/inlineskiplist_test.cc
Lingjing You 1a928c22a0 Add insert hints for each writebatch (#5728)
Summary:
Add insert hints for each writebatch so that they can be used in concurrent write, and add write option to enable it.

Bench result (qps):

`./db_bench --benchmarks=fillseq -allow_concurrent_memtable_write=true -num=4000000 -batch-size=1 -threads=1 -db=/data3/ylj/tmp -write_buffer_size=536870912 -num_column_families=4`

master:

| batch size \ thread num | 1       | 2       | 4       | 8       |
| ----------------------- | ------- | ------- | ------- | ------- |
| 1                       | 387883  | 220790  | 308294  | 490998  |
| 10                      | 1397208 | 978911  | 1275684 | 1733395 |
| 100                     | 2045414 | 1589927 | 1798782 | 2681039 |
| 1000                    | 2228038 | 1698252 | 1839877 | 2863490 |

fillseq with writebatch hint:

| batch size \ thread num | 1       | 2       | 4       | 8       |
| ----------------------- | ------- | ------- | ------- | ------- |
| 1                       | 286005  | 223570  | 300024  | 466981  |
| 10                      | 970374  | 813308  | 1399299 | 1753588 |
| 100                     | 1962768 | 1983023 | 2676577 | 3086426 |
| 1000                    | 2195853 | 2676782 | 3231048 | 3638143 |
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5728

Differential Revision: D17297240

fbshipit-source-id: b053590a6d77871f1ef2f911a7bd013b3899b26c
2019-09-12 17:15:18 -07:00

664 lines
18 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "memtable/inlineskiplist.h"
#include <set>
#include <unordered_set>
#include "memory/concurrent_arena.h"
#include "rocksdb/env.h"
#include "test_util/testharness.h"
#include "util/hash.h"
#include "util/random.h"
namespace rocksdb {
// Our test skip list stores 8-byte unsigned integers
typedef uint64_t Key;
static const char* Encode(const uint64_t* key) {
return reinterpret_cast<const char*>(key);
}
static Key Decode(const char* key) {
Key rv;
memcpy(&rv, key, sizeof(Key));
return rv;
}
struct TestComparator {
typedef Key DecodedType;
static DecodedType decode_key(const char* b) {
return Decode(b);
}
int operator()(const char* a, const char* b) const {
if (Decode(a) < Decode(b)) {
return -1;
} else if (Decode(a) > Decode(b)) {
return +1;
} else {
return 0;
}
}
int operator()(const char* a, const DecodedType b) const {
if (Decode(a) < b) {
return -1;
} else if (Decode(a) > b) {
return +1;
} else {
return 0;
}
}
};
typedef InlineSkipList<TestComparator> TestInlineSkipList;
class InlineSkipTest : public testing::Test {
public:
void Insert(TestInlineSkipList* list, Key key) {
char* buf = list->AllocateKey(sizeof(Key));
memcpy(buf, &key, sizeof(Key));
list->Insert(buf);
keys_.insert(key);
}
bool InsertWithHint(TestInlineSkipList* list, Key key, void** hint) {
char* buf = list->AllocateKey(sizeof(Key));
memcpy(buf, &key, sizeof(Key));
bool res = list->InsertWithHint(buf, hint);
keys_.insert(key);
return res;
}
void Validate(TestInlineSkipList* list) {
// Check keys exist.
for (Key key : keys_) {
ASSERT_TRUE(list->Contains(Encode(&key)));
}
// Iterate over the list, make sure keys appears in order and no extra
// keys exist.
TestInlineSkipList::Iterator iter(list);
ASSERT_FALSE(iter.Valid());
Key zero = 0;
iter.Seek(Encode(&zero));
for (Key key : keys_) {
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(key, Decode(iter.key()));
iter.Next();
}
ASSERT_FALSE(iter.Valid());
// Validate the list is well-formed.
list->TEST_Validate();
}
private:
std::set<Key> keys_;
};
TEST_F(InlineSkipTest, Empty) {
Arena arena;
TestComparator cmp;
InlineSkipList<TestComparator> list(cmp, &arena);
Key key = 10;
ASSERT_TRUE(!list.Contains(Encode(&key)));
InlineSkipList<TestComparator>::Iterator iter(&list);
ASSERT_TRUE(!iter.Valid());
iter.SeekToFirst();
ASSERT_TRUE(!iter.Valid());
key = 100;
iter.Seek(Encode(&key));
ASSERT_TRUE(!iter.Valid());
iter.SeekForPrev(Encode(&key));
ASSERT_TRUE(!iter.Valid());
iter.SeekToLast();
ASSERT_TRUE(!iter.Valid());
}
TEST_F(InlineSkipTest, InsertAndLookup) {
const int N = 2000;
const int R = 5000;
Random rnd(1000);
std::set<Key> keys;
ConcurrentArena arena;
TestComparator cmp;
InlineSkipList<TestComparator> list(cmp, &arena);
for (int i = 0; i < N; i++) {
Key key = rnd.Next() % R;
if (keys.insert(key).second) {
char* buf = list.AllocateKey(sizeof(Key));
memcpy(buf, &key, sizeof(Key));
list.Insert(buf);
}
}
for (Key i = 0; i < R; i++) {
if (list.Contains(Encode(&i))) {
ASSERT_EQ(keys.count(i), 1U);
} else {
ASSERT_EQ(keys.count(i), 0U);
}
}
// Simple iterator tests
{
InlineSkipList<TestComparator>::Iterator iter(&list);
ASSERT_TRUE(!iter.Valid());
uint64_t zero = 0;
iter.Seek(Encode(&zero));
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*(keys.begin()), Decode(iter.key()));
uint64_t max_key = R - 1;
iter.SeekForPrev(Encode(&max_key));
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*(keys.rbegin()), Decode(iter.key()));
iter.SeekToFirst();
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*(keys.begin()), Decode(iter.key()));
iter.SeekToLast();
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*(keys.rbegin()), Decode(iter.key()));
}
// Forward iteration test
for (Key i = 0; i < R; i++) {
InlineSkipList<TestComparator>::Iterator iter(&list);
iter.Seek(Encode(&i));
// Compare against model iterator
std::set<Key>::iterator model_iter = keys.lower_bound(i);
for (int j = 0; j < 3; j++) {
if (model_iter == keys.end()) {
ASSERT_TRUE(!iter.Valid());
break;
} else {
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*model_iter, Decode(iter.key()));
++model_iter;
iter.Next();
}
}
}
// Backward iteration test
for (Key i = 0; i < R; i++) {
InlineSkipList<TestComparator>::Iterator iter(&list);
iter.SeekForPrev(Encode(&i));
// Compare against model iterator
std::set<Key>::iterator model_iter = keys.upper_bound(i);
for (int j = 0; j < 3; j++) {
if (model_iter == keys.begin()) {
ASSERT_TRUE(!iter.Valid());
break;
} else {
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*--model_iter, Decode(iter.key()));
iter.Prev();
}
}
}
}
TEST_F(InlineSkipTest, InsertWithHint_Sequential) {
const int N = 100000;
Arena arena;
TestComparator cmp;
TestInlineSkipList list(cmp, &arena);
void* hint = nullptr;
for (int i = 0; i < N; i++) {
Key key = i;
InsertWithHint(&list, key, &hint);
}
Validate(&list);
}
TEST_F(InlineSkipTest, InsertWithHint_MultipleHints) {
const int N = 100000;
const int S = 100;
Random rnd(534);
Arena arena;
TestComparator cmp;
TestInlineSkipList list(cmp, &arena);
void* hints[S];
Key last_key[S];
for (int i = 0; i < S; i++) {
hints[i] = nullptr;
last_key[i] = 0;
}
for (int i = 0; i < N; i++) {
Key s = rnd.Uniform(S);
Key key = (s << 32) + (++last_key[s]);
InsertWithHint(&list, key, &hints[s]);
}
Validate(&list);
}
TEST_F(InlineSkipTest, InsertWithHint_MultipleHintsRandom) {
const int N = 100000;
const int S = 100;
Random rnd(534);
Arena arena;
TestComparator cmp;
TestInlineSkipList list(cmp, &arena);
void* hints[S];
for (int i = 0; i < S; i++) {
hints[i] = nullptr;
}
for (int i = 0; i < N; i++) {
Key s = rnd.Uniform(S);
Key key = (s << 32) + rnd.Next();
InsertWithHint(&list, key, &hints[s]);
}
Validate(&list);
}
TEST_F(InlineSkipTest, InsertWithHint_CompatibleWithInsertWithoutHint) {
const int N = 100000;
const int S1 = 100;
const int S2 = 100;
Random rnd(534);
Arena arena;
TestComparator cmp;
TestInlineSkipList list(cmp, &arena);
std::unordered_set<Key> used;
Key with_hint[S1];
Key without_hint[S2];
void* hints[S1];
for (int i = 0; i < S1; i++) {
hints[i] = nullptr;
while (true) {
Key s = rnd.Next();
if (used.insert(s).second) {
with_hint[i] = s;
break;
}
}
}
for (int i = 0; i < S2; i++) {
while (true) {
Key s = rnd.Next();
if (used.insert(s).second) {
without_hint[i] = s;
break;
}
}
}
for (int i = 0; i < N; i++) {
Key s = rnd.Uniform(S1 + S2);
if (s < S1) {
Key key = (with_hint[s] << 32) + rnd.Next();
InsertWithHint(&list, key, &hints[s]);
} else {
Key key = (without_hint[s - S1] << 32) + rnd.Next();
Insert(&list, key);
}
}
Validate(&list);
}
#ifndef ROCKSDB_VALGRIND_RUN
// We want to make sure that with a single writer and multiple
// concurrent readers (with no synchronization other than when a
// reader's iterator is created), the reader always observes all the
// data that was present in the skip list when the iterator was
// constructor. Because insertions are happening concurrently, we may
// also observe new values that were inserted since the iterator was
// constructed, but we should never miss any values that were present
// at iterator construction time.
//
// We generate multi-part keys:
// <key,gen,hash>
// where:
// key is in range [0..K-1]
// gen is a generation number for key
// hash is hash(key,gen)
//
// The insertion code picks a random key, sets gen to be 1 + the last
// generation number inserted for that key, and sets hash to Hash(key,gen).
//
// At the beginning of a read, we snapshot the last inserted
// generation number for each key. We then iterate, including random
// calls to Next() and Seek(). For every key we encounter, we
// check that it is either expected given the initial snapshot or has
// been concurrently added since the iterator started.
class ConcurrentTest {
public:
static const uint32_t K = 8;
private:
static uint64_t key(Key key) { return (key >> 40); }
static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
static uint64_t hash(Key key) { return key & 0xff; }
static uint64_t HashNumbers(uint64_t k, uint64_t g) {
uint64_t data[2] = {k, g};
return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
}
static Key MakeKey(uint64_t k, uint64_t g) {
assert(sizeof(Key) == sizeof(uint64_t));
assert(k <= K); // We sometimes pass K to seek to the end of the skiplist
assert(g <= 0xffffffffu);
return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
}
static bool IsValidKey(Key k) {
return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
}
static Key RandomTarget(Random* rnd) {
switch (rnd->Next() % 10) {
case 0:
// Seek to beginning
return MakeKey(0, 0);
case 1:
// Seek to end
return MakeKey(K, 0);
default:
// Seek to middle
return MakeKey(rnd->Next() % K, 0);
}
}
// Per-key generation
struct State {
std::atomic<int> generation[K];
void Set(int k, int v) {
generation[k].store(v, std::memory_order_release);
}
int Get(int k) { return generation[k].load(std::memory_order_acquire); }
State() {
for (unsigned int k = 0; k < K; k++) {
Set(k, 0);
}
}
};
// Current state of the test
State current_;
ConcurrentArena arena_;
// InlineSkipList is not protected by mu_. We just use a single writer
// thread to modify it.
InlineSkipList<TestComparator> list_;
public:
ConcurrentTest() : list_(TestComparator(), &arena_) {}
// REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep
void WriteStep(Random* rnd) {
const uint32_t k = rnd->Next() % K;
const int g = current_.Get(k) + 1;
const Key new_key = MakeKey(k, g);
char* buf = list_.AllocateKey(sizeof(Key));
memcpy(buf, &new_key, sizeof(Key));
list_.Insert(buf);
current_.Set(k, g);
}
// REQUIRES: No concurrent calls for the same k
void ConcurrentWriteStep(uint32_t k, bool use_hint = false) {
const int g = current_.Get(k) + 1;
const Key new_key = MakeKey(k, g);
char* buf = list_.AllocateKey(sizeof(Key));
memcpy(buf, &new_key, sizeof(Key));
if (use_hint) {
void* hint = nullptr;
list_.InsertWithHintConcurrently(buf, &hint);
delete[] reinterpret_cast<char*>(hint);
} else {
list_.InsertConcurrently(buf);
}
ASSERT_EQ(g, current_.Get(k) + 1);
current_.Set(k, g);
}
void ReadStep(Random* rnd) {
// Remember the initial committed state of the skiplist.
State initial_state;
for (unsigned int k = 0; k < K; k++) {
initial_state.Set(k, current_.Get(k));
}
Key pos = RandomTarget(rnd);
InlineSkipList<TestComparator>::Iterator iter(&list_);
iter.Seek(Encode(&pos));
while (true) {
Key current;
if (!iter.Valid()) {
current = MakeKey(K, 0);
} else {
current = Decode(iter.key());
ASSERT_TRUE(IsValidKey(current)) << current;
}
ASSERT_LE(pos, current) << "should not go backwards";
// Verify that everything in [pos,current) was not present in
// initial_state.
while (pos < current) {
ASSERT_LT(key(pos), K) << pos;
// Note that generation 0 is never inserted, so it is ok if
// <*,0,*> is missing.
ASSERT_TRUE((gen(pos) == 0U) ||
(gen(pos) > static_cast<uint64_t>(initial_state.Get(
static_cast<int>(key(pos))))))
<< "key: " << key(pos) << "; gen: " << gen(pos)
<< "; initgen: " << initial_state.Get(static_cast<int>(key(pos)));
// Advance to next key in the valid key space
if (key(pos) < key(current)) {
pos = MakeKey(key(pos) + 1, 0);
} else {
pos = MakeKey(key(pos), gen(pos) + 1);
}
}
if (!iter.Valid()) {
break;
}
if (rnd->Next() % 2) {
iter.Next();
pos = MakeKey(key(pos), gen(pos) + 1);
} else {
Key new_target = RandomTarget(rnd);
if (new_target > pos) {
pos = new_target;
iter.Seek(Encode(&new_target));
}
}
}
}
};
const uint32_t ConcurrentTest::K;
// Simple test that does single-threaded testing of the ConcurrentTest
// scaffolding.
TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) {
ConcurrentTest test;
Random rnd(test::RandomSeed());
for (int i = 0; i < 10000; i++) {
test.ReadStep(&rnd);
test.WriteStep(&rnd);
}
}
TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) {
ConcurrentTest test;
Random rnd(test::RandomSeed());
for (int i = 0; i < 10000; i++) {
test.ReadStep(&rnd);
uint32_t base = rnd.Next();
for (int j = 0; j < 4; ++j) {
test.ConcurrentWriteStep((base + j) % ConcurrentTest::K);
}
}
}
class TestState {
public:
ConcurrentTest t_;
bool use_hint_;
int seed_;
std::atomic<bool> quit_flag_;
std::atomic<uint32_t> next_writer_;
enum ReaderState { STARTING, RUNNING, DONE };
explicit TestState(int s)
: seed_(s),
quit_flag_(false),
state_(STARTING),
pending_writers_(0),
state_cv_(&mu_) {}
void Wait(ReaderState s) {
mu_.Lock();
while (state_ != s) {
state_cv_.Wait();
}
mu_.Unlock();
}
void Change(ReaderState s) {
mu_.Lock();
state_ = s;
state_cv_.Signal();
mu_.Unlock();
}
void AdjustPendingWriters(int delta) {
mu_.Lock();
pending_writers_ += delta;
if (pending_writers_ == 0) {
state_cv_.Signal();
}
mu_.Unlock();
}
void WaitForPendingWriters() {
mu_.Lock();
while (pending_writers_ != 0) {
state_cv_.Wait();
}
mu_.Unlock();
}
private:
port::Mutex mu_;
ReaderState state_;
int pending_writers_;
port::CondVar state_cv_;
};
static void ConcurrentReader(void* arg) {
TestState* state = reinterpret_cast<TestState*>(arg);
Random rnd(state->seed_);
int64_t reads = 0;
state->Change(TestState::RUNNING);
while (!state->quit_flag_.load(std::memory_order_acquire)) {
state->t_.ReadStep(&rnd);
++reads;
}
state->Change(TestState::DONE);
}
static void ConcurrentWriter(void* arg) {
TestState* state = reinterpret_cast<TestState*>(arg);
uint32_t k = state->next_writer_++ % ConcurrentTest::K;
state->t_.ConcurrentWriteStep(k, state->use_hint_);
state->AdjustPendingWriters(-1);
}
static void RunConcurrentRead(int run) {
const int seed = test::RandomSeed() + (run * 100);
Random rnd(seed);
const int N = 1000;
const int kSize = 1000;
for (int i = 0; i < N; i++) {
if ((i % 100) == 0) {
fprintf(stderr, "Run %d of %d\n", i, N);
}
TestState state(seed + 1);
Env::Default()->SetBackgroundThreads(1);
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; ++k) {
state.t_.WriteStep(&rnd);
}
state.quit_flag_.store(true, std::memory_order_release);
state.Wait(TestState::DONE);
}
}
static void RunConcurrentInsert(int run, bool use_hint = false,
int write_parallelism = 4) {
Env::Default()->SetBackgroundThreads(1 + write_parallelism,
Env::Priority::LOW);
const int seed = test::RandomSeed() + (run * 100);
Random rnd(seed);
const int N = 1000;
const int kSize = 1000;
for (int i = 0; i < N; i++) {
if ((i % 100) == 0) {
fprintf(stderr, "Run %d of %d\n", i, N);
}
TestState state(seed + 1);
state.use_hint_ = use_hint;
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; k += write_parallelism) {
state.next_writer_ = rnd.Next();
state.AdjustPendingWriters(write_parallelism);
for (int p = 0; p < write_parallelism; ++p) {
Env::Default()->Schedule(ConcurrentWriter, &state);
}
state.WaitForPendingWriters();
}
state.quit_flag_.store(true, std::memory_order_release);
state.Wait(TestState::DONE);
}
}
TEST_F(InlineSkipTest, ConcurrentRead1) { RunConcurrentRead(1); }
TEST_F(InlineSkipTest, ConcurrentRead2) { RunConcurrentRead(2); }
TEST_F(InlineSkipTest, ConcurrentRead3) { RunConcurrentRead(3); }
TEST_F(InlineSkipTest, ConcurrentRead4) { RunConcurrentRead(4); }
TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); }
TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); }
TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); }
TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); }
TEST_F(InlineSkipTest, ConcurrentInsertWithHint1) {
RunConcurrentInsert(1, true);
}
TEST_F(InlineSkipTest, ConcurrentInsertWithHint2) {
RunConcurrentInsert(2, true);
}
TEST_F(InlineSkipTest, ConcurrentInsertWithHint3) {
RunConcurrentInsert(3, true);
}
#endif // ROCKSDB_VALGRIND_RUN
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}