Fix data race in HashLinkList
Summary: 1) need to do acquire load when read the first entry in the bucket. 2) Make num_entries atomic Test Plan: Ran DBTest.MultiThreaded with TSAN Reviewers: yhchiang, rven, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D32361
This commit is contained in:
parent
2113ecd3c2
commit
e5aab4c2b2
@ -29,7 +29,7 @@ typedef std::atomic<void*> Pointer;
|
||||
// A data structure used as the header of a link list of a hash bucket.
|
||||
struct BucketHeader {
|
||||
Pointer next;
|
||||
uint32_t num_entries;
|
||||
std::atomic<uint32_t> num_entries;
|
||||
|
||||
explicit BucketHeader(void* n, uint32_t count)
|
||||
: next(n), num_entries(count) {}
|
||||
@ -37,6 +37,17 @@ struct BucketHeader {
|
||||
bool IsSkipListBucket() {
|
||||
return next.load(std::memory_order_relaxed) == this;
|
||||
}
|
||||
|
||||
uint32_t GetNumEntries() const {
|
||||
return num_entries.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
// REQUIRES: called from single-threaded Insert()
|
||||
void IncNumEntries() {
|
||||
// Only one thread can do write at one time. No need to do atomic
|
||||
// incremental. Update it with relaxed load and store.
|
||||
num_entries.store(GetNumEntries() + 1, std::memory_order_relaxed);
|
||||
}
|
||||
};
|
||||
|
||||
// A data structure used as the header of a skip list of a hash bucket.
|
||||
@ -503,14 +514,14 @@ SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader(
|
||||
// Counting header
|
||||
BucketHeader* header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
||||
if (header->IsSkipListBucket()) {
|
||||
assert(header->num_entries > threshold_use_skiplist_);
|
||||
assert(header->GetNumEntries() > threshold_use_skiplist_);
|
||||
auto* skip_list_bucket_header =
|
||||
reinterpret_cast<SkipListBucketHeader*>(header);
|
||||
assert(skip_list_bucket_header->Counting_header.next.load(
|
||||
std::memory_order_relaxed) == header);
|
||||
return skip_list_bucket_header;
|
||||
}
|
||||
assert(header->num_entries <= threshold_use_skiplist_);
|
||||
assert(header->GetNumEntries() <= threshold_use_skiplist_);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -525,11 +536,11 @@ Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const {
|
||||
// Counting header
|
||||
BucketHeader* header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
||||
if (!header->IsSkipListBucket()) {
|
||||
assert(header->num_entries <= threshold_use_skiplist_);
|
||||
assert(header->GetNumEntries() <= threshold_use_skiplist_);
|
||||
return reinterpret_cast<Node*>(
|
||||
header->next.load(std::memory_order_relaxed));
|
||||
header->next.load(std::memory_order_acquire));
|
||||
}
|
||||
assert(header->num_entries > threshold_use_skiplist_);
|
||||
assert(header->GetNumEntries() > threshold_use_skiplist_);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -568,26 +579,28 @@ void HashLinkListRep::Insert(KeyHandle handle) {
|
||||
header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
||||
if (header->IsSkipListBucket()) {
|
||||
// Case 4. Bucket is already a skip list
|
||||
assert(header->num_entries > threshold_use_skiplist_);
|
||||
assert(header->GetNumEntries() > threshold_use_skiplist_);
|
||||
auto* skip_list_bucket_header =
|
||||
reinterpret_cast<SkipListBucketHeader*>(header);
|
||||
skip_list_bucket_header->Counting_header.num_entries++;
|
||||
// Only one thread can execute Insert() at one time. No need to do atomic
|
||||
// incremental.
|
||||
skip_list_bucket_header->Counting_header.IncNumEntries();
|
||||
skip_list_bucket_header->skip_list.Insert(x->key);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (bucket_entries_logging_threshold_ > 0 &&
|
||||
header->num_entries ==
|
||||
header->GetNumEntries() ==
|
||||
static_cast<uint32_t>(bucket_entries_logging_threshold_)) {
|
||||
Info(logger_,
|
||||
"HashLinkedList bucket %zu has more than %d "
|
||||
"entries. Key to insert: %s",
|
||||
GetHash(transformed), header->num_entries,
|
||||
GetHash(transformed), header->GetNumEntries(),
|
||||
GetLengthPrefixedSlice(x->key).ToString(true).c_str());
|
||||
}
|
||||
|
||||
if (header->num_entries == threshold_use_skiplist_) {
|
||||
if (header->GetNumEntries() == threshold_use_skiplist_) {
|
||||
// Case 3. number of entries reaches the threshold so need to convert to
|
||||
// skip list.
|
||||
LinkListIterator bucket_iter(
|
||||
@ -595,7 +608,7 @@ void HashLinkListRep::Insert(KeyHandle handle) {
|
||||
first_next_pointer->load(std::memory_order_relaxed)));
|
||||
auto mem = allocator_->AllocateAligned(sizeof(SkipListBucketHeader));
|
||||
SkipListBucketHeader* new_skip_list_header = new (mem)
|
||||
SkipListBucketHeader(compare_, allocator_, header->num_entries + 1);
|
||||
SkipListBucketHeader(compare_, allocator_, header->GetNumEntries() + 1);
|
||||
auto& skip_list = new_skip_list_header->skip_list;
|
||||
|
||||
// Add all current entries to the skip list
|
||||
@ -616,7 +629,7 @@ void HashLinkListRep::Insert(KeyHandle handle) {
|
||||
// Advance counter unless the bucket needs to be advanced to skip list.
|
||||
// In that case, we need to make sure the previous count never exceeds
|
||||
// threshold_use_skiplist_ to avoid readers to cast to wrong format.
|
||||
header->num_entries++;
|
||||
header->IncNumEntries();
|
||||
|
||||
Node* cur = first;
|
||||
Node* prev = nullptr;
|
||||
|
Loading…
Reference in New Issue
Block a user