diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java index 836d9c73c2..ca7a5907ab 100644 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java +++ b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java @@ -18,6 +18,8 @@ */ package io.netty.util.internal; +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; + import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -28,7 +30,51 @@ import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +abstract class MpscLinkedQueuePad0 { + long p00, p01, p02, p03, p04, p05, p06, p07; + long p30, p31, p32, p33, p34, p35, p36, p37; +} + +abstract class MpscLinkedQueueHeadRef extends MpscLinkedQueuePad0 { + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater UPDATER = + newUpdater(MpscLinkedQueueHeadRef.class, MpscLinkedQueueNode.class, "headRef"); + private volatile MpscLinkedQueueNode headRef; + + protected final MpscLinkedQueueNode headRef() { + return headRef; + } + protected final void headRef(MpscLinkedQueueNode val) { + headRef = val; + } + protected final void lazySetHeadRef(MpscLinkedQueueNode newVal) { + UPDATER.lazySet(this, newVal); + } +} + +abstract class MpscLinkedQueuePad1 extends MpscLinkedQueueHeadRef { + long p00, p01, p02, p03, p04, p05, p06, p07; + long p30, p31, p32, p33, p34, p35, p36, p37; +} + +abstract class MpscLinkedQueueTailRef extends MpscLinkedQueuePad1 { + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater UPDATER = + newUpdater(MpscLinkedQueueTailRef.class, MpscLinkedQueueNode.class, "tailRef"); + private volatile MpscLinkedQueueNode tailRef; + protected final MpscLinkedQueueNode tailRef() { + return tailRef; + } + protected final void tailRef(MpscLinkedQueueNode val) { + tailRef = val; + } + @SuppressWarnings("unchecked") + protected final MpscLinkedQueueNode getAndSetTailRef(MpscLinkedQueueNode newVal) { + return (MpscLinkedQueueNode) UPDATER.getAndSet(this, newVal); + } +} /** * A lock-free concurrent single-consumer multi-producer {@link Queue}. @@ -64,11 +110,10 @@ import java.util.concurrent.atomic.AtomicReference; * + * data structure modified to avoid false sharing between head and tail Ref as per implementation of MpscLinkedQueue + * on JCTools project. */ -final class MpscLinkedQueue extends AtomicReference> implements Queue { - - private static final long serialVersionUID = -7505862422018495345L; - +public final class MpscLinkedQueue extends MpscLinkedQueueTailRef implements Queue { // offer() occurs at the tail of the linked list. // poll() occurs at the head of the linked list. // @@ -84,26 +129,10 @@ final class MpscLinkedQueue extends AtomicReference> i // // Also note that this class extends AtomicReference for the "tail" slot (which is the one that is appended to) // since Unsafe does not expose XCHG operation intrinsically. - - private final FullyPaddedReference> headRef; - MpscLinkedQueue() { MpscLinkedQueueNode tombstone = new DefaultNode(null); - headRef = new FullyPaddedReference>(); - headRef.set(tombstone); - setTail(tombstone); - } - - private MpscLinkedQueueNode getTail() { - return get(); - } - - private void setTail(MpscLinkedQueueNode tail) { - set(tail); - } - - private MpscLinkedQueueNode replaceTail(MpscLinkedQueueNode node) { - return getAndSet(node); + headRef(tombstone); + tailRef(tombstone); } /** @@ -111,12 +140,12 @@ final class MpscLinkedQueue extends AtomicReference> i */ private MpscLinkedQueueNode peekNode() { for (;;) { - final MpscLinkedQueueNode head = headRef.get(); + final MpscLinkedQueueNode head = headRef(); final MpscLinkedQueueNode next = head.next(); if (next != null) { return next; } - if (head == getTail()) { + if (head == tailRef()) { return null; } @@ -142,7 +171,7 @@ final class MpscLinkedQueue extends AtomicReference> i newTail = new DefaultNode(value); } - MpscLinkedQueueNode oldTail = replaceTail(newTail); + MpscLinkedQueueNode oldTail = getAndSetTailRef(newTail); oldTail.setNext(newTail); return true; } @@ -155,10 +184,11 @@ final class MpscLinkedQueue extends AtomicReference> i } // next becomes a new head. - MpscLinkedQueueNode oldHead = headRef.get(); + MpscLinkedQueueNode oldHead = headRef(); // Similar to 'headRef.node = next', but slightly faster (storestore vs loadstore) // See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html - headRef.lazySet(next); + // See: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html + lazySetHeadRef(next); // Break the linkage between the old head and the new head. oldHead.setNext(null); @@ -373,8 +403,8 @@ final class MpscLinkedQueue extends AtomicReference> i in.defaultReadObject(); final MpscLinkedQueueNode tombstone = new DefaultNode(null); - headRef.set(tombstone); - setTail(tombstone); + headRef(tombstone); + tailRef(tombstone); for (;;) { @SuppressWarnings("unchecked")