Clean up MpscLinkedQueue, fix its leak, and make it work without Unsafe
Motivation: MpscLinkedQueue has various issues: - It does not work without sun.misc.Unsafe. - Some field names are confusing. - Node.tail does not refer to the tail node really. - The tail node is the starting point of iteration. I think the tail node should be the head node and vice versa to reduce confusion. - Some important methods are not implemented (e.g. iterator()) - Not serializable - Potential false cache sharing problem due to lack of padding - MpscLinkedQueue extends AtomicReference and thus exposes various operations that mutates the internal state of the queue directly. Modifications: - Use AtomicReferenceFieldUpdater wherever possible so that we do not use Unsafe directly. (e.g. use lazySet() instead of putOrderedObject) - Extend AbstractQueue to implement most operations - Implement serialization and iterator() - Rename tail to head and head to tail to reduce confusion. - Rename Node.tail to Node.next. - Fix a leak where the references in the removed head are not cleared properly. - Add Node.clearMaybe() method so that the value of the new head node is cleared if possible. - Add some comments for my own educational purposes - Add padding to the head node - Add FullyPaddedReference and RightPaddedReference for future reuse - Make MpscLinkedQueue package-local so that a user cannot access the dangerous yet public operations exposed by the superclass. - MpscLinkedQueue.Node becomes MpscLinkedQueueNode, a top level class Result: - It's more like a drop-in replacement of ConcurrentLinkedQueue for the MPSC case. - Works without sun.misc.Unsafe - Code potentially easier to understand - Fixed leak (related: #2372)
This commit is contained in:
parent
72a4e8c178
commit
7ce8dca97c
@ -15,7 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.util;
|
package io.netty.util;
|
||||||
|
|
||||||
import io.netty.util.internal.MpscLinkedQueue;
|
import io.netty.util.internal.MpscLinkedQueueNode;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
@ -444,7 +444,7 @@ public class HashedWheelTimer implements Timer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class HashedWheelTimeout extends MpscLinkedQueue.Node<Timeout>
|
private static final class HashedWheelTimeout extends MpscLinkedQueueNode<Timeout>
|
||||||
implements Timeout {
|
implements Timeout {
|
||||||
|
|
||||||
private static final int ST_INIT = 0;
|
private static final int ST_INIT = 0;
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
package io.netty.util;
|
package io.netty.util;
|
||||||
|
|
||||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||||
import io.netty.util.internal.MpscLinkedQueue;
|
import io.netty.util.internal.MpscLinkedQueueNode;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
@ -175,7 +175,7 @@ public final class ThreadDeathWatcher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class Entry extends MpscLinkedQueue.Node<Entry> {
|
private static final class Entry extends MpscLinkedQueueNode<Entry> {
|
||||||
final Thread thread;
|
final Thread thread;
|
||||||
final Runnable task;
|
final Runnable task;
|
||||||
|
|
||||||
|
@ -0,0 +1,24 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2014 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util.internal;
|
||||||
|
|
||||||
|
public final class FullyPaddedReference<T> extends LeftPaddedReference<T> {
|
||||||
|
private static final long serialVersionUID = -5986650399506826641L;
|
||||||
|
// cache line padding (must be public)
|
||||||
|
public transient long rp1, rp2, rp3, rp4, rp5, rp6, rp7; // 56 bytes (excluding LeftPaddedReference.referent)
|
||||||
|
public transient long rpA, rpB, rpC, rpD, rpE, rpF, rpG, rpH; // 64 bytes
|
||||||
|
}
|
@ -0,0 +1,69 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2014 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util.internal;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||||
|
|
||||||
|
abstract class LeftPaddedReference<T> extends LeftPadding {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 6513142711280243198L;
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
private static final AtomicReferenceFieldUpdater<LeftPaddedReference, Object> referentUpdater;
|
||||||
|
|
||||||
|
static {
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
AtomicReferenceFieldUpdater<LeftPaddedReference, Object> u;
|
||||||
|
u = PlatformDependent.newAtomicReferenceFieldUpdater(LeftPaddedReference.class, "referent");
|
||||||
|
if (u == null) {
|
||||||
|
u = AtomicReferenceFieldUpdater.newUpdater(LeftPaddedReference.class, Object.class, "referent");
|
||||||
|
}
|
||||||
|
referentUpdater = u;
|
||||||
|
}
|
||||||
|
|
||||||
|
private volatile T referent; // 8-byte object field (or 4-byte + padding)
|
||||||
|
|
||||||
|
public final T get() {
|
||||||
|
return referent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void set(T referent) {
|
||||||
|
this.referent = referent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void lazySet(T referent) {
|
||||||
|
referentUpdater.lazySet(this, referent);
|
||||||
|
}
|
||||||
|
|
||||||
|
public final boolean compareAndSet(T expect, T update) {
|
||||||
|
return referentUpdater.compareAndSet(this, expect, update);
|
||||||
|
}
|
||||||
|
|
||||||
|
public final boolean weakCompareAndSet(T expect, T update) {
|
||||||
|
return referentUpdater.weakCompareAndSet(this, expect, update);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public final T getAndSet(T referent) {
|
||||||
|
return (T) referentUpdater.getAndSet(this, referent);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.valueOf(get());
|
||||||
|
}
|
||||||
|
}
|
26
common/src/main/java/io/netty/util/internal/LeftPadding.java
Normal file
26
common/src/main/java/io/netty/util/internal/LeftPadding.java
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2014 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util.internal;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
abstract class LeftPadding implements Serializable {
|
||||||
|
private static final long serialVersionUID = -9129166504419549394L;
|
||||||
|
// cache line padding (must be public)
|
||||||
|
public transient long lp1, lp2, lp3, lp4, lp5, lp6; // 48 bytes (excluding 16-byte object header)
|
||||||
|
public transient long lpA, lpB, lpC, lpD, lpE, lpF, lpG, lpH; // 64 bytes
|
||||||
|
}
|
@ -18,7 +18,12 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.util.internal;
|
package io.netty.util.internal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectInputStream;
|
||||||
|
import java.io.ObjectOutputStream;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.lang.reflect.Array;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
@ -26,93 +31,144 @@ import java.util.Queue;
|
|||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A lock-free concurrent {@link java.util.Queue} implementations for single-consumer multiple-producer pattern.
|
* A lock-free concurrent single-consumer multi-producer {@link Queue}.
|
||||||
* <strong>It's important is is only used for this as otherwise it is not thread-safe.</strong>
|
* It allows multiple producer threads to perform the following operations simultaneously:
|
||||||
*
|
|
||||||
* This implementation is based on:
|
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li><a href="https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/
|
* <li>{@link #offer(Object)}, {@link #add(Object)}, and {@link #addAll(Collection)}</li>
|
||||||
* AbstractNodeQueue.java">AbstractNodeQueue</a></li>
|
* <li>All other read-only operations:
|
||||||
* <li><a href="http://www.1024cores.net/home/lock-free-algorithms/
|
* <ul>
|
||||||
* queues/non-intrusive-mpsc-node-based-queue">Non intrusive MPSC node based queue</a></li>
|
* <li>{@link #contains(Object)} and {@link #containsAll(Collection)}</li>
|
||||||
|
* <li>{@link #element()}, {@link #peek()}</li>
|
||||||
|
* <li>{@link #size()} and {@link #isEmpty()}</li>
|
||||||
|
* <li>{@link #iterator()} (except {@link Iterator#remove()}</li>
|
||||||
|
* <li>{@link #toArray()} and {@link #toArray(Object[])}</li>
|
||||||
|
* </ul>
|
||||||
|
* </li>
|
||||||
|
* </ul>
|
||||||
|
* .. while only one consumer thread is allowed to perform the following operations exclusively:
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link #poll()} and {@link #remove()}</li>
|
||||||
|
* <li>{@link #remove(Object)}, {@link #removeAll(Collection)}, and {@link #retainAll(Collection)}</li>
|
||||||
|
* <li>{@link #clear()}</li> {@link #}
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
|
* <strong>The behavior of this implementation is undefined if you perform the operations for a consumer thread only
|
||||||
|
* from multiple threads.</strong>
|
||||||
|
*
|
||||||
|
* The initial implementation is based on:
|
||||||
|
* <ul>
|
||||||
|
* <li><a href="http://goo.gl/sZE3ie">Non-intrusive MPSC node based queue</a> from 1024cores.net</li>
|
||||||
|
* <li><a href="http://goo.gl/O0spmV">AbstractNodeQueue</a> from Akka</li>
|
||||||
|
* </ul>
|
||||||
|
* and adopted padded head node changes from:
|
||||||
|
* <ul>
|
||||||
|
* <li><a href="http://goo.gl/bD5ZUV">MpscPaddedQueue</a> from RxJava</li>
|
||||||
|
* </ul>
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("serial")
|
final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueueNode<E>> implements Queue<E> {
|
||||||
public final class MpscLinkedQueue<T> extends AtomicReference<MpscLinkedQueue.Node<T>> implements Queue<T> {
|
|
||||||
private static final long tailOffset;
|
|
||||||
|
|
||||||
static {
|
private static final long serialVersionUID = -7505862422018495345L;
|
||||||
try {
|
|
||||||
tailOffset = PlatformDependent.objectFieldOffset(
|
|
||||||
MpscLinkedQueue.class.getDeclaredField("tail"));
|
|
||||||
} catch (Throwable t) {
|
|
||||||
throw new ExceptionInInitializerError(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extends AtomicReference for the "head" slot (which is the one that is appended to)
|
// offer() occurs at the tail of the linked list.
|
||||||
// since Unsafe does not expose XCHG operation intrinsically
|
// poll() occurs at the head of the linked list.
|
||||||
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
|
//
|
||||||
private volatile Node<T> tail;
|
// Resulting layout is:
|
||||||
|
//
|
||||||
|
// head --next--> 1st element --next--> 2nd element --next--> ... tail (last element)
|
||||||
|
//
|
||||||
|
// where the head is a dummy node whose value is null.
|
||||||
|
//
|
||||||
|
// offer() appends a new node next to the tail using AtomicReference.getAndSet()
|
||||||
|
// poll() removes head from the linked list and promotes the 1st element to the head,
|
||||||
|
// setting its value to null if possible.
|
||||||
|
//
|
||||||
|
// Also note that this class extends AtomicReference for the "tail" slot (which is the one that is appended to)
|
||||||
|
// since Unsafe does not expose XCHG operation intrinsically.
|
||||||
|
|
||||||
|
private final FullyPaddedReference<MpscLinkedQueueNode<E>> headRef;
|
||||||
|
|
||||||
MpscLinkedQueue() {
|
MpscLinkedQueue() {
|
||||||
final Node<T> task = new DefaultNode<T>(null);
|
MpscLinkedQueueNode<E> tombstone = new DefaultNode<E>(null);
|
||||||
tail = task;
|
headRef = new FullyPaddedReference<MpscLinkedQueueNode<E>>();
|
||||||
set(task);
|
headRef.set(tombstone);
|
||||||
|
setTail(tombstone);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
private MpscLinkedQueueNode<E> getTail() {
|
||||||
@Override
|
return get();
|
||||||
public boolean add(T value) {
|
}
|
||||||
if (value instanceof Node) {
|
|
||||||
Node<T> node = (Node<T>) value;
|
private void setTail(MpscLinkedQueueNode<E> tail) {
|
||||||
node.setNext(null);
|
set(tail);
|
||||||
getAndSet(node).setNext(node);
|
}
|
||||||
} else {
|
|
||||||
final Node<T> n = new DefaultNode<T>(value);
|
private MpscLinkedQueueNode<E> replaceTail(MpscLinkedQueueNode<E> node) {
|
||||||
getAndSet(n).setNext(n);
|
return getAndSet(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the node right next to the head, which contains the first element of this queue.
|
||||||
|
*/
|
||||||
|
private MpscLinkedQueueNode<E> peekNode() {
|
||||||
|
for (;;) {
|
||||||
|
final MpscLinkedQueueNode<E> head = headRef.get();
|
||||||
|
final MpscLinkedQueueNode<E> next = head.next();
|
||||||
|
if (next != null) {
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
if (head == getTail()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we are here, it means:
|
||||||
|
// * offer() is adding the first element, and
|
||||||
|
// * it's between replaceTail(newTail) and oldTail.setNext(newTail).
|
||||||
|
// (i.e. next == oldTail and oldTail.next == null and head == oldTail != newTail)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public boolean offer(E value) {
|
||||||
|
if (value == null) {
|
||||||
|
throw new NullPointerException("value");
|
||||||
|
}
|
||||||
|
|
||||||
|
final MpscLinkedQueueNode<E> newTail;
|
||||||
|
if (value instanceof MpscLinkedQueueNode) {
|
||||||
|
newTail = (MpscLinkedQueueNode<E>) value;
|
||||||
|
newTail.setNext(null);
|
||||||
|
} else {
|
||||||
|
newTail = new DefaultNode<E>(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
MpscLinkedQueueNode<E> oldTail = replaceTail(newTail);
|
||||||
|
oldTail.setNext(newTail);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean offer(T value) {
|
public E poll() {
|
||||||
return add(value);
|
final MpscLinkedQueueNode<E> next = peekNode();
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T remove() {
|
|
||||||
T v = poll();
|
|
||||||
if (v == null) {
|
|
||||||
throw new NoSuchElementException();
|
|
||||||
}
|
|
||||||
return v;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T poll() {
|
|
||||||
final Node<T> next = peekNode();
|
|
||||||
if (next == null) {
|
if (next == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
final Node<T> ret = next;
|
|
||||||
PlatformDependent.putOrderedObject(this, tailOffset, next);
|
// next becomes a new head.
|
||||||
return ret.value();
|
MpscLinkedQueueNode<E> oldHead = headRef.get();
|
||||||
|
// Similar to 'headRef.node = next', but slightly faster (storestore vs loadstore)
|
||||||
|
// See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html
|
||||||
|
headRef.lazySet(next);
|
||||||
|
|
||||||
|
// Break the linkage between the old head and the new head.
|
||||||
|
oldHead.setNext(null);
|
||||||
|
|
||||||
|
return next.clearMaybe();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public T element() {
|
public E peek() {
|
||||||
final Node<T> next = peekNode();
|
final MpscLinkedQueueNode<E> next = peekNode();
|
||||||
if (next == null) {
|
|
||||||
throw new NoSuchElementException();
|
|
||||||
}
|
|
||||||
return next.value();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T peek() {
|
|
||||||
final Node<T> next = peekNode();
|
|
||||||
if (next == null) {
|
if (next == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -122,36 +178,25 @@ public final class MpscLinkedQueue<T> extends AtomicReference<MpscLinkedQueue.No
|
|||||||
@Override
|
@Override
|
||||||
public int size() {
|
public int size() {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
Node<T> n = peekNode();
|
MpscLinkedQueueNode<E> n = peekNode();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (n == null) {
|
if (n == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
count++;
|
count ++;
|
||||||
n = n.next();
|
n = n.next();
|
||||||
}
|
}
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private Node<T> peekNode() {
|
|
||||||
for (;;) {
|
|
||||||
final Node<T> tail = (Node<T>) PlatformDependent.getObjectVolatile(this, tailOffset);
|
|
||||||
final Node<T> next = tail.next();
|
|
||||||
if (next != null || get() == tail) {
|
|
||||||
return next;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isEmpty() {
|
public boolean isEmpty() {
|
||||||
return peek() == null;
|
return peekNode() == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean contains(Object o) {
|
public boolean contains(Object o) {
|
||||||
Node<T> n = peekNode();
|
MpscLinkedQueueNode<E> n = peekNode();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (n == null) {
|
if (n == null) {
|
||||||
break;
|
break;
|
||||||
@ -165,29 +210,117 @@ public final class MpscLinkedQueue<T> extends AtomicReference<MpscLinkedQueue.No
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<T> iterator() {
|
public Iterator<E> iterator() {
|
||||||
throw new UnsupportedOperationException();
|
return new Iterator<E>() {
|
||||||
|
private MpscLinkedQueueNode<E> node = peekNode();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return node != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E next() {
|
||||||
|
MpscLinkedQueueNode<E> node = this.node;
|
||||||
|
if (node == null) {
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
E value = node.value();
|
||||||
|
this.node = node.next();
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean add(E e) {
|
||||||
|
if (offer(e)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
throw new IllegalStateException("queue full");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E remove() {
|
||||||
|
E e = poll();
|
||||||
|
if (e != null) {
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E element() {
|
||||||
|
E e = peek();
|
||||||
|
if (e != null) {
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
throw new NoSuchElementException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object[] toArray() {
|
public Object[] toArray() {
|
||||||
throw new UnsupportedOperationException();
|
final Object[] array = new Object[size()];
|
||||||
|
final Iterator<E> it = iterator();
|
||||||
|
for (int i = 0; i < array.length; i ++) {
|
||||||
|
if (it.hasNext()) {
|
||||||
|
array[i] = it.next();
|
||||||
|
} else {
|
||||||
|
return Arrays.copyOf(array, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return array;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public <T> T[] toArray(T[] a) {
|
public <T> T[] toArray(T[] a) {
|
||||||
throw new UnsupportedOperationException();
|
final int size = size();
|
||||||
|
final T[] array;
|
||||||
|
if (a.length >= size) {
|
||||||
|
array = a;
|
||||||
|
} else {
|
||||||
|
array = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Iterator<E> it = iterator();
|
||||||
|
for (int i = 0; i < array.length; i++) {
|
||||||
|
if (it.hasNext()) {
|
||||||
|
array[i] = (T) it.next();
|
||||||
|
} else {
|
||||||
|
if (a == array) {
|
||||||
|
array[i] = null;
|
||||||
|
return array;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (a.length < i) {
|
||||||
|
return Arrays.copyOf(array, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
System.arraycopy(array, 0, a, 0, i);
|
||||||
|
if (a.length > i) {
|
||||||
|
a[i] = null;
|
||||||
|
}
|
||||||
|
return a;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return array;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean remove(Object o) {
|
public boolean remove(Object o) {
|
||||||
return false;
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean containsAll(Collection<?> c) {
|
public boolean containsAll(Collection<?> c) {
|
||||||
for (Object o: c) {
|
for (Object e: c) {
|
||||||
if (!contains(o)) {
|
if (!contains(e)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -195,34 +328,69 @@ public final class MpscLinkedQueue<T> extends AtomicReference<MpscLinkedQueue.No
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean addAll(Collection<? extends T> c) {
|
public boolean addAll(Collection<? extends E> c) {
|
||||||
for (T r: c) {
|
if (c == null) {
|
||||||
add(r);
|
throw new NullPointerException("c");
|
||||||
}
|
}
|
||||||
return false;
|
if (c == this) {
|
||||||
|
throw new IllegalArgumentException("c == this");
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean modified = false;
|
||||||
|
for (E e: c) {
|
||||||
|
add(e);
|
||||||
|
modified = true;
|
||||||
|
}
|
||||||
|
return modified;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean removeAll(Collection<?> c) {
|
public boolean removeAll(Collection<?> c) {
|
||||||
return false;
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean retainAll(Collection<?> c) {
|
public boolean retainAll(Collection<?> c) {
|
||||||
return false;
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clear() {
|
public void clear() {
|
||||||
for (;;) {
|
while (poll() != null) {
|
||||||
if (poll() == null) {
|
continue;
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class DefaultNode<T> extends Node<T> {
|
private void writeObject(ObjectOutputStream out) throws IOException {
|
||||||
private final T value;
|
out.defaultWriteObject();
|
||||||
|
for (E e: this) {
|
||||||
|
out.writeObject(e);
|
||||||
|
}
|
||||||
|
out.writeObject(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
|
||||||
|
in.defaultReadObject();
|
||||||
|
|
||||||
|
final MpscLinkedQueueNode<E> tombstone = new DefaultNode<E>(null);
|
||||||
|
headRef.set(tombstone);
|
||||||
|
setTail(tombstone);
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
E e = (E) in.readObject();
|
||||||
|
if (e == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
add(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class DefaultNode<T> extends MpscLinkedQueueNode<T> implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1006745279405945948L;
|
||||||
|
|
||||||
|
private T value;
|
||||||
|
|
||||||
DefaultNode(T value) {
|
DefaultNode(T value) {
|
||||||
this.value = value;
|
this.value = value;
|
||||||
@ -232,39 +400,12 @@ public final class MpscLinkedQueue<T> extends AtomicReference<MpscLinkedQueue.No
|
|||||||
public T value() {
|
public T value() {
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
public abstract static class Node<T> {
|
@Override
|
||||||
|
protected T clearMaybe() {
|
||||||
private static final long nextOffset;
|
T value = this.value;
|
||||||
|
this.value = null;
|
||||||
static {
|
return value;
|
||||||
if (PlatformDependent0.hasUnsafe()) {
|
|
||||||
try {
|
|
||||||
nextOffset = PlatformDependent.objectFieldOffset(
|
|
||||||
Node.class.getDeclaredField("tail"));
|
|
||||||
} catch (Throwable t) {
|
|
||||||
throw new ExceptionInInitializerError(t);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
nextOffset = -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
|
||||||
private volatile Node<T> tail;
|
|
||||||
|
|
||||||
// Only use from MpscLinkedQueue and so we are sure Unsafe is present
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
final Node<T> next() {
|
|
||||||
return (Node<T>) PlatformDependent.getObjectVolatile(this, nextOffset);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only use from MpscLinkedQueue and so we are sure Unsafe is present
|
|
||||||
final void setNext(final Node<T> newNext) {
|
|
||||||
PlatformDependent.putOrderedObject(this, nextOffset, newNext);
|
|
||||||
}
|
|
||||||
|
|
||||||
public abstract T value();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,58 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2014 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util.internal;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||||
|
|
||||||
|
public abstract class MpscLinkedQueueNode<T> {
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
private static final AtomicReferenceFieldUpdater<MpscLinkedQueueNode, MpscLinkedQueueNode> nextUpdater;
|
||||||
|
|
||||||
|
static {
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
AtomicReferenceFieldUpdater<MpscLinkedQueueNode, MpscLinkedQueueNode> u;
|
||||||
|
|
||||||
|
u = PlatformDependent.newAtomicReferenceFieldUpdater(MpscLinkedQueueNode.class, "next");
|
||||||
|
if (u == null) {
|
||||||
|
u = AtomicReferenceFieldUpdater.newUpdater(MpscLinkedQueueNode.class, MpscLinkedQueueNode.class, "next");
|
||||||
|
}
|
||||||
|
nextUpdater = u;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
private volatile MpscLinkedQueueNode<T> next;
|
||||||
|
|
||||||
|
final MpscLinkedQueueNode<T> next() {
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
final void setNext(final MpscLinkedQueueNode<T> newNext) {
|
||||||
|
// Similar to 'next = newNext', but slightly faster (storestore vs loadstore)
|
||||||
|
// See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html
|
||||||
|
nextUpdater.lazySet(this, newNext);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract T value();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the element this node contains to {@code null} so that the node can be used as a tombstone.
|
||||||
|
*/
|
||||||
|
protected T clearMaybe() {
|
||||||
|
return value();
|
||||||
|
}
|
||||||
|
}
|
@ -23,7 +23,7 @@ import io.netty.util.concurrent.EventExecutor;
|
|||||||
*
|
*
|
||||||
* <strong>It is important this will not be reused. After submitted it is not allowed to get submitted again!</strong>
|
* <strong>It is important this will not be reused. After submitted it is not allowed to get submitted again!</strong>
|
||||||
*/
|
*/
|
||||||
public abstract class OneTimeTask extends MpscLinkedQueue.Node<Runnable> implements Runnable {
|
public abstract class OneTimeTask extends MpscLinkedQueueNode<Runnable> implements Runnable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Runnable value() {
|
public Runnable value() {
|
||||||
|
@ -35,7 +35,6 @@ import java.util.Map;
|
|||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||||
@ -380,11 +379,7 @@ public final class PlatformDependent {
|
|||||||
* consumer (one thread!).
|
* consumer (one thread!).
|
||||||
*/
|
*/
|
||||||
public static <T> Queue<T> newMpscQueue() {
|
public static <T> Queue<T> newMpscQueue() {
|
||||||
if (hasUnsafe()) {
|
return new MpscLinkedQueue<T>();
|
||||||
return new MpscLinkedQueue<T>();
|
|
||||||
} else {
|
|
||||||
return new ConcurrentLinkedQueue<T>();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,33 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2014 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util.internal;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
public final class RightPaddedReference<T> extends AtomicReference<T> {
|
||||||
|
private static final long serialVersionUID = -467619563034125237L;
|
||||||
|
|
||||||
|
// cache line padding (must be public)
|
||||||
|
public transient long rp1, rp2, rp3, rp4, rp5; // 40 bytes (excluding AtomicReference.value and object header)
|
||||||
|
public transient long rpA, rpB, rpC, rpD, rpE, rpF, rpG, rpH; // 64 bytes
|
||||||
|
|
||||||
|
public RightPaddedReference() { }
|
||||||
|
|
||||||
|
public RightPaddedReference(T initialValue) {
|
||||||
|
super(initialValue);
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user