Fix false sharing between head and tail reference in MpscLinkedQueue

Motivation:

The tail node reference writes (by producer threads) are very likely to
invalidate the cache line holding the headRef which is read by the
consumer threads in order to access the padded reference to the head
node. This is because the resulting layout for the object is:

- header
- Object AtomicReference.value -> Tail node
- Object MpscLinkedQueue.headRef -> PaddedRef -> Head node

This is 'passive' false sharing where one thread reads and the other
writes.  The current implementation suffers from further passive false
sharing potential from any and all neighbours to the queue object as no
pre/post padding is provided for the class fields.

Modifications:

Fix the memory layout by adding pre-post padding for the head node and
putting the tail node reference in the same object.

Result:

Fixed false sharing
This commit is contained in:
nitsanw 2014-06-20 13:44:51 +02:00 committed by Trustin Lee
parent c65d5b170d
commit 32aab3b0b3

View File

@ -18,6 +18,8 @@
*/
package io.netty.util.internal;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@ -28,7 +30,51 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
abstract class MpscLinkedQueuePad0<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}
abstract class MpscLinkedQueueHeadRef<E> extends MpscLinkedQueuePad0<E> {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MpscLinkedQueueHeadRef, MpscLinkedQueueNode> UPDATER =
newUpdater(MpscLinkedQueueHeadRef.class, MpscLinkedQueueNode.class, "headRef");
private volatile MpscLinkedQueueNode<E> headRef;
protected final MpscLinkedQueueNode<E> headRef() {
return headRef;
}
protected final void headRef(MpscLinkedQueueNode<E> val) {
headRef = val;
}
protected final void lazySetHeadRef(MpscLinkedQueueNode<E> newVal) {
UPDATER.lazySet(this, newVal);
}
}
abstract class MpscLinkedQueuePad1<E> extends MpscLinkedQueueHeadRef<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}
abstract class MpscLinkedQueueTailRef<E> extends MpscLinkedQueuePad1<E> {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MpscLinkedQueueTailRef, MpscLinkedQueueNode> UPDATER =
newUpdater(MpscLinkedQueueTailRef.class, MpscLinkedQueueNode.class, "tailRef");
private volatile MpscLinkedQueueNode<E> tailRef;
protected final MpscLinkedQueueNode<E> tailRef() {
return tailRef;
}
protected final void tailRef(MpscLinkedQueueNode<E> val) {
tailRef = val;
}
@SuppressWarnings("unchecked")
protected final MpscLinkedQueueNode<E> getAndSetTailRef(MpscLinkedQueueNode<E> newVal) {
return (MpscLinkedQueueNode<E>) UPDATER.getAndSet(this, newVal);
}
}
/**
* A lock-free concurrent single-consumer multi-producer {@link Queue}.
@ -64,11 +110,10 @@ import java.util.concurrent.atomic.AtomicReference;
* <ul>
* <li><a href="http://goo.gl/bD5ZUV">MpscPaddedQueue</a> from RxJava</li>
* </ul>
* data structure modified to avoid false sharing between head and tail Ref as per implementation of MpscLinkedQueue
* on <a href="https://github.com/JCTools/JCTools">JCTools project</a>.
*/
final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueueNode<E>> implements Queue<E> {
private static final long serialVersionUID = -7505862422018495345L;
public final class MpscLinkedQueue<E> extends MpscLinkedQueueTailRef<E> implements Queue<E> {
// offer() occurs at the tail of the linked list.
// poll() occurs at the head of the linked list.
//
@ -84,26 +129,10 @@ final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueueNode<E>> i
//
// Also note that this class extends AtomicReference for the "tail" slot (which is the one that is appended to)
// since Unsafe does not expose XCHG operation intrinsically.
private final FullyPaddedReference<MpscLinkedQueueNode<E>> headRef;
MpscLinkedQueue() {
MpscLinkedQueueNode<E> tombstone = new DefaultNode<E>(null);
headRef = new FullyPaddedReference<MpscLinkedQueueNode<E>>();
headRef.set(tombstone);
setTail(tombstone);
}
private MpscLinkedQueueNode<E> getTail() {
return get();
}
private void setTail(MpscLinkedQueueNode<E> tail) {
set(tail);
}
private MpscLinkedQueueNode<E> replaceTail(MpscLinkedQueueNode<E> node) {
return getAndSet(node);
headRef(tombstone);
tailRef(tombstone);
}
/**
@ -111,12 +140,12 @@ final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueueNode<E>> i
*/
private MpscLinkedQueueNode<E> peekNode() {
for (;;) {
final MpscLinkedQueueNode<E> head = headRef.get();
final MpscLinkedQueueNode<E> head = headRef();
final MpscLinkedQueueNode<E> next = head.next();
if (next != null) {
return next;
}
if (head == getTail()) {
if (head == tailRef()) {
return null;
}
@ -142,7 +171,7 @@ final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueueNode<E>> i
newTail = new DefaultNode<E>(value);
}
MpscLinkedQueueNode<E> oldTail = replaceTail(newTail);
MpscLinkedQueueNode<E> oldTail = getAndSetTailRef(newTail);
oldTail.setNext(newTail);
return true;
}
@ -155,10 +184,11 @@ final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueueNode<E>> i
}
// next becomes a new head.
MpscLinkedQueueNode<E> oldHead = headRef.get();
MpscLinkedQueueNode<E> oldHead = headRef();
// Similar to 'headRef.node = next', but slightly faster (storestore vs loadstore)
// See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html
headRef.lazySet(next);
// See: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html
lazySetHeadRef(next);
// Break the linkage between the old head and the new head.
oldHead.setNext(null);
@ -373,8 +403,8 @@ final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueueNode<E>> i
in.defaultReadObject();
final MpscLinkedQueueNode<E> tombstone = new DefaultNode<E>(null);
headRef.set(tombstone);
setTail(tombstone);
headRef(tombstone);
tailRef(tombstone);
for (;;) {
@SuppressWarnings("unchecked")