diff --git a/common/src/main/java/io/netty/util/HashedWheelTimer.java b/common/src/main/java/io/netty/util/HashedWheelTimer.java index ad835c515a..1c98980b22 100644 --- a/common/src/main/java/io/netty/util/HashedWheelTimer.java +++ b/common/src/main/java/io/netty/util/HashedWheelTimer.java @@ -15,7 +15,7 @@ */ package io.netty.util; -import io.netty.util.internal.MpscLinkedQueue; +import io.netty.util.internal.MpscLinkedQueueNode; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -444,7 +444,7 @@ public class HashedWheelTimer implements Timer { } } - private static final class HashedWheelTimeout extends MpscLinkedQueue.Node + private static final class HashedWheelTimeout extends MpscLinkedQueueNode implements Timeout { private static final int ST_INIT = 0; diff --git a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java index bea2fc7da3..f98870c235 100644 --- a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java +++ b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java @@ -17,7 +17,7 @@ package io.netty.util; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.internal.MpscLinkedQueue; +import io.netty.util.internal.MpscLinkedQueueNode; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -175,7 +175,7 @@ public final class ThreadDeathWatcher { } } - private static final class Entry extends MpscLinkedQueue.Node { + private static final class Entry extends MpscLinkedQueueNode { final Thread thread; final Runnable task; diff --git a/common/src/main/java/io/netty/util/internal/FullyPaddedReference.java b/common/src/main/java/io/netty/util/internal/FullyPaddedReference.java new file mode 100644 index 0000000000..ccc478eef8 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/FullyPaddedReference.java @@ -0,0 +1,24 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.internal; + +public final class FullyPaddedReference extends LeftPaddedReference { + private static final long serialVersionUID = -5986650399506826641L; + // cache line padding (must be public) + public transient long rp1, rp2, rp3, rp4, rp5, rp6, rp7; // 56 bytes (excluding LeftPaddedReference.referent) + public transient long rpA, rpB, rpC, rpD, rpE, rpF, rpG, rpH; // 64 bytes +} diff --git a/common/src/main/java/io/netty/util/internal/LeftPaddedReference.java b/common/src/main/java/io/netty/util/internal/LeftPaddedReference.java new file mode 100644 index 0000000000..6e50ba248d --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/LeftPaddedReference.java @@ -0,0 +1,69 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.internal; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +abstract class LeftPaddedReference extends LeftPadding { + + private static final long serialVersionUID = 6513142711280243198L; + + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater referentUpdater; + + static { + @SuppressWarnings("rawtypes") + AtomicReferenceFieldUpdater u; + u = PlatformDependent.newAtomicReferenceFieldUpdater(LeftPaddedReference.class, "referent"); + if (u == null) { + u = AtomicReferenceFieldUpdater.newUpdater(LeftPaddedReference.class, Object.class, "referent"); + } + referentUpdater = u; + } + + private volatile T referent; // 8-byte object field (or 4-byte + padding) + + public final T get() { + return referent; + } + + public final void set(T referent) { + this.referent = referent; + } + + public final void lazySet(T referent) { + referentUpdater.lazySet(this, referent); + } + + public final boolean compareAndSet(T expect, T update) { + return referentUpdater.compareAndSet(this, expect, update); + } + + public final boolean weakCompareAndSet(T expect, T update) { + return referentUpdater.weakCompareAndSet(this, expect, update); + } + + @SuppressWarnings("unchecked") + public final T getAndSet(T referent) { + return (T) referentUpdater.getAndSet(this, referent); + } + + @Override + public String toString() { + return String.valueOf(get()); + } +} diff --git a/common/src/main/java/io/netty/util/internal/LeftPadding.java b/common/src/main/java/io/netty/util/internal/LeftPadding.java new file mode 100644 index 0000000000..196108dfc1 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/LeftPadding.java @@ -0,0 +1,26 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.internal; + +import java.io.Serializable; + +abstract class LeftPadding implements Serializable { + private static final long serialVersionUID = -9129166504419549394L; + // cache line padding (must be public) + public transient long lp1, lp2, lp3, lp4, lp5, lp6; // 48 bytes (excluding 16-byte object header) + public transient long lpA, lpB, lpC, lpD, lpE, lpF, lpG, lpH; // 64 bytes +} diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java index b2bc425bd4..836d9c73c2 100644 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java +++ b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java @@ -18,7 +18,12 @@ */ package io.netty.util.internal; - +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; @@ -26,93 +31,144 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicReference; /** - * A lock-free concurrent {@link java.util.Queue} implementations for single-consumer multiple-producer pattern. - * It's important is is only used for this as otherwise it is not thread-safe. - * - * This implementation is based on: + * A lock-free concurrent single-consumer multi-producer {@link Queue}. + * It allows multiple producer threads to perform the following operations simultaneously: *
    - *
  • AbstractNodeQueue
  • - *
  • Non intrusive MPSC node based queue
  • + *
  • {@link #offer(Object)}, {@link #add(Object)}, and {@link #addAll(Collection)}
  • + *
  • All other read-only operations: + *
      + *
    • {@link #contains(Object)} and {@link #containsAll(Collection)}
    • + *
    • {@link #element()}, {@link #peek()}
    • + *
    • {@link #size()} and {@link #isEmpty()}
    • + *
    • {@link #iterator()} (except {@link Iterator#remove()}
    • + *
    • {@link #toArray()} and {@link #toArray(Object[])}
    • + *
    + *
  • + *
+ * .. while only one consumer thread is allowed to perform the following operations exclusively: + *
    + *
  • {@link #poll()} and {@link #remove()}
  • + *
  • {@link #remove(Object)}, {@link #removeAll(Collection)}, and {@link #retainAll(Collection)}
  • + *
  • {@link #clear()}
  • {@link #} *
* + * The behavior of this implementation is undefined if you perform the operations for a consumer thread only + * from multiple threads. + * + * The initial implementation is based on: + * + * and adopted padded head node changes from: + * */ -@SuppressWarnings("serial") -public final class MpscLinkedQueue extends AtomicReference> implements Queue { - private static final long tailOffset; +final class MpscLinkedQueue extends AtomicReference> implements Queue { - static { - try { - tailOffset = PlatformDependent.objectFieldOffset( - MpscLinkedQueue.class.getDeclaredField("tail")); - } catch (Throwable t) { - throw new ExceptionInInitializerError(t); - } - } + private static final long serialVersionUID = -7505862422018495345L; - // Extends AtomicReference for the "head" slot (which is the one that is appended to) - // since Unsafe does not expose XCHG operation intrinsically - @SuppressWarnings({ "unused", "FieldMayBeFinal" }) - private volatile Node tail; + // offer() occurs at the tail of the linked list. + // poll() occurs at the head of the linked list. + // + // Resulting layout is: + // + // head --next--> 1st element --next--> 2nd element --next--> ... tail (last element) + // + // where the head is a dummy node whose value is null. + // + // offer() appends a new node next to the tail using AtomicReference.getAndSet() + // poll() removes head from the linked list and promotes the 1st element to the head, + // setting its value to null if possible. + // + // Also note that this class extends AtomicReference for the "tail" slot (which is the one that is appended to) + // since Unsafe does not expose XCHG operation intrinsically. + + private final FullyPaddedReference> headRef; MpscLinkedQueue() { - final Node task = new DefaultNode(null); - tail = task; - set(task); + MpscLinkedQueueNode tombstone = new DefaultNode(null); + headRef = new FullyPaddedReference>(); + headRef.set(tombstone); + setTail(tombstone); } - @SuppressWarnings("unchecked") - @Override - public boolean add(T value) { - if (value instanceof Node) { - Node node = (Node) value; - node.setNext(null); - getAndSet(node).setNext(node); - } else { - final Node n = new DefaultNode(value); - getAndSet(n).setNext(n); + private MpscLinkedQueueNode getTail() { + return get(); + } + + private void setTail(MpscLinkedQueueNode tail) { + set(tail); + } + + private MpscLinkedQueueNode replaceTail(MpscLinkedQueueNode node) { + return getAndSet(node); + } + + /** + * Returns the node right next to the head, which contains the first element of this queue. + */ + private MpscLinkedQueueNode peekNode() { + for (;;) { + final MpscLinkedQueueNode head = headRef.get(); + final MpscLinkedQueueNode next = head.next(); + if (next != null) { + return next; + } + if (head == getTail()) { + return null; + } + + // If we are here, it means: + // * offer() is adding the first element, and + // * it's between replaceTail(newTail) and oldTail.setNext(newTail). + // (i.e. next == oldTail and oldTail.next == null and head == oldTail != newTail) } + } + + @Override + @SuppressWarnings("unchecked") + public boolean offer(E value) { + if (value == null) { + throw new NullPointerException("value"); + } + + final MpscLinkedQueueNode newTail; + if (value instanceof MpscLinkedQueueNode) { + newTail = (MpscLinkedQueueNode) value; + newTail.setNext(null); + } else { + newTail = new DefaultNode(value); + } + + MpscLinkedQueueNode oldTail = replaceTail(newTail); + oldTail.setNext(newTail); return true; } @Override - public boolean offer(T value) { - return add(value); - } - - @Override - public T remove() { - T v = poll(); - if (v == null) { - throw new NoSuchElementException(); - } - return v; - } - - @Override - public T poll() { - final Node next = peekNode(); + public E poll() { + final MpscLinkedQueueNode next = peekNode(); if (next == null) { return null; } - final Node ret = next; - PlatformDependent.putOrderedObject(this, tailOffset, next); - return ret.value(); + + // next becomes a new head. + MpscLinkedQueueNode oldHead = headRef.get(); + // Similar to 'headRef.node = next', but slightly faster (storestore vs loadstore) + // See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html + headRef.lazySet(next); + + // Break the linkage between the old head and the new head. + oldHead.setNext(null); + + return next.clearMaybe(); } @Override - public T element() { - final Node next = peekNode(); - if (next == null) { - throw new NoSuchElementException(); - } - return next.value(); - } - - @Override - public T peek() { - final Node next = peekNode(); + public E peek() { + final MpscLinkedQueueNode next = peekNode(); if (next == null) { return null; } @@ -122,36 +178,25 @@ public final class MpscLinkedQueue extends AtomicReference n = peekNode(); + MpscLinkedQueueNode n = peekNode(); for (;;) { if (n == null) { break; } - count++; + count ++; n = n.next(); } return count; } - @SuppressWarnings("unchecked") - private Node peekNode() { - for (;;) { - final Node tail = (Node) PlatformDependent.getObjectVolatile(this, tailOffset); - final Node next = tail.next(); - if (next != null || get() == tail) { - return next; - } - } - } - @Override public boolean isEmpty() { - return peek() == null; + return peekNode() == null; } @Override public boolean contains(Object o) { - Node n = peekNode(); + MpscLinkedQueueNode n = peekNode(); for (;;) { if (n == null) { break; @@ -165,29 +210,117 @@ public final class MpscLinkedQueue extends AtomicReference iterator() { - throw new UnsupportedOperationException(); + public Iterator iterator() { + return new Iterator() { + private MpscLinkedQueueNode node = peekNode(); + + @Override + public boolean hasNext() { + return node != null; + } + + @Override + public E next() { + MpscLinkedQueueNode node = this.node; + if (node == null) { + throw new NoSuchElementException(); + } + E value = node.value(); + this.node = node.next(); + return value; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public boolean add(E e) { + if (offer(e)) { + return true; + } + throw new IllegalStateException("queue full"); + } + + @Override + public E remove() { + E e = poll(); + if (e != null) { + return e; + } + throw new NoSuchElementException(); + } + + @Override + public E element() { + E e = peek(); + if (e != null) { + return e; + } + throw new NoSuchElementException(); } @Override public Object[] toArray() { - throw new UnsupportedOperationException(); + final Object[] array = new Object[size()]; + final Iterator it = iterator(); + for (int i = 0; i < array.length; i ++) { + if (it.hasNext()) { + array[i] = it.next(); + } else { + return Arrays.copyOf(array, i); + } + } + return array; } @Override + @SuppressWarnings("unchecked") public T[] toArray(T[] a) { - throw new UnsupportedOperationException(); + final int size = size(); + final T[] array; + if (a.length >= size) { + array = a; + } else { + array = (T[]) Array.newInstance(a.getClass().getComponentType(), size); + } + + final Iterator it = iterator(); + for (int i = 0; i < array.length; i++) { + if (it.hasNext()) { + array[i] = (T) it.next(); + } else { + if (a == array) { + array[i] = null; + return array; + } + + if (a.length < i) { + return Arrays.copyOf(array, i); + } + + System.arraycopy(array, 0, a, 0, i); + if (a.length > i) { + a[i] = null; + } + return a; + } + } + return array; } @Override public boolean remove(Object o) { - return false; + throw new UnsupportedOperationException(); } @Override public boolean containsAll(Collection c) { - for (Object o: c) { - if (!contains(o)) { + for (Object e: c) { + if (!contains(e)) { return false; } } @@ -195,34 +328,69 @@ public final class MpscLinkedQueue extends AtomicReference c) { - for (T r: c) { - add(r); + public boolean addAll(Collection c) { + if (c == null) { + throw new NullPointerException("c"); } - return false; + if (c == this) { + throw new IllegalArgumentException("c == this"); + } + + boolean modified = false; + for (E e: c) { + add(e); + modified = true; + } + return modified; } @Override public boolean removeAll(Collection c) { - return false; + throw new UnsupportedOperationException(); } @Override public boolean retainAll(Collection c) { - return false; + throw new UnsupportedOperationException(); } @Override public void clear() { - for (;;) { - if (poll() == null) { - break; - } + while (poll() != null) { + continue; } } - private static final class DefaultNode extends Node { - private final T value; + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + for (E e: this) { + out.writeObject(e); + } + out.writeObject(null); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + final MpscLinkedQueueNode tombstone = new DefaultNode(null); + headRef.set(tombstone); + setTail(tombstone); + + for (;;) { + @SuppressWarnings("unchecked") + E e = (E) in.readObject(); + if (e == null) { + break; + } + add(e); + } + } + + private static final class DefaultNode extends MpscLinkedQueueNode implements Serializable { + + private static final long serialVersionUID = 1006745279405945948L; + + private T value; DefaultNode(T value) { this.value = value; @@ -232,39 +400,12 @@ public final class MpscLinkedQueue extends AtomicReference { - - private static final long nextOffset; - - static { - if (PlatformDependent0.hasUnsafe()) { - try { - nextOffset = PlatformDependent.objectFieldOffset( - Node.class.getDeclaredField("tail")); - } catch (Throwable t) { - throw new ExceptionInInitializerError(t); - } - } else { - nextOffset = -1; - } + @Override + protected T clearMaybe() { + T value = this.value; + this.value = null; + return value; } - - @SuppressWarnings("unused") - private volatile Node tail; - - // Only use from MpscLinkedQueue and so we are sure Unsafe is present - @SuppressWarnings("unchecked") - final Node next() { - return (Node) PlatformDependent.getObjectVolatile(this, nextOffset); - } - - // Only use from MpscLinkedQueue and so we are sure Unsafe is present - final void setNext(final Node newNext) { - PlatformDependent.putOrderedObject(this, nextOffset, newNext); - } - - public abstract T value(); } } diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueueNode.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueueNode.java new file mode 100644 index 0000000000..433e0418ab --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/MpscLinkedQueueNode.java @@ -0,0 +1,58 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.internal; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +public abstract class MpscLinkedQueueNode { + + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater nextUpdater; + + static { + @SuppressWarnings("rawtypes") + AtomicReferenceFieldUpdater u; + + u = PlatformDependent.newAtomicReferenceFieldUpdater(MpscLinkedQueueNode.class, "next"); + if (u == null) { + u = AtomicReferenceFieldUpdater.newUpdater(MpscLinkedQueueNode.class, MpscLinkedQueueNode.class, "next"); + } + nextUpdater = u; + } + + @SuppressWarnings("unused") + private volatile MpscLinkedQueueNode next; + + final MpscLinkedQueueNode next() { + return next; + } + + final void setNext(final MpscLinkedQueueNode newNext) { + // Similar to 'next = newNext', but slightly faster (storestore vs loadstore) + // See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html + nextUpdater.lazySet(this, newNext); + } + + public abstract T value(); + + /** + * Sets the element this node contains to {@code null} so that the node can be used as a tombstone. + */ + protected T clearMaybe() { + return value(); + } +} diff --git a/common/src/main/java/io/netty/util/internal/OneTimeTask.java b/common/src/main/java/io/netty/util/internal/OneTimeTask.java index 69819a3fdf..d9473bd45c 100644 --- a/common/src/main/java/io/netty/util/internal/OneTimeTask.java +++ b/common/src/main/java/io/netty/util/internal/OneTimeTask.java @@ -23,7 +23,7 @@ import io.netty.util.concurrent.EventExecutor; * * It is important this will not be reused. After submitted it is not allowed to get submitted again! */ -public abstract class OneTimeTask extends MpscLinkedQueue.Node implements Runnable { +public abstract class OneTimeTask extends MpscLinkedQueueNode implements Runnable { @Override public Runnable value() { diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 733c356fba..bf26702650 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -35,7 +35,6 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -380,11 +379,7 @@ public final class PlatformDependent { * consumer (one thread!). */ public static Queue newMpscQueue() { - if (hasUnsafe()) { - return new MpscLinkedQueue(); - } else { - return new ConcurrentLinkedQueue(); - } + return new MpscLinkedQueue(); } /** diff --git a/common/src/main/java/io/netty/util/internal/RightPaddedReference.java b/common/src/main/java/io/netty/util/internal/RightPaddedReference.java new file mode 100644 index 0000000000..7f667e23a6 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/RightPaddedReference.java @@ -0,0 +1,33 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.internal; + +import java.util.concurrent.atomic.AtomicReference; + +public final class RightPaddedReference extends AtomicReference { + private static final long serialVersionUID = -467619563034125237L; + + // cache line padding (must be public) + public transient long rp1, rp2, rp3, rp4, rp5; // 40 bytes (excluding AtomicReference.value and object header) + public transient long rpA, rpB, rpC, rpD, rpE, rpF, rpG, rpH; // 64 bytes + + public RightPaddedReference() { } + + public RightPaddedReference(T initialValue) { + super(initialValue); + } +}