skiplist: optimize for sequential insert pattern

Summary:
skiplist doesn't cache the location of the last insert and becomes
CPU bound when the input data has sequential keys.

Notes on thread safety: ::Insert() already requires external
synchronization. So this change is not making it any worse.

Test Plan: skiplist_test

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D3129
This commit is contained in:
Arun Sharma 2012-05-04 21:40:36 +00:00
parent cc6c32535a
commit 90b2924fb2

View File

@ -104,6 +104,9 @@ class SkipList {
// values are ok. // values are ok.
port::AtomicPointer max_height_; // Height of the entire list port::AtomicPointer max_height_; // Height of the entire list
// Used for optimizing sequential insert patterns
Node* prev_[kMaxHeight];
inline int GetMaxHeight() const { inline int GetMaxHeight() const {
return static_cast<int>( return static_cast<int>(
reinterpret_cast<intptr_t>(max_height_.NoBarrier_Load())); reinterpret_cast<intptr_t>(max_height_.NoBarrier_Load()));
@ -258,6 +261,15 @@ bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
template<typename Key, class Comparator> template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev) typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
const { const {
// Use prev as an optimization hint and fallback to slow path
if (prev && !KeyIsAfterNode(key, prev[0]->Next(0))) {
Node* x = prev[0];
Node* next = x->Next(0);
if ((x == head_) || KeyIsAfterNode(key, x)) {
return next;
}
}
// Normal lookup
Node* x = head_; Node* x = head_;
int level = GetMaxHeight() - 1; int level = GetMaxHeight() - 1;
while (true) { while (true) {
@ -327,6 +339,7 @@ SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
rnd_(0xdeadbeef) { rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) { for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, NULL); head_->SetNext(i, NULL);
prev_[i] = head_;
} }
} }
@ -334,8 +347,7 @@ template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) { void SkipList<Key,Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual() // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized. // here since Insert() is externally synchronized.
Node* prev[kMaxHeight]; Node* x = FindGreaterOrEqual(key, prev_);
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion // Our data structure does not allow duplicate insertion
assert(x == NULL || !Equal(key, x->key)); assert(x == NULL || !Equal(key, x->key));
@ -343,7 +355,7 @@ void SkipList<Key,Comparator>::Insert(const Key& key) {
int height = RandomHeight(); int height = RandomHeight();
if (height > GetMaxHeight()) { if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) { for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_; prev_[i] = head_;
} }
//fprintf(stderr, "Change height from %d to %d\n", max_height_, height); //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
@ -361,9 +373,10 @@ void SkipList<Key,Comparator>::Insert(const Key& key) {
for (int i = 0; i < height; i++) { for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when // NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i]. // we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); x->NoBarrier_SetNext(i, prev_[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x); prev_[i]->SetNext(i, x);
} }
prev_[0] = x;
} }
template<typename Key, class Comparator> template<typename Key, class Comparator>