Backport latest LinkedTransferQueue to 3.2.x. This also includes changes
to allow the usage in java5 enviroments. Be aware that the new LinkedTransferQueue will only be used when java6+ was found. See #102
This commit is contained in:
parent
1a6dae4c66
commit
b2be5f5058
17
pom.xml
17
pom.xml
@ -227,6 +227,23 @@
|
||||
<!-- See org.jboss:jboss-parent -->
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.0.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce-java</id>
|
||||
<goals>
|
||||
<goal>enforce</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<rules>
|
||||
<requireJavaVersion>
|
||||
<!-- Enforce java 1.6 as minimum for compiling -->
|
||||
<!-- This is needed because of the Unsafe detection code -->
|
||||
<version>(1.6.0,)</version>
|
||||
</requireJavaVersion>
|
||||
</rules>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
|
@ -32,7 +32,7 @@ import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelSink;
|
||||
import org.jboss.netty.channel.DefaultChannelConfig;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
import org.jboss.netty.util.internal.ThreadLocalBoolean;
|
||||
|
||||
/**
|
||||
@ -53,7 +53,7 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel
|
||||
private final ChannelConfig config;
|
||||
private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
|
||||
|
||||
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
|
||||
final Queue<MessageEvent> writeBuffer = QueueFactory.createQueue(MessageEvent.class);
|
||||
|
||||
volatile DefaultLocalChannel pairedChannel;
|
||||
volatile LocalAddress localAddress;
|
||||
|
@ -44,7 +44,7 @@ import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
import org.jboss.netty.util.internal.DeadLockProofWorker;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -187,7 +187,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
||||
private final int subId;
|
||||
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
||||
private final Object startStopLock = new Object();
|
||||
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);;
|
||||
|
||||
Boss(int subId) {
|
||||
this.subId = subId;
|
||||
|
@ -242,9 +242,8 @@ class NioDatagramChannel extends AbstractChannel
|
||||
* that adds support for highWaterMark checking of the write buffer size.
|
||||
*/
|
||||
private final class WriteRequestQueue extends
|
||||
LinkedTransferQueue<MessageEvent> {
|
||||
AbstractWriteRequestQueue {
|
||||
|
||||
private static final long serialVersionUID = 5057413071460766376L;
|
||||
|
||||
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
|
||||
|
||||
@ -258,7 +257,7 @@ class NioDatagramChannel extends AbstractChannel
|
||||
*/
|
||||
@Override
|
||||
public boolean offer(MessageEvent e) {
|
||||
boolean success = super.offer(e);
|
||||
boolean success = queue.offer(e);
|
||||
assert success;
|
||||
|
||||
int messageSize = getMessageSize(e);
|
||||
@ -284,7 +283,7 @@ class NioDatagramChannel extends AbstractChannel
|
||||
*/
|
||||
@Override
|
||||
public MessageEvent poll() {
|
||||
MessageEvent e = super.poll();
|
||||
MessageEvent e = queue.poll();
|
||||
if (e != null) {
|
||||
int messageSize = getMessageSize(e);
|
||||
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
|
||||
|
@ -46,7 +46,7 @@ import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* A class responsible for registering channels with {@link Selector}.
|
||||
@ -118,12 +118,12 @@ class NioDatagramWorker implements Runnable {
|
||||
/**
|
||||
* Queue of {@link ChannelRegistionTask}s
|
||||
*/
|
||||
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);;
|
||||
|
||||
/**
|
||||
* Queue of WriteTasks
|
||||
*/
|
||||
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
|
||||
|
||||
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
|
||||
|
||||
|
@ -33,7 +33,6 @@ import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelSink;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.ThreadLocalBoolean;
|
||||
|
||||
/**
|
||||
@ -194,9 +193,7 @@ class NioSocketChannel extends AbstractChannel
|
||||
}
|
||||
}
|
||||
|
||||
private final class WriteRequestQueue extends LinkedTransferQueue<MessageEvent> {
|
||||
|
||||
private static final long serialVersionUID = -246694024103520626L;
|
||||
private final class WriteRequestQueue extends AbstractWriteRequestQueue {
|
||||
|
||||
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
|
||||
|
||||
@ -206,7 +203,7 @@ class NioSocketChannel extends AbstractChannel
|
||||
|
||||
@Override
|
||||
public boolean offer(MessageEvent e) {
|
||||
boolean success = super.offer(e);
|
||||
boolean success = queue.offer(e);
|
||||
assert success;
|
||||
|
||||
int messageSize = getMessageSize(e);
|
||||
@ -228,7 +225,7 @@ class NioSocketChannel extends AbstractChannel
|
||||
|
||||
@Override
|
||||
public MessageEvent poll() {
|
||||
MessageEvent e = super.poll();
|
||||
MessageEvent e = queue.poll();
|
||||
if (e != null) {
|
||||
int messageSize = getMessageSize(e);
|
||||
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
|
||||
|
@ -48,7 +48,7 @@ import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
import org.jboss.netty.util.internal.DeadLockProofWorker;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -76,8 +76,8 @@ class NioWorker implements Runnable {
|
||||
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
||||
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
|
||||
private final Object startStopLock = new Object();
|
||||
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
|
||||
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
|
||||
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
|
||||
|
||||
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
|
||||
|
@ -23,7 +23,7 @@ import org.jboss.netty.channel.ChannelDownstreamHandler;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder}
|
||||
@ -47,7 +47,7 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
|
||||
ChannelDownstreamHandler {
|
||||
|
||||
/** A queue that is used for correlating a request and a response. */
|
||||
final Queue<HttpMethod> queue = new LinkedTransferQueue<HttpMethod>();
|
||||
final Queue<HttpMethod> queue = QueueFactory.createQueue(HttpMethod.class);
|
||||
|
||||
/** If true, decoding stops (i.e. pass-through) */
|
||||
volatile boolean done;
|
||||
|
@ -24,7 +24,7 @@ import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelHandler;
|
||||
import org.jboss.netty.handler.codec.embedder.EncoderEmbedder;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Encodes the content of the outbound {@link HttpResponse} and {@link HttpChunk}.
|
||||
@ -53,7 +53,7 @@ import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
*/
|
||||
public abstract class HttpContentEncoder extends SimpleChannelHandler {
|
||||
|
||||
private final Queue<String> acceptEncodingQueue = new LinkedTransferQueue<String>();
|
||||
private final Queue<String> acceptEncodingQueue = QueueFactory.createQueue(String.class);
|
||||
private volatile EncoderEmbedder<ChannelBuffer> encoder;
|
||||
|
||||
/**
|
||||
|
@ -39,7 +39,7 @@ import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.DefaultObjectSizeEstimator;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
|
||||
|
||||
/**
|
||||
@ -220,7 +220,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
ThreadFactory threadFactory) {
|
||||
|
||||
super(corePoolSize, corePoolSize, keepAliveTime, unit,
|
||||
new LinkedTransferQueue<Runnable>(), threadFactory, new NewThreadRunsPolicy());
|
||||
QueueFactory.createQueue(Runnable.class), threadFactory, new NewThreadRunsPolicy());
|
||||
|
||||
if (objectSizeEstimator == null) {
|
||||
throw new NullPointerException("objectSizeEstimator");
|
||||
|
@ -31,7 +31,7 @@ import org.jboss.netty.channel.ChannelState;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
|
||||
@ -286,7 +286,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
|
||||
}
|
||||
|
||||
private final class ChildExecutor implements Executor, Runnable {
|
||||
private final Queue<Runnable> tasks = new LinkedTransferQueue<Runnable>();
|
||||
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
||||
|
||||
ChildExecutor() {
|
||||
|
@ -29,7 +29,7 @@ import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.util.internal.DeadLockProofWorker;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Emulates blocking read operation. This handler stores all received messages
|
||||
@ -85,7 +85,7 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
|
||||
* implementation.
|
||||
*/
|
||||
public BlockingReadHandler() {
|
||||
this(new LinkedTransferQueue<ChannelEvent>());
|
||||
this(QueueFactory.createQueue(ChannelEvent.class));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -33,7 +33,7 @@ import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelHandler;
|
||||
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
|
||||
import org.jboss.netty.util.HashedWheelTimer;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Emulates buffered write operation. This handler stores all write requests
|
||||
@ -194,7 +194,7 @@ public class BufferedWriteHandler extends SimpleChannelHandler {
|
||||
* into a single write request on {@link #flush()}
|
||||
*/
|
||||
public BufferedWriteHandler(boolean consolidateOnFlush) {
|
||||
this(new LinkedTransferQueue<MessageEvent>(), consolidateOnFlush);
|
||||
this(QueueFactory.createQueue(MessageEvent.class), consolidateOnFlush);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -50,8 +50,8 @@ import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.NonReentrantLock;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
|
||||
@ -198,7 +198,7 @@ public class SslHandler extends FrameDecoder
|
||||
int ignoreClosedChannelException;
|
||||
final Object ignoreClosedChannelExceptionLock = new Object();
|
||||
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
|
||||
private final Queue<MessageEvent> pendingEncryptedWrites = new LinkedTransferQueue<MessageEvent>();
|
||||
private final Queue<MessageEvent> pendingEncryptedWrites = QueueFactory.createQueue(MessageEvent.class);
|
||||
private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
|
||||
|
||||
private volatile boolean issueHandshake = false;
|
||||
|
@ -35,7 +35,7 @@ import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* A {@link ChannelHandler} that adds support for writing a large data stream
|
||||
@ -80,8 +80,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
|
||||
|
||||
private final Queue<MessageEvent> queue =
|
||||
new LinkedTransferQueue<MessageEvent>();
|
||||
private final Queue<MessageEvent> queue = QueueFactory.createQueue(MessageEvent.class);
|
||||
|
||||
private ChannelHandlerContext ctx;
|
||||
private MessageEvent currentEvent;
|
||||
|
@ -1,38 +1,32 @@
|
||||
/*
|
||||
* Copyright 2009 Red Hat, Inc.
|
||||
*
|
||||
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with the
|
||||
* License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package org.jboss.netty.util.internal;
|
||||
|
||||
import java.util.AbstractQueue;
|
||||
import java.util.Collection;
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
/**
|
||||
* This class is a copied from <a href="http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/LinkedTransferQueue.java"> URL revision 1.91 </a>
|
||||
* <br>
|
||||
* The only difference is that it replace {@link BlockingQueue} and any reference to the TransferQueue interface was removed
|
||||
* <br>
|
||||
*
|
||||
* <strong>
|
||||
* Please use {@link QueueFactory} to create a Queue as it will use the "optimal" implementation depending on the JVM
|
||||
* </strong>
|
||||
* <br>
|
||||
* <br>
|
||||
*
|
||||
* An unbounded {@link BlockingQueue} based on linked nodes.
|
||||
* This queue orders elements FIFO (first-in-first-out) with respect
|
||||
* to any given producer. The <em>head</em> of the queue is that
|
||||
@ -40,10 +34,17 @@ import java.util.concurrent.locks.LockSupport;
|
||||
* producer. The <em>tail</em> of the queue is that element that has
|
||||
* been on the queue the shortest time for some producer.
|
||||
*
|
||||
* <p>Beware that, unlike in most collections, the {@code size}
|
||||
* method is <em>NOT</em> a constant-time operation. Because of the
|
||||
* <p>Beware that, unlike in most collections, the {@code size} method
|
||||
* is <em>NOT</em> a constant-time operation. Because of the
|
||||
* asynchronous nature of these queues, determining the current number
|
||||
* of elements requires a traversal of the elements.
|
||||
* of elements requires a traversal of the elements, and so may report
|
||||
* inaccurate results if this collection is modified during traversal.
|
||||
* Additionally, the bulk operations {@code addAll},
|
||||
* {@code removeAll}, {@code retainAll}, {@code containsAll},
|
||||
* {@code equals}, and {@code toArray} are <em>not</em> guaranteed
|
||||
* to be performed atomically. For example, an iterator operating
|
||||
* concurrently with an {@code addAll} operation might view only some
|
||||
* of the added elements.
|
||||
*
|
||||
* <p>This class and its iterator implement all of the
|
||||
* <em>optional</em> methods of the {@link Collection} and {@link
|
||||
@ -60,11 +61,8 @@ import java.util.concurrent.locks.LockSupport;
|
||||
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
|
||||
* Java Collections Framework</a>.
|
||||
*
|
||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||
* @version $Rev: 2373 $, $Date: 2010-10-20 20:33:23 +0900 (Wed, 20 Oct 2010) $ (Upstream: 1.79)
|
||||
*
|
||||
* @param <E> the type of elements held in this collection
|
||||
*/
|
||||
public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
@ -315,8 +313,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* of less-contended queues. During spins threads check their
|
||||
* interrupt status and generate a thread-local random number
|
||||
* to decide to occasionally perform a Thread.yield. While
|
||||
* yield has underdefined specs, we assume that might it help,
|
||||
* and will not hurt in limiting impact of spinning on busy
|
||||
* yield has underdefined specs, we assume that it might help,
|
||||
* and will not hurt, in limiting impact of spinning on busy
|
||||
* systems. We also use smaller (1/2) spins for nodes that are
|
||||
* not known to be front but whose predecessors have not
|
||||
* blocked -- these "chained" spins avoid artifacts of
|
||||
@ -438,34 +436,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
|
||||
// CAS methods for fields
|
||||
final boolean casNext(Node cmp, Node val) {
|
||||
if (AtomicFieldUpdaterUtil.isAvailable()) {
|
||||
return nextUpdater.compareAndSet(this, cmp, val);
|
||||
} else {
|
||||
synchronized (this) {
|
||||
if (next == cmp) {
|
||||
next = val;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
|
||||
}
|
||||
|
||||
final boolean casItem(Object cmp, Object val) {
|
||||
// assert cmp == null || cmp.getClass() != Node.class;
|
||||
if (AtomicFieldUpdaterUtil.isAvailable()) {
|
||||
return itemUpdater.compareAndSet(this, cmp, val);
|
||||
} else {
|
||||
synchronized (this) {
|
||||
if (item == cmp) {
|
||||
item = val;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -473,7 +449,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* only be seen after publication via casNext.
|
||||
*/
|
||||
Node(Object item, boolean isData) {
|
||||
this.item = item;
|
||||
UNSAFE.putObject(this, itemOffset, item); // relaxed write
|
||||
this.isData = isData;
|
||||
}
|
||||
|
||||
@ -482,21 +458,21 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* only after CASing head field, so uses relaxed write.
|
||||
*/
|
||||
final void forgetNext() {
|
||||
this.next = this;
|
||||
UNSAFE.putObject(this, nextOffset, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets item to self and waiter to null, to avoid garbage
|
||||
* retention after matching or cancelling. Uses relaxed writes
|
||||
* bacause order is already constrained in the only calling
|
||||
* because order is already constrained in the only calling
|
||||
* contexts: item is forgotten only after volatile/atomic
|
||||
* mechanics that extract items. Similarly, clearing waiter
|
||||
* follows either CAS or return from park (if ever parked;
|
||||
* else we don't care).
|
||||
*/
|
||||
final void forgetContents() {
|
||||
this.item = this;
|
||||
this.waiter = null;
|
||||
UNSAFE.putObject(this, itemOffset, this);
|
||||
UNSAFE.putObject(this, waiterOffset, null);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -505,7 +481,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*/
|
||||
final boolean isMatched() {
|
||||
Object x = item;
|
||||
return x == this || x == null == isData;
|
||||
return (x == this) || ((x == null) == isData);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -523,7 +499,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
final boolean cannotPrecede(boolean haveData) {
|
||||
boolean d = isData;
|
||||
Object x;
|
||||
return d != haveData && (x = item) != this && x != null == d;
|
||||
return d != haveData && (x = item) != this && (x != null) == d;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -539,66 +515,49 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
return false;
|
||||
}
|
||||
|
||||
private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
|
||||
AtomicFieldUpdaterUtil.newRefUpdater(Node.class, Node.class, "next");
|
||||
private static final AtomicReferenceFieldUpdater<Node, Object> itemUpdater =
|
||||
AtomicFieldUpdaterUtil.newRefUpdater(Node.class, Object.class, "item");
|
||||
private static final long serialVersionUID = -3375979862319811754L;
|
||||
|
||||
// Unsafe mechanics
|
||||
private static final sun.misc.Unsafe UNSAFE;
|
||||
private static final long itemOffset;
|
||||
private static final long nextOffset;
|
||||
private static final long waiterOffset;
|
||||
static {
|
||||
try {
|
||||
UNSAFE = getUnsafe();
|
||||
Class<?> k = Node.class;
|
||||
itemOffset = UNSAFE.objectFieldOffset
|
||||
(k.getDeclaredField("item"));
|
||||
nextOffset = UNSAFE.objectFieldOffset
|
||||
(k.getDeclaredField("next"));
|
||||
waiterOffset = UNSAFE.objectFieldOffset
|
||||
(k.getDeclaredField("waiter"));
|
||||
} catch (Exception e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** head of the queue; null until first enqueue */
|
||||
transient volatile Node head;
|
||||
|
||||
/** tail of the queue; null until first append */
|
||||
transient volatile Node tail;
|
||||
private transient volatile Node tail;
|
||||
|
||||
/** The number of apparent failures to unsplice removed nodes */
|
||||
transient volatile int sweepVotes;
|
||||
private transient volatile int sweepVotes;
|
||||
|
||||
// CAS methods for fields
|
||||
private boolean casTail(Node cmp, Node val) {
|
||||
if (AtomicFieldUpdaterUtil.isAvailable()) {
|
||||
return tailUpdater.compareAndSet(this, cmp, val);
|
||||
} else {
|
||||
synchronized (this) {
|
||||
if (tail == cmp) {
|
||||
tail = val;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
|
||||
}
|
||||
|
||||
private boolean casHead(Node cmp, Node val) {
|
||||
if (AtomicFieldUpdaterUtil.isAvailable()) {
|
||||
return headUpdater.compareAndSet(this, cmp, val);
|
||||
} else {
|
||||
synchronized (this) {
|
||||
if (head == cmp) {
|
||||
head = val;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
|
||||
}
|
||||
|
||||
private boolean casSweepVotes(int cmp, int val) {
|
||||
if (AtomicFieldUpdaterUtil.isAvailable()) {
|
||||
return sweepVotesUpdater.compareAndSet(this, cmp, val);
|
||||
} else {
|
||||
synchronized (this) {
|
||||
if (sweepVotes == cmp) {
|
||||
sweepVotes = val;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -626,51 +585,46 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* @throws NullPointerException if haveData mode but e is null
|
||||
*/
|
||||
private E xfer(E e, boolean haveData, int how, long nanos) {
|
||||
if (haveData && e == null) {
|
||||
if (haveData && (e == null))
|
||||
throw new NullPointerException();
|
||||
}
|
||||
Node s = null; // the node to append, if needed
|
||||
|
||||
retry: for (;;) { // restart on append race
|
||||
retry:
|
||||
for (;;) { // restart on append race
|
||||
|
||||
for (Node h = head, p = h; p != null;) { // find & match first node
|
||||
boolean isData = p.isData;
|
||||
Object item = p.item;
|
||||
if (item != p && item != null == isData) { // unmatched
|
||||
if (isData == haveData) { // can't match
|
||||
if (item != p && (item != null) == isData) { // unmatched
|
||||
if (isData == haveData) // can't match
|
||||
break;
|
||||
}
|
||||
if (p.casItem(item, e)) { // match
|
||||
for (Node q = p; q != h;) {
|
||||
Node n = q.next; // update by 2 unless singleton
|
||||
if (head == h && casHead(h, n == null? q : n)) {
|
||||
if (head == h && casHead(h, n == null ? q : n)) {
|
||||
h.forgetNext();
|
||||
break;
|
||||
} // advance and retry
|
||||
if ((h = head) == null ||
|
||||
(q = h.next) == null || !q.isMatched()) {
|
||||
(q = h.next) == null || !q.isMatched())
|
||||
break; // unless slack < 2
|
||||
}
|
||||
}
|
||||
LockSupport.unpark(p.waiter);
|
||||
return LinkedTransferQueue.<E>cast(item);
|
||||
}
|
||||
}
|
||||
Node n = p.next;
|
||||
p = p != n ? n : (h = head); // Use head if p offlist
|
||||
p = (p != n) ? n : (h = head); // Use head if p offlist
|
||||
}
|
||||
|
||||
if (how != NOW) { // No matches available
|
||||
if (s == null) {
|
||||
if (s == null)
|
||||
s = new Node(e, haveData);
|
||||
}
|
||||
Node pred = tryAppend(s, haveData);
|
||||
if (pred == null) {
|
||||
if (pred == null)
|
||||
continue retry; // lost race vs opposite mode
|
||||
}
|
||||
if (how != ASYNC) {
|
||||
if (how != ASYNC)
|
||||
return awaitMatch(s, pred, e, (how == TIMED), nanos);
|
||||
}
|
||||
}
|
||||
return e; // not waiting
|
||||
}
|
||||
@ -689,25 +643,22 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
for (Node t = tail, p = t;;) { // move p to last node and append
|
||||
Node n, u; // temps for reads of next & tail
|
||||
if (p == null && (p = head) == null) {
|
||||
if (casHead(null, s)) {
|
||||
if (casHead(null, s))
|
||||
return s; // initialize
|
||||
}
|
||||
}
|
||||
else if (p.cannotPrecede(haveData)) {
|
||||
else if (p.cannotPrecede(haveData))
|
||||
return null; // lost race vs opposite mode
|
||||
} else if ((n = p.next) != null) { // not last; keep traversing
|
||||
else if ((n = p.next) != null) // not last; keep traversing
|
||||
p = p != t && t != (u = tail) ? (t = u) : // stale tail
|
||||
p != n ? n : null; // restart if off list
|
||||
} else if (!p.casNext(null, s)) {
|
||||
(p != n) ? n : null; // restart if off list
|
||||
else if (!p.casNext(null, s))
|
||||
p = p.next; // re-read on CAS failure
|
||||
} else {
|
||||
else {
|
||||
if (p != t) { // update if slack now >= 2
|
||||
while ((tail != t || !casTail(t, s)) &&
|
||||
(t = tail) != null &&
|
||||
(s = t.next) != null && // advance and retry
|
||||
(s = s.next) != null && s != t) {
|
||||
continue;
|
||||
}
|
||||
(s = s.next) != null && s != t);
|
||||
}
|
||||
return p;
|
||||
}
|
||||
@ -739,35 +690,32 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
s.forgetContents(); // avoid garbage
|
||||
return LinkedTransferQueue.<E>cast(item);
|
||||
}
|
||||
if ((w.isInterrupted() || timed && nanos <= 0) &&
|
||||
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
|
||||
s.casItem(e, s)) { // cancel
|
||||
unsplice(pred, s);
|
||||
return e;
|
||||
}
|
||||
|
||||
if (spins < 0) { // establish spins at/near front
|
||||
if ((spins = spinsFor(pred, s.isData)) > 0) {
|
||||
if ((spins = spinsFor(pred, s.isData)) > 0)
|
||||
randomYields = ThreadLocalRandom.current();
|
||||
}
|
||||
}
|
||||
else if (spins > 0) { // spin
|
||||
--spins;
|
||||
if (randomYields.nextInt(CHAINED_SPINS) == 0) {
|
||||
if (randomYields.nextInt(CHAINED_SPINS) == 0)
|
||||
Thread.yield(); // occasionally yield
|
||||
}
|
||||
}
|
||||
else if (s.waiter == null) {
|
||||
s.waiter = w; // request unpark then recheck
|
||||
}
|
||||
else if (timed) {
|
||||
long now = System.nanoTime();
|
||||
if ((nanos -= now - lastTime) > 0) {
|
||||
LockSupport.parkNanos(nanos);
|
||||
}
|
||||
if ((nanos -= now - lastTime) > 0)
|
||||
LockSupport.parkNanos(this, nanos);
|
||||
lastTime = now;
|
||||
}
|
||||
else {
|
||||
LockSupport.park();
|
||||
LockSupport.park(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -778,15 +726,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*/
|
||||
private static int spinsFor(Node pred, boolean haveData) {
|
||||
if (MP && pred != null) {
|
||||
if (pred.isData != haveData) { // phase change
|
||||
if (pred.isData != haveData) // phase change
|
||||
return FRONT_SPINS + CHAINED_SPINS;
|
||||
}
|
||||
if (pred.isMatched()) { // probably at front
|
||||
if (pred.isMatched()) // probably at front
|
||||
return FRONT_SPINS;
|
||||
}
|
||||
if (pred.waiter == null) { // pred apparently spinning
|
||||
if (pred.waiter == null) // pred apparently spinning
|
||||
return CHAINED_SPINS;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -800,7 +745,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*/
|
||||
final Node succ(Node p) {
|
||||
Node next = p.next;
|
||||
return p == next ? head : next;
|
||||
return (p == next) ? head : next;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -809,9 +754,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*/
|
||||
private Node firstOfMode(boolean isData) {
|
||||
for (Node p = head; p != null; p = succ(p)) {
|
||||
if (!p.isMatched()) {
|
||||
return p.isData == isData ? p : null;
|
||||
}
|
||||
if (!p.isMatched())
|
||||
return (p.isData == isData) ? p : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -824,13 +768,11 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
for (Node p = head; p != null; p = succ(p)) {
|
||||
Object item = p.item;
|
||||
if (p.isData) {
|
||||
if (item != null && item != p) {
|
||||
if (item != null && item != p)
|
||||
return LinkedTransferQueue.<E>cast(item);
|
||||
}
|
||||
}
|
||||
else if (item == null) {
|
||||
else if (item == null)
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -843,17 +785,15 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
int count = 0;
|
||||
for (Node p = head; p != null; ) {
|
||||
if (!p.isMatched()) {
|
||||
if (p.isData != data) {
|
||||
if (p.isData != data)
|
||||
return 0;
|
||||
}
|
||||
if (++count == Integer.MAX_VALUE) { // saturated
|
||||
if (++count == Integer.MAX_VALUE) // saturated
|
||||
break;
|
||||
}
|
||||
}
|
||||
Node n = p.next;
|
||||
if (n != p) {
|
||||
if (n != p)
|
||||
p = n;
|
||||
} else {
|
||||
else {
|
||||
count = 0;
|
||||
p = head;
|
||||
}
|
||||
@ -871,23 +811,61 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* Moves to next node after prev, or first node if prev null.
|
||||
*/
|
||||
private void advance(Node prev) {
|
||||
lastPred = lastRet;
|
||||
lastRet = prev;
|
||||
for (Node p = prev == null ? head : succ(prev);
|
||||
p != null; p = succ(p)) {
|
||||
Object item = p.item;
|
||||
if (p.isData) {
|
||||
if (item != null && item != p) {
|
||||
/*
|
||||
* To track and avoid buildup of deleted nodes in the face
|
||||
* of calls to both Queue.remove and Itr.remove, we must
|
||||
* include variants of unsplice and sweep upon each
|
||||
* advance: Upon Itr.remove, we may need to catch up links
|
||||
* from lastPred, and upon other removes, we might need to
|
||||
* skip ahead from stale nodes and unsplice deleted ones
|
||||
* found while advancing.
|
||||
*/
|
||||
|
||||
Node r, b; // reset lastPred upon possible deletion of lastRet
|
||||
if ((r = lastRet) != null && !r.isMatched())
|
||||
lastPred = r; // next lastPred is old lastRet
|
||||
else if ((b = lastPred) == null || b.isMatched())
|
||||
lastPred = null; // at start of list
|
||||
else {
|
||||
Node s, n; // help with removal of lastPred.next
|
||||
while ((s = b.next) != null &&
|
||||
s != b && s.isMatched() &&
|
||||
(n = s.next) != null && n != s)
|
||||
b.casNext(s, n);
|
||||
}
|
||||
|
||||
this.lastRet = prev;
|
||||
|
||||
for (Node p = prev, s, n;;) {
|
||||
s = (p == null) ? head : p.next;
|
||||
if (s == null)
|
||||
break;
|
||||
else if (s == p) {
|
||||
p = null;
|
||||
continue;
|
||||
}
|
||||
Object item = s.item;
|
||||
if (s.isData) {
|
||||
if (item != null && item != s) {
|
||||
nextItem = LinkedTransferQueue.<E>cast(item);
|
||||
nextNode = p;
|
||||
nextNode = s;
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (item == null) {
|
||||
else if (item == null)
|
||||
break;
|
||||
}
|
||||
// assert s.isMatched();
|
||||
if (p == null)
|
||||
p = s;
|
||||
else if ((n = s.next) == null)
|
||||
break;
|
||||
else if (s == n)
|
||||
p = null;
|
||||
else
|
||||
p.casNext(s, n);
|
||||
}
|
||||
nextNode = null;
|
||||
nextItem = null;
|
||||
}
|
||||
|
||||
Itr() {
|
||||
@ -900,22 +878,19 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
|
||||
public final E next() {
|
||||
Node p = nextNode;
|
||||
if (p == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
if (p == null) throw new NoSuchElementException();
|
||||
E e = nextItem;
|
||||
advance(p);
|
||||
return e;
|
||||
}
|
||||
|
||||
public final void remove() {
|
||||
Node p = lastRet;
|
||||
if (p == null) {
|
||||
final Node lastRet = this.lastRet;
|
||||
if (lastRet == null)
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
if (p.tryMatchData()) {
|
||||
unsplice(lastPred, p);
|
||||
}
|
||||
this.lastRet = null;
|
||||
if (lastRet.tryMatchData())
|
||||
unsplice(lastPred, lastRet);
|
||||
}
|
||||
}
|
||||
|
||||
@ -941,30 +916,25 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
if (pred != null && pred != s && pred.next == s) {
|
||||
Node n = s.next;
|
||||
if (n == null ||
|
||||
n != s && pred.casNext(s, n) && pred.isMatched()) {
|
||||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
|
||||
for (;;) { // check if at, or could be, head
|
||||
Node h = head;
|
||||
if (h == pred || h == s || h == null) {
|
||||
if (h == pred || h == s || h == null)
|
||||
return; // at head or list empty
|
||||
}
|
||||
if (!h.isMatched()) {
|
||||
if (!h.isMatched())
|
||||
break;
|
||||
}
|
||||
Node hn = h.next;
|
||||
if (hn == null) {
|
||||
if (hn == null)
|
||||
return; // now empty
|
||||
}
|
||||
if (hn != h && casHead(h, hn)) {
|
||||
if (hn != h && casHead(h, hn))
|
||||
h.forgetNext(); // advance head
|
||||
}
|
||||
}
|
||||
if (pred.next != pred && s.next != s) { // recheck if offlist
|
||||
for (;;) { // sweep now if enough votes
|
||||
int v = sweepVotes;
|
||||
if (v < SWEEP_THRESHOLD) {
|
||||
if (casSweepVotes(v, v + 1)) {
|
||||
if (casSweepVotes(v, v + 1))
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (casSweepVotes(v, 0)) {
|
||||
sweep();
|
||||
@ -982,17 +952,16 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*/
|
||||
private void sweep() {
|
||||
for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
|
||||
if (!s.isMatched()) {
|
||||
if (!s.isMatched())
|
||||
// Unmatched nodes are never self-linked
|
||||
p = s;
|
||||
} else if ((n = s.next) == null) { // trailing node is pinned
|
||||
else if ((n = s.next) == null) // trailing node is pinned
|
||||
break;
|
||||
} else if (s == n) { // stale
|
||||
else if (s == n) // stale
|
||||
// No need to also check for p == s, since that implies s == n
|
||||
p = head;
|
||||
} else {
|
||||
else
|
||||
p.casNext(s, n);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1010,9 +979,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
return true;
|
||||
}
|
||||
}
|
||||
else if (item == null) {
|
||||
else if (item == null)
|
||||
break;
|
||||
}
|
||||
pred = p;
|
||||
if ((p = p.next) == pred) { // stale
|
||||
pred = null;
|
||||
@ -1028,7 +996,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* Creates an initially empty {@code LinkedTransferQueue}.
|
||||
*/
|
||||
public LinkedTransferQueue() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1061,7 +1028,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* return {@code false}.
|
||||
*
|
||||
* @return {@code true} (as specified by
|
||||
* {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
|
||||
* {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
|
||||
* BlockingQueue.offer})
|
||||
* @throws NullPointerException if the specified element is null
|
||||
*/
|
||||
public boolean offer(E e, long timeout, TimeUnit unit) {
|
||||
@ -1073,8 +1041,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* Inserts the specified element at the tail of this queue.
|
||||
* As the queue is unbounded, this method will never return {@code false}.
|
||||
*
|
||||
* @return {@code true} (as specified by
|
||||
* {@link BlockingQueue#offer(Object) BlockingQueue.offer})
|
||||
* @return {@code true} (as specified by {@link Queue#offer})
|
||||
* @throws NullPointerException if the specified element is null
|
||||
*/
|
||||
public boolean offer(E e) {
|
||||
@ -1090,7 +1057,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* @return {@code true} (as specified by {@link Collection#add})
|
||||
* @throws NullPointerException if the specified element is null
|
||||
*/
|
||||
@Override
|
||||
public boolean add(E e) {
|
||||
xfer(e, true, ASYNC, 0);
|
||||
return true;
|
||||
@ -1144,29 +1110,25 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*/
|
||||
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) {
|
||||
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
|
||||
return true;
|
||||
}
|
||||
if (!Thread.interrupted()) {
|
||||
if (!Thread.interrupted())
|
||||
return false;
|
||||
}
|
||||
throw new InterruptedException();
|
||||
}
|
||||
|
||||
public E take() throws InterruptedException {
|
||||
E e = xfer(null, false, SYNC, 0);
|
||||
if (e != null) {
|
||||
if (e != null)
|
||||
return e;
|
||||
}
|
||||
Thread.interrupted();
|
||||
throw new InterruptedException();
|
||||
}
|
||||
|
||||
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
|
||||
if (e != null || !Thread.interrupted()) {
|
||||
if (e != null || !Thread.interrupted())
|
||||
return e;
|
||||
}
|
||||
throw new InterruptedException();
|
||||
}
|
||||
|
||||
@ -1179,15 +1141,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* @throws IllegalArgumentException {@inheritDoc}
|
||||
*/
|
||||
public int drainTo(Collection<? super E> c) {
|
||||
if (c == null) {
|
||||
if (c == null)
|
||||
throw new NullPointerException();
|
||||
}
|
||||
if (c == this) {
|
||||
if (c == this)
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
int n = 0;
|
||||
E e;
|
||||
while ( (e = poll()) != null) {
|
||||
for (E e; (e = poll()) != null;) {
|
||||
c.add(e);
|
||||
++n;
|
||||
}
|
||||
@ -1199,15 +1158,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* @throws IllegalArgumentException {@inheritDoc}
|
||||
*/
|
||||
public int drainTo(Collection<? super E> c, int maxElements) {
|
||||
if (c == null) {
|
||||
if (c == null)
|
||||
throw new NullPointerException();
|
||||
}
|
||||
if (c == this) {
|
||||
if (c == this)
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
int n = 0;
|
||||
E e;
|
||||
while (n < maxElements && (e = poll()) != null) {
|
||||
for (E e; n < maxElements && (e = poll()) != null;) {
|
||||
c.add(e);
|
||||
++n;
|
||||
}
|
||||
@ -1215,19 +1171,18 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an iterator over the elements in this queue in proper
|
||||
* sequence, from head to tail.
|
||||
* Returns an iterator over the elements in this queue in proper sequence.
|
||||
* The elements will be returned in order from first (head) to last (tail).
|
||||
*
|
||||
* <p>The returned iterator is a "weakly consistent" iterator that
|
||||
* will never throw
|
||||
* {@link ConcurrentModificationException ConcurrentModificationException},
|
||||
* and guarantees to traverse elements as they existed upon
|
||||
* construction of the iterator, and may (but is not guaranteed
|
||||
* to) reflect any modifications subsequent to construction.
|
||||
* will never throw {@link java.util.ConcurrentModificationException
|
||||
* ConcurrentModificationException}, and guarantees to traverse
|
||||
* elements as they existed upon construction of the iterator, and
|
||||
* may (but is not guaranteed to) reflect any modifications
|
||||
* subsequent to construction.
|
||||
*
|
||||
* @return an iterator over the elements in this queue in proper sequence
|
||||
*/
|
||||
@Override
|
||||
public Iterator<E> iterator() {
|
||||
return new Itr();
|
||||
}
|
||||
@ -1241,12 +1196,10 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*
|
||||
* @return {@code true} if this queue contains no elements
|
||||
*/
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
for (Node p = head; p != null; p = succ(p)) {
|
||||
if (!p.isMatched()) {
|
||||
if (!p.isMatched())
|
||||
return !p.isData;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -1267,7 +1220,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*
|
||||
* @return the number of elements in this queue
|
||||
*/
|
||||
@Override
|
||||
public int size() {
|
||||
return countOfMode(true);
|
||||
}
|
||||
@ -1287,17 +1239,39 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* @param o element to be removed from this queue, if present
|
||||
* @return {@code true} if this queue changed as a result of the call
|
||||
*/
|
||||
@Override
|
||||
public boolean remove(Object o) {
|
||||
return findAndRemove(o);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if this queue contains the specified element.
|
||||
* More formally, returns {@code true} if and only if this queue contains
|
||||
* at least one element {@code e} such that {@code o.equals(e)}.
|
||||
*
|
||||
* @param o object to be checked for containment in this queue
|
||||
* @return {@code true} if this queue contains the specified element
|
||||
*/
|
||||
public boolean contains(Object o) {
|
||||
if (o == null) return false;
|
||||
for (Node p = head; p != null; p = succ(p)) {
|
||||
Object item = p.item;
|
||||
if (p.isData) {
|
||||
if (item != null && item != p && o.equals(item))
|
||||
return true;
|
||||
}
|
||||
else if (item == null)
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Always returns {@code Integer.MAX_VALUE} because a
|
||||
* {@code LinkedTransferQueue} is not capacity constrained.
|
||||
*
|
||||
* @return {@code Integer.MAX_VALUE} (as specified by
|
||||
* {@link BlockingQueue#remainingCapacity()})
|
||||
* {@link java.util.concurrent.BlockingQueue#remainingCapacity()
|
||||
* BlockingQueue.remainingCapacity})
|
||||
*/
|
||||
public int remainingCapacity() {
|
||||
return Integer.MAX_VALUE;
|
||||
@ -1313,9 +1287,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
private void writeObject(java.io.ObjectOutputStream s)
|
||||
throws java.io.IOException {
|
||||
s.defaultWriteObject();
|
||||
for (E e : this) {
|
||||
for (E e : this)
|
||||
s.writeObject(e);
|
||||
}
|
||||
// Use trailing null as sentinel
|
||||
s.writeObject(null);
|
||||
}
|
||||
@ -1330,23 +1303,62 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
throws java.io.IOException, ClassNotFoundException {
|
||||
s.defaultReadObject();
|
||||
for (;;) {
|
||||
@SuppressWarnings("unchecked") E item = (E) s.readObject();
|
||||
if (item == null) {
|
||||
@SuppressWarnings("unchecked")
|
||||
E item = (E) s.readObject();
|
||||
if (item == null)
|
||||
break;
|
||||
} else {
|
||||
else
|
||||
offer(item);
|
||||
}
|
||||
}
|
||||
|
||||
// Unsafe mechanics
|
||||
|
||||
private static final sun.misc.Unsafe UNSAFE;
|
||||
private static final long headOffset;
|
||||
private static final long tailOffset;
|
||||
private static final long sweepVotesOffset;
|
||||
static {
|
||||
try {
|
||||
UNSAFE = getUnsafe();
|
||||
Class<?> k = LinkedTransferQueue.class;
|
||||
headOffset = UNSAFE.objectFieldOffset
|
||||
(k.getDeclaredField("head"));
|
||||
tailOffset = UNSAFE.objectFieldOffset
|
||||
(k.getDeclaredField("tail"));
|
||||
sweepVotesOffset = UNSAFE.objectFieldOffset
|
||||
(k.getDeclaredField("sweepVotes"));
|
||||
} catch (Exception e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
|
||||
* Replace with a simple call to Unsafe.getUnsafe when integrating
|
||||
* into a jdk.
|
||||
*
|
||||
* @return a sun.misc.Unsafe
|
||||
*/
|
||||
static sun.misc.Unsafe getUnsafe() {
|
||||
try {
|
||||
return sun.misc.Unsafe.getUnsafe();
|
||||
} catch (SecurityException se) {
|
||||
try {
|
||||
return java.security.AccessController.doPrivileged
|
||||
(new java.security
|
||||
.PrivilegedExceptionAction<sun.misc.Unsafe>() {
|
||||
public sun.misc.Unsafe run() throws Exception {
|
||||
java.lang.reflect.Field f = sun.misc
|
||||
.Unsafe.class.getDeclaredField("theUnsafe");
|
||||
f.setAccessible(true);
|
||||
return (sun.misc.Unsafe) f.get(null);
|
||||
}});
|
||||
} catch (java.security.PrivilegedActionException e) {
|
||||
throw new RuntimeException("Could not initialize intrinsics",
|
||||
e.getCause());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private static final AtomicReferenceFieldUpdater<LinkedTransferQueue, Node> headUpdater =
|
||||
AtomicFieldUpdaterUtil.newRefUpdater(LinkedTransferQueue.class, Node.class, "head");
|
||||
@SuppressWarnings("rawtypes")
|
||||
private static final AtomicReferenceFieldUpdater<LinkedTransferQueue, Node> tailUpdater =
|
||||
AtomicFieldUpdaterUtil.newRefUpdater(LinkedTransferQueue.class, Node.class, "tail");
|
||||
@SuppressWarnings("rawtypes")
|
||||
private static final AtomicIntegerFieldUpdater<LinkedTransferQueue> sweepVotesUpdater =
|
||||
AtomicFieldUpdaterUtil.newIntUpdater(LinkedTransferQueue.class, "sweepVotes");
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user