From a5d585a8ef88f0e0888776b6fa1b58bf0c992c15 Mon Sep 17 00:00:00 2001 From: nitsanw Date: Fri, 20 Jun 2014 13:44:51 +0200 Subject: [PATCH] Fix false sharing between head and tail reference in MpscLinkedQueue Motivation: The tail node reference writes (by producer threads) are very likely to invalidate the cache line holding the headRef which is read by the consumer threads in order to access the padded reference to the head node. This is because the resulting layout for the object is: - header - Object AtomicReference.value -> Tail node - Object MpscLinkedQueue.headRef -> PaddedRef -> Head node This is 'passive' false sharing where one thread reads and the other writes. The current implementation suffers from further passive false sharing potential from any and all neighbours to the queue object as no pre/post padding is provided for the class fields. Modifications: Fix the memory layout by adding pre-post padding for the head node and putting the tail node reference in the same object. Result: Fixed false sharing --- .../netty/util/internal/MpscLinkedQueue.java | 90 ++++++++++++------- 1 file changed, 60 insertions(+), 30 deletions(-) diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java index 836d9c73c2..ca7a5907ab 100644 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java +++ b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java @@ -18,6 +18,8 @@ */ package io.netty.util.internal; +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; + import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -28,7 +30,51 @@ import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +abstract class MpscLinkedQueuePad0 { + long p00, p01, p02, p03, p04, p05, p06, p07; + long p30, p31, p32, p33, p34, p35, p36, p37; +} + +abstract class MpscLinkedQueueHeadRef extends MpscLinkedQueuePad0 { + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater UPDATER = + newUpdater(MpscLinkedQueueHeadRef.class, MpscLinkedQueueNode.class, "headRef"); + private volatile MpscLinkedQueueNode headRef; + + protected final MpscLinkedQueueNode headRef() { + return headRef; + } + protected final void headRef(MpscLinkedQueueNode val) { + headRef = val; + } + protected final void lazySetHeadRef(MpscLinkedQueueNode newVal) { + UPDATER.lazySet(this, newVal); + } +} + +abstract class MpscLinkedQueuePad1 extends MpscLinkedQueueHeadRef { + long p00, p01, p02, p03, p04, p05, p06, p07; + long p30, p31, p32, p33, p34, p35, p36, p37; +} + +abstract class MpscLinkedQueueTailRef extends MpscLinkedQueuePad1 { + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater UPDATER = + newUpdater(MpscLinkedQueueTailRef.class, MpscLinkedQueueNode.class, "tailRef"); + private volatile MpscLinkedQueueNode tailRef; + protected final MpscLinkedQueueNode tailRef() { + return tailRef; + } + protected final void tailRef(MpscLinkedQueueNode val) { + tailRef = val; + } + @SuppressWarnings("unchecked") + protected final MpscLinkedQueueNode getAndSetTailRef(MpscLinkedQueueNode newVal) { + return (MpscLinkedQueueNode) UPDATER.getAndSet(this, newVal); + } +} /** * A lock-free concurrent single-consumer multi-producer {@link Queue}. @@ -64,11 +110,10 @@ import java.util.concurrent.atomic.AtomicReference; * + * data structure modified to avoid false sharing between head and tail Ref as per implementation of MpscLinkedQueue + * on JCTools project. */ -final class MpscLinkedQueue extends AtomicReference> implements Queue { - - private static final long serialVersionUID = -7505862422018495345L; - +public final class MpscLinkedQueue extends MpscLinkedQueueTailRef implements Queue { // offer() occurs at the tail of the linked list. // poll() occurs at the head of the linked list. // @@ -84,26 +129,10 @@ final class MpscLinkedQueue extends AtomicReference> i // // Also note that this class extends AtomicReference for the "tail" slot (which is the one that is appended to) // since Unsafe does not expose XCHG operation intrinsically. - - private final FullyPaddedReference> headRef; - MpscLinkedQueue() { MpscLinkedQueueNode tombstone = new DefaultNode(null); - headRef = new FullyPaddedReference>(); - headRef.set(tombstone); - setTail(tombstone); - } - - private MpscLinkedQueueNode getTail() { - return get(); - } - - private void setTail(MpscLinkedQueueNode tail) { - set(tail); - } - - private MpscLinkedQueueNode replaceTail(MpscLinkedQueueNode node) { - return getAndSet(node); + headRef(tombstone); + tailRef(tombstone); } /** @@ -111,12 +140,12 @@ final class MpscLinkedQueue extends AtomicReference> i */ private MpscLinkedQueueNode peekNode() { for (;;) { - final MpscLinkedQueueNode head = headRef.get(); + final MpscLinkedQueueNode head = headRef(); final MpscLinkedQueueNode next = head.next(); if (next != null) { return next; } - if (head == getTail()) { + if (head == tailRef()) { return null; } @@ -142,7 +171,7 @@ final class MpscLinkedQueue extends AtomicReference> i newTail = new DefaultNode(value); } - MpscLinkedQueueNode oldTail = replaceTail(newTail); + MpscLinkedQueueNode oldTail = getAndSetTailRef(newTail); oldTail.setNext(newTail); return true; } @@ -155,10 +184,11 @@ final class MpscLinkedQueue extends AtomicReference> i } // next becomes a new head. - MpscLinkedQueueNode oldHead = headRef.get(); + MpscLinkedQueueNode oldHead = headRef(); // Similar to 'headRef.node = next', but slightly faster (storestore vs loadstore) // See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html - headRef.lazySet(next); + // See: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html + lazySetHeadRef(next); // Break the linkage between the old head and the new head. oldHead.setNext(null); @@ -373,8 +403,8 @@ final class MpscLinkedQueue extends AtomicReference> i in.defaultReadObject(); final MpscLinkedQueueNode tombstone = new DefaultNode(null); - headRef.set(tombstone); - setTail(tombstone); + headRef(tombstone); + tailRef(tombstone); for (;;) { @SuppressWarnings("unchecked")