diff --git a/db/skiplist.h b/db/skiplist.h index c61c1947e..787fad59d 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -120,7 +120,10 @@ class SkipList { // values are ok. std::atomic max_height_; // Height of the entire list - // Used for optimizing sequential insert patterns + // Used for optimizing sequential insert patterns. Tricky. prev_[i] for + // i up to max_height_ is the predecessor of prev_[0] and prev_height_ + // is the height of prev_[0]. prev_[0] can only be equal to head before + // insertion, in which case max_height_ and prev_height_ are 1. Node** prev_; int32_t prev_height_; @@ -138,16 +141,15 @@ class SkipList { // Return true if key is greater than the data stored in "n" bool KeyIsAfterNode(const Key& key, Node* n) const; - // Return the earliest node that comes at or after key. + // Returns the earliest node with a key >= key. // Return nullptr if there is no such node. - // - // If prev is non-nullptr, fills prev[level] with pointer to previous - // node at "level" for every level in [0..max_height_-1]. - Node* FindGreaterOrEqual(const Key& key, Node** prev) const; + Node* FindGreaterOrEqual(const Key& key) const; // Return the latest node with a key < key. // Return head_ if there is no such node. - Node* FindLessThan(const Key& key) const; + // Fills prev[level] with pointer to previous node at "level" for every + // level in [0..max_height_-1], if prev is non-null. + Node* FindLessThan(const Key& key, Node** prev = nullptr) const; // Return the last node in the list. // Return head_ if list is empty. @@ -244,7 +246,7 @@ inline void SkipList::Iterator::Prev() { template inline void SkipList::Iterator::Seek(const Key& target) { - node_ = list_->FindGreaterOrEqual(target, nullptr); + node_ = list_->FindGreaterOrEqual(target); } template @@ -280,59 +282,61 @@ bool SkipList::KeyIsAfterNode(const Key& key, Node* n) const { template typename SkipList::Node* SkipList:: - FindGreaterOrEqual(const Key& key, Node** prev) const { - // Use prev as an optimization hint and fallback to slow path - if (prev && !KeyIsAfterNode(key, prev[0]->Next(0))) { - Node* x = prev[0]; - Node* next = x->Next(0); - if ((x == head_) || KeyIsAfterNode(key, x)) { - // Adjust all relevant insertion points to the previous entry - for (int i = 1; i < prev_height_; i++) { - prev[i] = x; - } - return next; - } - } - // Normal lookup + FindGreaterOrEqual(const Key& key) const { + // Note: It looks like we could reduce duplication by implementing + // this function as FindLessThan(key)->Next(0), but we wouldn't be able + // to exit early on equality and the result wouldn't even be correct. + // A concurrent insert might occur after FindLessThan(key) but before + // we get a chance to call Next(0). Node* x = head_; int level = GetMaxHeight() - 1; + Node* last_bigger = nullptr; while (true) { Node* next = x->Next(level); - // Make sure the lists are sorted. - // If x points to head_ or next points nullptr, it is trivially satisfied. - assert((x == head_) || (next == nullptr) || KeyIsAfterNode(next->key, x)); - if (KeyIsAfterNode(key, next)) { + // Make sure the lists are sorted + assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x)); + // Make sure we haven't overshot during our search + assert(x == head_ || KeyIsAfterNode(key, x)); + int cmp = (next == nullptr || next == last_bigger) + ? 1 : compare_(next->key, key); + if (cmp == 0 || (cmp > 0 && level == 0)) { + return next; + } else if (cmp < 0) { // Keep searching in this list x = next; } else { - if (prev != nullptr) prev[level] = x; - if (level == 0) { - return next; - } else { - // Switch to next list - level--; - } + // Switch to next list, reuse compare_() result + last_bigger = next; + level--; } } } template typename SkipList::Node* -SkipList::FindLessThan(const Key& key) const { +SkipList::FindLessThan(const Key& key, Node** prev) const { Node* x = head_; int level = GetMaxHeight() - 1; + // KeyIsAfter(key, last_not_after) is definitely false + Node* last_not_after = nullptr; while (true) { - assert(x == head_ || compare_(x->key, key) < 0); Node* next = x->Next(level); - if (next == nullptr || compare_(next->key, key) >= 0) { + assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x)); + assert(x == head_ || KeyIsAfterNode(key, x)); + if (next != last_not_after && KeyIsAfterNode(key, next)) { + // Keep searching in this list + x = next; + } else { + if (prev != nullptr) { + prev[level] = x; + } if (level == 0) { return x; } else { - // Switch to next list + // Switch to next list, reuse KeyIUsAfterNode() result + last_not_after = next; level--; } - } else { - x = next; } } } @@ -408,12 +412,27 @@ SkipList::SkipList(const Comparator cmp, Allocator* allocator, template void SkipList::Insert(const Key& key) { - // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual() - // here since Insert() is externally synchronized. - Node* x = FindGreaterOrEqual(key, prev_); + // fast path for sequential insertion + if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) && + (prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) { + assert(prev_[0] != head_ || (prev_height_ == 1 && GetMaxHeight() == 1)); + + // Outside of this method prev_[1..max_height_] is the predecessor + // of prev_[0], and prev_height_ refers to prev_[0]. Inside Insert + // prev_[0..max_height - 1] is the predecessor of key. Switch from + // the external state to the internal + for (int i = 1; i < prev_height_; i++) { + prev_[i] = prev_[0]; + } + } else { + // TODO(opt): we could use a NoBarrier predecessor search as an + // optimization for architectures where memory_order_acquire needs + // a synchronization instruction. Doesn't matter on x86 + FindLessThan(key, prev_); + } // Our data structure does not allow duplicate insertion - assert(x == nullptr || !Equal(key, x->key)); + assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->key)); int height = RandomHeight(); if (height > GetMaxHeight()) { @@ -432,7 +451,7 @@ void SkipList::Insert(const Key& key) { max_height_.store(height, std::memory_order_relaxed); } - x = NewNode(key, height); + Node* x = NewNode(key, height); for (int i = 0; i < height; i++) { // NoBarrier_SetNext() suffices since we will add a barrier when // we publish a pointer to "x" in prev[i]. @@ -445,7 +464,7 @@ void SkipList::Insert(const Key& key) { template bool SkipList::Contains(const Key& key) const { - Node* x = FindGreaterOrEqual(key, nullptr); + Node* x = FindGreaterOrEqual(key); if (x != nullptr && Equal(key, x->key)) { return true; } else {