From b2be5f50586fc19c526dfbf621764844b06a4fc6 Mon Sep 17 00:00:00 2001 From: norman Date: Wed, 7 Dec 2011 17:14:11 +0100 Subject: [PATCH] Backport latest LinkedTransferQueue to 3.2.x. This also includes changes to allow the usage in java5 enviroments. Be aware that the new LinkedTransferQueue will only be used when java6+ was found. See #102 --- pom.xml | 17 + .../channel/local/DefaultLocalChannel.java | 4 +- .../nio/NioClientSocketPipelineSink.java | 4 +- .../socket/nio/NioDatagramChannel.java | 7 +- .../channel/socket/nio/NioDatagramWorker.java | 6 +- .../channel/socket/nio/NioSocketChannel.java | 9 +- .../netty/channel/socket/nio/NioWorker.java | 6 +- .../handler/codec/http/HttpClientCodec.java | 4 +- .../codec/http/HttpContentEncoder.java | 4 +- .../MemoryAwareThreadPoolExecutor.java | 4 +- .../OrderedMemoryAwareThreadPoolExecutor.java | 4 +- .../handler/queue/BlockingReadHandler.java | 4 +- .../handler/queue/BufferedWriteHandler.java | 4 +- .../jboss/netty/handler/ssl/SslHandler.java | 4 +- .../handler/stream/ChunkedWriteHandler.java | 5 +- .../util/internal/LinkedTransferQueue.java | 510 +++++++++--------- 16 files changed, 310 insertions(+), 286 deletions(-) diff --git a/pom.xml b/pom.xml index 87b0c64065..17f2af1207 100644 --- a/pom.xml +++ b/pom.xml @@ -227,6 +227,23 @@ maven-enforcer-plugin 1.0.1 + + + enforce-java + + enforce + + + + + + + (1.6.0,) + + + + + maven-compiler-plugin diff --git a/src/main/java/org/jboss/netty/channel/local/DefaultLocalChannel.java b/src/main/java/org/jboss/netty/channel/local/DefaultLocalChannel.java index 980c705edb..442f780642 100644 --- a/src/main/java/org/jboss/netty/channel/local/DefaultLocalChannel.java +++ b/src/main/java/org/jboss/netty/channel/local/DefaultLocalChannel.java @@ -32,7 +32,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.DefaultChannelConfig; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.ThreadLocalBoolean; /** @@ -53,7 +53,7 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel private final ChannelConfig config; private final ThreadLocalBoolean delivering = new ThreadLocalBoolean(); - final Queue writeBuffer = new LinkedTransferQueue(); + final Queue writeBuffer = QueueFactory.createQueue(MessageEvent.class); volatile DefaultLocalChannel pairedChannel; volatile LocalAddress localAddress; diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 2547cf7072..e9802cef18 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -44,7 +44,7 @@ import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * @@ -187,7 +187,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { private final int subId; private final AtomicBoolean wakenUp = new AtomicBoolean(); private final Object startStopLock = new Object(); - private final Queue registerTaskQueue = new LinkedTransferQueue(); + private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class);; Boss(int subId) { this.subId = subId; diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index 4510185345..5c3decd6e1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -242,9 +242,8 @@ class NioDatagramChannel extends AbstractChannel * that adds support for highWaterMark checking of the write buffer size. */ private final class WriteRequestQueue extends - LinkedTransferQueue { + AbstractWriteRequestQueue { - private static final long serialVersionUID = 5057413071460766376L; private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); @@ -258,7 +257,7 @@ class NioDatagramChannel extends AbstractChannel */ @Override public boolean offer(MessageEvent e) { - boolean success = super.offer(e); + boolean success = queue.offer(e); assert success; int messageSize = getMessageSize(e); @@ -284,7 +283,7 @@ class NioDatagramChannel extends AbstractChannel */ @Override public MessageEvent poll() { - MessageEvent e = super.poll(); + MessageEvent e = queue.poll(); if (e != null) { int messageSize = getMessageSize(e); int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index 3bd30a0121..cd42c363c2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -46,7 +46,7 @@ import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * A class responsible for registering channels with {@link Selector}. @@ -118,12 +118,12 @@ class NioDatagramWorker implements Runnable { /** * Queue of {@link ChannelRegistionTask}s */ - private final Queue registerTaskQueue = new LinkedTransferQueue(); + private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class);; /** * Queue of WriteTasks */ - private final Queue writeTaskQueue = new LinkedTransferQueue(); + private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 6dc97527a5..23a50e736e 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -33,7 +33,6 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; -import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.ThreadLocalBoolean; /** @@ -194,9 +193,7 @@ class NioSocketChannel extends AbstractChannel } } - private final class WriteRequestQueue extends LinkedTransferQueue { - - private static final long serialVersionUID = -246694024103520626L; + private final class WriteRequestQueue extends AbstractWriteRequestQueue { private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); @@ -206,7 +203,7 @@ class NioSocketChannel extends AbstractChannel @Override public boolean offer(MessageEvent e) { - boolean success = super.offer(e); + boolean success = queue.offer(e); assert success; int messageSize = getMessageSize(e); @@ -228,7 +225,7 @@ class NioSocketChannel extends AbstractChannel @Override public MessageEvent poll() { - MessageEvent e = super.poll(); + MessageEvent e = queue.poll(); if (e != null) { int messageSize = getMessageSize(e); int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 7ab0e1f683..9a2015b7da 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -48,7 +48,7 @@ import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * @@ -76,8 +76,8 @@ class NioWorker implements Runnable { private final AtomicBoolean wakenUp = new AtomicBoolean(); private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); private final Object startStopLock = new Object(); - private final Queue registerTaskQueue = new LinkedTransferQueue(); - private final Queue writeTaskQueue = new LinkedTransferQueue(); + private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); + private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); diff --git a/src/main/java/org/jboss/netty/handler/codec/http/HttpClientCodec.java b/src/main/java/org/jboss/netty/handler/codec/http/HttpClientCodec.java index ab40a92e2a..f24cecc02f 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/HttpClientCodec.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/HttpClientCodec.java @@ -23,7 +23,7 @@ import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelUpstreamHandler; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder} @@ -47,7 +47,7 @@ public class HttpClientCodec implements ChannelUpstreamHandler, ChannelDownstreamHandler { /** A queue that is used for correlating a request and a response. */ - final Queue queue = new LinkedTransferQueue(); + final Queue queue = QueueFactory.createQueue(HttpMethod.class); /** If true, decoding stops (i.e. pass-through) */ volatile boolean done; diff --git a/src/main/java/org/jboss/netty/handler/codec/http/HttpContentEncoder.java b/src/main/java/org/jboss/netty/handler/codec/http/HttpContentEncoder.java index f5811619c0..bc61fb161e 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/HttpContentEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/HttpContentEncoder.java @@ -24,7 +24,7 @@ import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.handler.codec.embedder.EncoderEmbedder; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * Encodes the content of the outbound {@link HttpResponse} and {@link HttpChunk}. @@ -53,7 +53,7 @@ import org.jboss.netty.util.internal.LinkedTransferQueue; */ public abstract class HttpContentEncoder extends SimpleChannelHandler { - private final Queue acceptEncodingQueue = new LinkedTransferQueue(); + private final Queue acceptEncodingQueue = QueueFactory.createQueue(String.class); private volatile EncoderEmbedder encoder; /** diff --git a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java index af9287f752..a8e979313f 100644 --- a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java @@ -39,7 +39,7 @@ import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.DefaultObjectSizeEstimator; import org.jboss.netty.util.ObjectSizeEstimator; import org.jboss.netty.util.internal.ConcurrentIdentityHashMap; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.SharedResourceMisuseDetector; /** @@ -220,7 +220,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { ThreadFactory threadFactory) { super(corePoolSize, corePoolSize, keepAliveTime, unit, - new LinkedTransferQueue(), threadFactory, new NewThreadRunsPolicy()); + QueueFactory.createQueue(Runnable.class), threadFactory, new NewThreadRunsPolicy()); if (objectSizeEstimator == null) { throw new NullPointerException("objectSizeEstimator"); diff --git a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java index 776460beb9..699bf55da2 100644 --- a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java @@ -31,7 +31,7 @@ import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.util.ObjectSizeEstimator; import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the @@ -286,7 +286,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends } private final class ChildExecutor implements Executor, Runnable { - private final Queue tasks = new LinkedTransferQueue(); + private final Queue tasks = QueueFactory.createQueue(Runnable.class); private final AtomicBoolean isRunning = new AtomicBoolean(false); ChildExecutor() { diff --git a/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java b/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java index 177a4594ac..b53db8323e 100644 --- a/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java +++ b/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java @@ -29,7 +29,7 @@ import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.util.internal.DeadLockProofWorker; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * Emulates blocking read operation. This handler stores all received messages @@ -85,7 +85,7 @@ public class BlockingReadHandler extends SimpleChannelUpstreamHandler { * implementation. */ public BlockingReadHandler() { - this(new LinkedTransferQueue()); + this(QueueFactory.createQueue(ChannelEvent.class)); } /** diff --git a/src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java b/src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java index 5cd32626c5..67b612cb9a 100644 --- a/src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java +++ b/src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java @@ -33,7 +33,7 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * Emulates buffered write operation. This handler stores all write requests @@ -194,7 +194,7 @@ public class BufferedWriteHandler extends SimpleChannelHandler { * into a single write request on {@link #flush()} */ public BufferedWriteHandler(boolean consolidateOnFlush) { - this(new LinkedTransferQueue(), consolidateOnFlush); + this(QueueFactory.createQueue(MessageEvent.class), consolidateOnFlush); } /** diff --git a/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java b/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java index c2dcf9ead4..4b58b7112c 100644 --- a/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java +++ b/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java @@ -50,8 +50,8 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.handler.codec.frame.FrameDecoder; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.NonReentrantLock; +import org.jboss.netty.util.internal.QueueFactory; /** * Adds SSL @@ -198,7 +198,7 @@ public class SslHandler extends FrameDecoder int ignoreClosedChannelException; final Object ignoreClosedChannelExceptionLock = new Object(); private final Queue pendingUnencryptedWrites = new LinkedList(); - private final Queue pendingEncryptedWrites = new LinkedTransferQueue(); + private final Queue pendingEncryptedWrites = QueueFactory.createQueue(MessageEvent.class); private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock(); private volatile boolean issueHandshake = false; diff --git a/src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java b/src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java index 92c9bf08c0..bc98cba8bb 100644 --- a/src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java +++ b/src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java @@ -35,7 +35,7 @@ import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.QueueFactory; /** * A {@link ChannelHandler} that adds support for writing a large data stream @@ -80,8 +80,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); - private final Queue queue = - new LinkedTransferQueue(); + private final Queue queue = QueueFactory.createQueue(MessageEvent.class); private ChannelHandlerContext ctx; private MessageEvent currentEvent; diff --git a/src/main/java/org/jboss/netty/util/internal/LinkedTransferQueue.java b/src/main/java/org/jboss/netty/util/internal/LinkedTransferQueue.java index 4968070438..991a36605e 100644 --- a/src/main/java/org/jboss/netty/util/internal/LinkedTransferQueue.java +++ b/src/main/java/org/jboss/netty/util/internal/LinkedTransferQueue.java @@ -1,38 +1,32 @@ -/* - * Copyright 2009 Red Hat, Inc. - * - * Red Hat licenses this file to you under the Apache License, version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ package org.jboss.netty.util.internal; import java.util.AbstractQueue; import java.util.Collection; -import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.LockSupport; + /** + * This class is a copied from URL revision 1.91 + *
+ * The only difference is that it replace {@link BlockingQueue} and any reference to the TransferQueue interface was removed + *
+ * + * + * Please use {@link QueueFactory} to create a Queue as it will use the "optimal" implementation depending on the JVM + * + *
+ *
+ * * An unbounded {@link BlockingQueue} based on linked nodes. * This queue orders elements FIFO (first-in-first-out) with respect * to any given producer. The head of the queue is that @@ -40,10 +34,17 @@ import java.util.concurrent.locks.LockSupport; * producer. The tail of the queue is that element that has * been on the queue the shortest time for some producer. * - *

Beware that, unlike in most collections, the {@code size} - * method is NOT a constant-time operation. Because of the + *

Beware that, unlike in most collections, the {@code size} method + * is NOT a constant-time operation. Because of the * asynchronous nature of these queues, determining the current number - * of elements requires a traversal of the elements. + * of elements requires a traversal of the elements, and so may report + * inaccurate results if this collection is modified during traversal. + * Additionally, the bulk operations {@code addAll}, + * {@code removeAll}, {@code retainAll}, {@code containsAll}, + * {@code equals}, and {@code toArray} are not guaranteed + * to be performed atomically. For example, an iterator operating + * concurrently with an {@code addAll} operation might view only some + * of the added elements. * *

This class and its iterator implement all of the * optional methods of the {@link Collection} and {@link @@ -60,11 +61,8 @@ import java.util.concurrent.locks.LockSupport; * * Java Collections Framework. * - * @author The Netty Project + * @since 1.7 * @author Doug Lea - * @author Trustin Lee - * @version $Rev: 2373 $, $Date: 2010-10-20 20:33:23 +0900 (Wed, 20 Oct 2010) $ (Upstream: 1.79) - * * @param the type of elements held in this collection */ public class LinkedTransferQueue extends AbstractQueue @@ -315,8 +313,8 @@ public class LinkedTransferQueue extends AbstractQueue * of less-contended queues. During spins threads check their * interrupt status and generate a thread-local random number * to decide to occasionally perform a Thread.yield. While - * yield has underdefined specs, we assume that might it help, - * and will not hurt in limiting impact of spinning on busy + * yield has underdefined specs, we assume that it might help, + * and will not hurt, in limiting impact of spinning on busy * systems. We also use smaller (1/2) spins for nodes that are * not known to be front but whose predecessors have not * blocked -- these "chained" spins avoid artifacts of @@ -438,34 +436,12 @@ public class LinkedTransferQueue extends AbstractQueue // CAS methods for fields final boolean casNext(Node cmp, Node val) { - if (AtomicFieldUpdaterUtil.isAvailable()) { - return nextUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (next == cmp) { - next = val; - return true; - } else { - return false; - } - } - } + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; - if (AtomicFieldUpdaterUtil.isAvailable()) { - return itemUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (item == cmp) { - item = val; - return true; - } else { - return false; - } - } - } + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** @@ -473,7 +449,7 @@ public class LinkedTransferQueue extends AbstractQueue * only be seen after publication via casNext. */ Node(Object item, boolean isData) { - this.item = item; + UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } @@ -482,21 +458,21 @@ public class LinkedTransferQueue extends AbstractQueue * only after CASing head field, so uses relaxed write. */ final void forgetNext() { - this.next = this; + UNSAFE.putObject(this, nextOffset, this); } /** * Sets item to self and waiter to null, to avoid garbage * retention after matching or cancelling. Uses relaxed writes - * bacause order is already constrained in the only calling + * because order is already constrained in the only calling * contexts: item is forgotten only after volatile/atomic * mechanics that extract items. Similarly, clearing waiter * follows either CAS or return from park (if ever parked; * else we don't care). */ final void forgetContents() { - this.item = this; - this.waiter = null; + UNSAFE.putObject(this, itemOffset, this); + UNSAFE.putObject(this, waiterOffset, null); } /** @@ -505,7 +481,7 @@ public class LinkedTransferQueue extends AbstractQueue */ final boolean isMatched() { Object x = item; - return x == this || x == null == isData; + return (x == this) || ((x == null) == isData); } /** @@ -523,7 +499,7 @@ public class LinkedTransferQueue extends AbstractQueue final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; - return d != haveData && (x = item) != this && x != null == d; + return d != haveData && (x = item) != this && (x != null) == d; } /** @@ -539,66 +515,49 @@ public class LinkedTransferQueue extends AbstractQueue return false; } - private static final AtomicReferenceFieldUpdater nextUpdater = - AtomicFieldUpdaterUtil.newRefUpdater(Node.class, Node.class, "next"); - private static final AtomicReferenceFieldUpdater itemUpdater = - AtomicFieldUpdaterUtil.newRefUpdater(Node.class, Object.class, "item"); + private static final long serialVersionUID = -3375979862319811754L; + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long itemOffset; + private static final long nextOffset; + private static final long waiterOffset; + static { + try { + UNSAFE = getUnsafe(); + Class k = Node.class; + itemOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("item")); + nextOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("next")); + waiterOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("waiter")); + } catch (Exception e) { + throw new Error(e); + } + } } /** head of the queue; null until first enqueue */ transient volatile Node head; /** tail of the queue; null until first append */ - transient volatile Node tail; + private transient volatile Node tail; /** The number of apparent failures to unsplice removed nodes */ - transient volatile int sweepVotes; + private transient volatile int sweepVotes; // CAS methods for fields private boolean casTail(Node cmp, Node val) { - if (AtomicFieldUpdaterUtil.isAvailable()) { - return tailUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (tail == cmp) { - tail = val; - return true; - } else { - return false; - } - } - } + return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); } private boolean casHead(Node cmp, Node val) { - if (AtomicFieldUpdaterUtil.isAvailable()) { - return headUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (head == cmp) { - head = val; - return true; - } else { - return false; - } - } - } + return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); } private boolean casSweepVotes(int cmp, int val) { - if (AtomicFieldUpdaterUtil.isAvailable()) { - return sweepVotesUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (sweepVotes == cmp) { - sweepVotes = val; - return true; - } else { - return false; - } - } - } + return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val); } /* @@ -626,51 +585,46 @@ public class LinkedTransferQueue extends AbstractQueue * @throws NullPointerException if haveData mode but e is null */ private E xfer(E e, boolean haveData, int how, long nanos) { - if (haveData && e == null) { + if (haveData && (e == null)) throw new NullPointerException(); - } Node s = null; // the node to append, if needed - retry: for (;;) { // restart on append race + retry: + for (;;) { // restart on append race for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; - if (item != p && item != null == isData) { // unmatched - if (isData == haveData) { // can't match + if (item != p && (item != null) == isData) { // unmatched + if (isData == haveData) // can't match break; - } if (p.casItem(item, e)) { // match for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton - if (head == h && casHead(h, n == null? q : n)) { + if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || - (q = h.next) == null || !q.isMatched()) { + (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 - } } LockSupport.unpark(p.waiter); return LinkedTransferQueue.cast(item); } } Node n = p.next; - p = p != n ? n : (h = head); // Use head if p offlist + p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { // No matches available - if (s == null) { + if (s == null) s = new Node(e, haveData); - } Node pred = tryAppend(s, haveData); - if (pred == null) { + if (pred == null) continue retry; // lost race vs opposite mode - } - if (how != ASYNC) { + if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); - } } return e; // not waiting } @@ -689,25 +643,22 @@ public class LinkedTransferQueue extends AbstractQueue for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail if (p == null && (p = head) == null) { - if (casHead(null, s)) { + if (casHead(null, s)) return s; // initialize - } } - else if (p.cannotPrecede(haveData)) { + else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode - } else if ((n = p.next) != null) { // not last; keep traversing + else if ((n = p.next) != null) // not last; keep traversing p = p != t && t != (u = tail) ? (t = u) : // stale tail - p != n ? n : null; // restart if off list - } else if (!p.casNext(null, s)) { + (p != n) ? n : null; // restart if off list + else if (!p.casNext(null, s)) p = p.next; // re-read on CAS failure - } else { + else { if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry - (s = s.next) != null && s != t) { - continue; - } + (s = s.next) != null && s != t); } return p; } @@ -739,35 +690,32 @@ public class LinkedTransferQueue extends AbstractQueue s.forgetContents(); // avoid garbage return LinkedTransferQueue.cast(item); } - if ((w.isInterrupted() || timed && nanos <= 0) && + if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel unsplice(pred, s); return e; } if (spins < 0) { // establish spins at/near front - if ((spins = spinsFor(pred, s.isData)) > 0) { + if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); - } } else if (spins > 0) { // spin --spins; - if (randomYields.nextInt(CHAINED_SPINS) == 0) { + if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield - } } else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } else if (timed) { long now = System.nanoTime(); - if ((nanos -= now - lastTime) > 0) { - LockSupport.parkNanos(nanos); - } + if ((nanos -= now - lastTime) > 0) + LockSupport.parkNanos(this, nanos); lastTime = now; } else { - LockSupport.park(); + LockSupport.park(this); } } } @@ -778,15 +726,12 @@ public class LinkedTransferQueue extends AbstractQueue */ private static int spinsFor(Node pred, boolean haveData) { if (MP && pred != null) { - if (pred.isData != haveData) { // phase change + if (pred.isData != haveData) // phase change return FRONT_SPINS + CHAINED_SPINS; - } - if (pred.isMatched()) { // probably at front + if (pred.isMatched()) // probably at front return FRONT_SPINS; - } - if (pred.waiter == null) { // pred apparently spinning + if (pred.waiter == null) // pred apparently spinning return CHAINED_SPINS; - } } return 0; } @@ -800,7 +745,7 @@ public class LinkedTransferQueue extends AbstractQueue */ final Node succ(Node p) { Node next = p.next; - return p == next ? head : next; + return (p == next) ? head : next; } /** @@ -809,9 +754,8 @@ public class LinkedTransferQueue extends AbstractQueue */ private Node firstOfMode(boolean isData) { for (Node p = head; p != null; p = succ(p)) { - if (!p.isMatched()) { - return p.isData == isData ? p : null; - } + if (!p.isMatched()) + return (p.isData == isData) ? p : null; } return null; } @@ -824,13 +768,11 @@ public class LinkedTransferQueue extends AbstractQueue for (Node p = head; p != null; p = succ(p)) { Object item = p.item; if (p.isData) { - if (item != null && item != p) { + if (item != null && item != p) return LinkedTransferQueue.cast(item); - } } - else if (item == null) { + else if (item == null) return null; - } } return null; } @@ -843,17 +785,15 @@ public class LinkedTransferQueue extends AbstractQueue int count = 0; for (Node p = head; p != null; ) { if (!p.isMatched()) { - if (p.isData != data) { + if (p.isData != data) return 0; - } - if (++count == Integer.MAX_VALUE) { // saturated + if (++count == Integer.MAX_VALUE) // saturated break; - } } Node n = p.next; - if (n != p) { + if (n != p) p = n; - } else { + else { count = 0; p = head; } @@ -871,23 +811,61 @@ public class LinkedTransferQueue extends AbstractQueue * Moves to next node after prev, or first node if prev null. */ private void advance(Node prev) { - lastPred = lastRet; - lastRet = prev; - for (Node p = prev == null ? head : succ(prev); - p != null; p = succ(p)) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p) { + /* + * To track and avoid buildup of deleted nodes in the face + * of calls to both Queue.remove and Itr.remove, we must + * include variants of unsplice and sweep upon each + * advance: Upon Itr.remove, we may need to catch up links + * from lastPred, and upon other removes, we might need to + * skip ahead from stale nodes and unsplice deleted ones + * found while advancing. + */ + + Node r, b; // reset lastPred upon possible deletion of lastRet + if ((r = lastRet) != null && !r.isMatched()) + lastPred = r; // next lastPred is old lastRet + else if ((b = lastPred) == null || b.isMatched()) + lastPred = null; // at start of list + else { + Node s, n; // help with removal of lastPred.next + while ((s = b.next) != null && + s != b && s.isMatched() && + (n = s.next) != null && n != s) + b.casNext(s, n); + } + + this.lastRet = prev; + + for (Node p = prev, s, n;;) { + s = (p == null) ? head : p.next; + if (s == null) + break; + else if (s == p) { + p = null; + continue; + } + Object item = s.item; + if (s.isData) { + if (item != null && item != s) { nextItem = LinkedTransferQueue.cast(item); - nextNode = p; + nextNode = s; return; } } - else if (item == null) { + else if (item == null) break; - } + // assert s.isMatched(); + if (p == null) + p = s; + else if ((n = s.next) == null) + break; + else if (s == n) + p = null; + else + p.casNext(s, n); } nextNode = null; + nextItem = null; } Itr() { @@ -900,22 +878,19 @@ public class LinkedTransferQueue extends AbstractQueue public final E next() { Node p = nextNode; - if (p == null) { - throw new NoSuchElementException(); - } + if (p == null) throw new NoSuchElementException(); E e = nextItem; advance(p); return e; } public final void remove() { - Node p = lastRet; - if (p == null) { + final Node lastRet = this.lastRet; + if (lastRet == null) throw new IllegalStateException(); - } - if (p.tryMatchData()) { - unsplice(lastPred, p); - } + this.lastRet = null; + if (lastRet.tryMatchData()) + unsplice(lastPred, lastRet); } } @@ -941,30 +916,25 @@ public class LinkedTransferQueue extends AbstractQueue if (pred != null && pred != s && pred.next == s) { Node n = s.next; if (n == null || - n != s && pred.casNext(s, n) && pred.isMatched()) { + (n != s && pred.casNext(s, n) && pred.isMatched())) { for (;;) { // check if at, or could be, head Node h = head; - if (h == pred || h == s || h == null) { + if (h == pred || h == s || h == null) return; // at head or list empty - } - if (!h.isMatched()) { + if (!h.isMatched()) break; - } Node hn = h.next; - if (hn == null) { + if (hn == null) return; // now empty - } - if (hn != h && casHead(h, hn)) { + if (hn != h && casHead(h, hn)) h.forgetNext(); // advance head - } } if (pred.next != pred && s.next != s) { // recheck if offlist for (;;) { // sweep now if enough votes int v = sweepVotes; if (v < SWEEP_THRESHOLD) { - if (casSweepVotes(v, v + 1)) { + if (casSweepVotes(v, v + 1)) break; - } } else if (casSweepVotes(v, 0)) { sweep(); @@ -982,17 +952,16 @@ public class LinkedTransferQueue extends AbstractQueue */ private void sweep() { for (Node p = head, s, n; p != null && (s = p.next) != null; ) { - if (!s.isMatched()) { + if (!s.isMatched()) // Unmatched nodes are never self-linked p = s; - } else if ((n = s.next) == null) { // trailing node is pinned + else if ((n = s.next) == null) // trailing node is pinned break; - } else if (s == n) { // stale + else if (s == n) // stale // No need to also check for p == s, since that implies s == n p = head; - } else { + else p.casNext(s, n); - } } } @@ -1010,9 +979,8 @@ public class LinkedTransferQueue extends AbstractQueue return true; } } - else if (item == null) { + else if (item == null) break; - } pred = p; if ((p = p.next) == pred) { // stale pred = null; @@ -1028,7 +996,6 @@ public class LinkedTransferQueue extends AbstractQueue * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { - super(); } /** @@ -1061,7 +1028,8 @@ public class LinkedTransferQueue extends AbstractQueue * return {@code false}. * * @return {@code true} (as specified by - * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) + * {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit) + * BlockingQueue.offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e, long timeout, TimeUnit unit) { @@ -1073,8 +1041,7 @@ public class LinkedTransferQueue extends AbstractQueue * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * - * @return {@code true} (as specified by - * {@link BlockingQueue#offer(Object) BlockingQueue.offer}) + * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { @@ -1090,7 +1057,6 @@ public class LinkedTransferQueue extends AbstractQueue * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ - @Override public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; @@ -1144,29 +1110,25 @@ public class LinkedTransferQueue extends AbstractQueue */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { - if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) { + if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; - } - if (!Thread.interrupted()) { + if (!Thread.interrupted()) return false; - } throw new InterruptedException(); } public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); - if (e != null) { + if (e != null) return e; - } Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = xfer(null, false, TIMED, unit.toNanos(timeout)); - if (e != null || !Thread.interrupted()) { + if (e != null || !Thread.interrupted()) return e; - } throw new InterruptedException(); } @@ -1179,15 +1141,12 @@ public class LinkedTransferQueue extends AbstractQueue * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c) { - if (c == null) { + if (c == null) throw new NullPointerException(); - } - if (c == this) { + if (c == this) throw new IllegalArgumentException(); - } int n = 0; - E e; - while ( (e = poll()) != null) { + for (E e; (e = poll()) != null;) { c.add(e); ++n; } @@ -1199,15 +1158,12 @@ public class LinkedTransferQueue extends AbstractQueue * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c, int maxElements) { - if (c == null) { + if (c == null) throw new NullPointerException(); - } - if (c == this) { + if (c == this) throw new IllegalArgumentException(); - } int n = 0; - E e; - while (n < maxElements && (e = poll()) != null) { + for (E e; n < maxElements && (e = poll()) != null;) { c.add(e); ++n; } @@ -1215,19 +1171,18 @@ public class LinkedTransferQueue extends AbstractQueue } /** - * Returns an iterator over the elements in this queue in proper - * sequence, from head to tail. + * Returns an iterator over the elements in this queue in proper sequence. + * The elements will be returned in order from first (head) to last (tail). * *

The returned iterator is a "weakly consistent" iterator that - * will never throw - * {@link ConcurrentModificationException ConcurrentModificationException}, - * and guarantees to traverse elements as they existed upon - * construction of the iterator, and may (but is not guaranteed - * to) reflect any modifications subsequent to construction. + * will never throw {@link java.util.ConcurrentModificationException + * ConcurrentModificationException}, and guarantees to traverse + * elements as they existed upon construction of the iterator, and + * may (but is not guaranteed to) reflect any modifications + * subsequent to construction. * * @return an iterator over the elements in this queue in proper sequence */ - @Override public Iterator iterator() { return new Itr(); } @@ -1241,12 +1196,10 @@ public class LinkedTransferQueue extends AbstractQueue * * @return {@code true} if this queue contains no elements */ - @Override public boolean isEmpty() { for (Node p = head; p != null; p = succ(p)) { - if (!p.isMatched()) { + if (!p.isMatched()) return !p.isData; - } } return true; } @@ -1267,7 +1220,6 @@ public class LinkedTransferQueue extends AbstractQueue * * @return the number of elements in this queue */ - @Override public int size() { return countOfMode(true); } @@ -1287,17 +1239,39 @@ public class LinkedTransferQueue extends AbstractQueue * @param o element to be removed from this queue, if present * @return {@code true} if this queue changed as a result of the call */ - @Override public boolean remove(Object o) { return findAndRemove(o); } + /** + * Returns {@code true} if this queue contains the specified element. + * More formally, returns {@code true} if and only if this queue contains + * at least one element {@code e} such that {@code o.equals(e)}. + * + * @param o object to be checked for containment in this queue + * @return {@code true} if this queue contains the specified element + */ + public boolean contains(Object o) { + if (o == null) return false; + for (Node p = head; p != null; p = succ(p)) { + Object item = p.item; + if (p.isData) { + if (item != null && item != p && o.equals(item)) + return true; + } + else if (item == null) + break; + } + return false; + } + /** * Always returns {@code Integer.MAX_VALUE} because a * {@code LinkedTransferQueue} is not capacity constrained. * * @return {@code Integer.MAX_VALUE} (as specified by - * {@link BlockingQueue#remainingCapacity()}) + * {@link java.util.concurrent.BlockingQueue#remainingCapacity() + * BlockingQueue.remainingCapacity}) */ public int remainingCapacity() { return Integer.MAX_VALUE; @@ -1313,9 +1287,8 @@ public class LinkedTransferQueue extends AbstractQueue private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { s.defaultWriteObject(); - for (E e : this) { + for (E e : this) s.writeObject(e); - } // Use trailing null as sentinel s.writeObject(null); } @@ -1330,23 +1303,62 @@ public class LinkedTransferQueue extends AbstractQueue throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); for (;;) { - @SuppressWarnings("unchecked") E item = (E) s.readObject(); - if (item == null) { + @SuppressWarnings("unchecked") + E item = (E) s.readObject(); + if (item == null) break; - } else { + else offer(item); + } + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE; + private static final long headOffset; + private static final long tailOffset; + private static final long sweepVotesOffset; + static { + try { + UNSAFE = getUnsafe(); + Class k = LinkedTransferQueue.class; + headOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("head")); + tailOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("tail")); + sweepVotesOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("sweepVotes")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater headUpdater = - AtomicFieldUpdaterUtil.newRefUpdater(LinkedTransferQueue.class, Node.class, "head"); - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater tailUpdater = - AtomicFieldUpdaterUtil.newRefUpdater(LinkedTransferQueue.class, Node.class, "tail"); - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater sweepVotesUpdater = - AtomicFieldUpdaterUtil.newIntUpdater(LinkedTransferQueue.class, "sweepVotes"); -} - +} \ No newline at end of file