Replace usage of QueueFactory with ConcurrentLinkedQueue and LinkedBlockingQueue. See #477
This commit is contained in:
parent
5189bb1f3f
commit
26b513236f
@ -20,6 +20,7 @@ import static org.jboss.netty.channel.Channels.*;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.NotYetConnectedException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.jboss.netty.channel.AbstractChannel;
|
||||
@ -32,7 +33,6 @@ import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelSink;
|
||||
import org.jboss.netty.channel.DefaultChannelConfig;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
import org.jboss.netty.util.internal.ThreadLocalBoolean;
|
||||
|
||||
/**
|
||||
@ -49,7 +49,7 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel
|
||||
private final ChannelConfig config;
|
||||
private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
|
||||
|
||||
final Queue<MessageEvent> writeBuffer = QueueFactory.createQueue(MessageEvent.class);
|
||||
final Queue<MessageEvent> writeBuffer = new ConcurrentLinkedQueue<MessageEvent>();
|
||||
|
||||
volatile DefaultLocalChannel pairedChannel;
|
||||
volatile LocalAddress localAddress;
|
||||
|
@ -23,8 +23,7 @@ import java.nio.channels.WritableByteChannel;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@ -36,7 +35,6 @@ import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelSink;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
import org.jboss.netty.util.internal.ThreadLocalBoolean;
|
||||
|
||||
abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
|
||||
@ -204,13 +202,13 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
|
||||
|
||||
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
|
||||
|
||||
private final class WriteRequestQueue implements BlockingQueue<MessageEvent> {
|
||||
private final class WriteRequestQueue implements Queue<MessageEvent> {
|
||||
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
|
||||
|
||||
private final BlockingQueue<MessageEvent> queue;
|
||||
private final Queue<MessageEvent> queue;
|
||||
|
||||
public WriteRequestQueue() {
|
||||
this.queue = QueueFactory.createQueue(MessageEvent.class);
|
||||
this.queue = new ConcurrentLinkedQueue<MessageEvent>();
|
||||
}
|
||||
|
||||
public MessageEvent remove() {
|
||||
@ -269,26 +267,6 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
|
||||
return queue.add(e);
|
||||
}
|
||||
|
||||
public void put(MessageEvent e) throws InterruptedException {
|
||||
queue.put(e);
|
||||
}
|
||||
|
||||
public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException {
|
||||
return queue.offer(e, timeout, unit);
|
||||
}
|
||||
|
||||
public MessageEvent take() throws InterruptedException {
|
||||
return queue.take();
|
||||
}
|
||||
|
||||
public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
return queue.poll(timeout, unit);
|
||||
}
|
||||
|
||||
public int remainingCapacity() {
|
||||
return queue.remainingCapacity();
|
||||
}
|
||||
|
||||
public boolean remove(Object o) {
|
||||
return queue.remove(o);
|
||||
}
|
||||
@ -297,14 +275,6 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
|
||||
return queue.contains(o);
|
||||
}
|
||||
|
||||
public int drainTo(Collection<? super MessageEvent> c) {
|
||||
return queue.drainTo(c);
|
||||
}
|
||||
|
||||
public int drainTo(Collection<? super MessageEvent> c, int maxElements) {
|
||||
return queue.drainTo(c, maxElements);
|
||||
}
|
||||
|
||||
public boolean offer(MessageEvent e) {
|
||||
boolean success = queue.offer(e);
|
||||
assert success;
|
||||
|
@ -28,6 +28,7 @@ import java.nio.channels.WritableByteChannel;
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ -45,7 +46,6 @@ import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
import org.jboss.netty.util.internal.DeadLockProofWorker;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
abstract class AbstractNioWorker implements Worker {
|
||||
|
||||
@ -108,14 +108,14 @@ abstract class AbstractNioWorker implements Worker {
|
||||
/**
|
||||
* Queue of channel registration tasks.
|
||||
*/
|
||||
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
|
||||
private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
|
||||
|
||||
/**
|
||||
* Queue of WriteTasks
|
||||
*/
|
||||
protected final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
|
||||
protected final Queue<Runnable> writeTaskQueue = new ConcurrentLinkedQueue<Runnable>();
|
||||
|
||||
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
|
||||
private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
|
||||
|
||||
|
||||
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
|
||||
|
@ -26,6 +26,7 @@ import java.nio.channels.Selector;
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ -43,7 +44,6 @@ import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
import org.jboss.netty.util.internal.DeadLockProofWorker;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||
|
||||
@ -174,7 +174,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||
private boolean started;
|
||||
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
||||
private final Object startStopLock = new Object();
|
||||
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
|
||||
private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
|
||||
private final int subId;
|
||||
|
||||
Boss(int subId) {
|
||||
|
@ -19,12 +19,12 @@ import static org.jboss.netty.channel.Channels.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.socket.Worker;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Abstract base class for Oio-Worker implementations
|
||||
@ -33,7 +33,7 @@ import org.jboss.netty.util.internal.QueueFactory;
|
||||
*/
|
||||
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
|
||||
|
||||
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
|
||||
private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
|
||||
|
||||
protected final C channel;
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
package org.jboss.netty.handler.codec.http;
|
||||
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
@ -26,7 +27,6 @@ import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||
import org.jboss.netty.handler.codec.PrematureChannelClosureException;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder}
|
||||
@ -49,7 +49,7 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
|
||||
ChannelDownstreamHandler {
|
||||
|
||||
/** A queue that is used for correlating a request and a response. */
|
||||
final Queue<HttpMethod> queue = QueueFactory.createQueue(HttpMethod.class);
|
||||
final Queue<HttpMethod> queue = new ConcurrentLinkedQueue<HttpMethod>();
|
||||
|
||||
/** If true, decoding stops (i.e. pass-through) */
|
||||
volatile boolean done;
|
||||
|
@ -16,6 +16,7 @@
|
||||
package org.jboss.netty.handler.codec.http;
|
||||
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
@ -24,7 +25,6 @@ import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelHandler;
|
||||
import org.jboss.netty.handler.codec.embedder.EncoderEmbedder;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Encodes the content of the outbound {@link HttpResponse} and {@link HttpChunk}.
|
||||
@ -49,7 +49,7 @@ import org.jboss.netty.util.internal.QueueFactory;
|
||||
*/
|
||||
public abstract class HttpContentEncoder extends SimpleChannelHandler {
|
||||
|
||||
private final Queue<String> acceptEncodingQueue = QueueFactory.createQueue(String.class);
|
||||
private final Queue<String> acceptEncodingQueue = new ConcurrentLinkedQueue<String>();
|
||||
private volatile EncoderEmbedder<ChannelBuffer> encoder;
|
||||
|
||||
/**
|
||||
|
@ -23,6 +23,7 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
@ -45,7 +46,6 @@ import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.DefaultObjectSizeEstimator;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
|
||||
|
||||
/**
|
||||
@ -227,7 +227,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
ThreadFactory threadFactory) {
|
||||
|
||||
super(corePoolSize, corePoolSize, keepAliveTime, unit,
|
||||
QueueFactory.createQueue(Runnable.class), threadFactory, new NewThreadRunsPolicy());
|
||||
new LinkedBlockingQueue<Runnable>(), threadFactory, new NewThreadRunsPolicy());
|
||||
|
||||
if (objectSizeEstimator == null) {
|
||||
throw new NullPointerException("objectSizeEstimator");
|
||||
|
@ -19,6 +19,7 @@ import java.util.IdentityHashMap;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
@ -31,7 +32,6 @@ import org.jboss.netty.channel.ChannelState;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
|
||||
@ -280,7 +280,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
|
||||
}
|
||||
|
||||
protected final class ChildExecutor implements Executor, Runnable {
|
||||
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
|
||||
private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean();
|
||||
|
||||
public void execute(Runnable command) {
|
||||
|
@ -17,6 +17,7 @@ package org.jboss.netty.handler.queue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
@ -29,7 +30,6 @@ import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.util.internal.DeadLockProofWorker;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Emulates blocking read operation. This handler stores all received messages
|
||||
@ -77,11 +77,10 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
|
||||
private volatile boolean closed;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the default unbounded {@link BlockingQueue}
|
||||
* implementation.
|
||||
* Creates a new instance with {@link LinkedBlockingQueue}
|
||||
*/
|
||||
public BlockingReadHandler() {
|
||||
this(QueueFactory.createQueue(ChannelEvent.class));
|
||||
this(new LinkedBlockingQueue<ChannelEvent>());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -21,6 +21,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
@ -36,7 +37,6 @@ import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelHandler;
|
||||
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
|
||||
import org.jboss.netty.util.HashedWheelTimer;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Emulates buffered write operation. This handler stores all write requests
|
||||
@ -185,15 +185,14 @@ public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCy
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance with the default unbounded {@link BlockingQueue}
|
||||
* implementation.
|
||||
* Creates a new instance with {@link ConcurrentLinkedQueue}
|
||||
*
|
||||
* @param consolidateOnFlush
|
||||
* {@code true} if and only if the buffered write requests are merged
|
||||
* into a single write request on {@link #flush()}
|
||||
*/
|
||||
public BufferedWriteHandler(boolean consolidateOnFlush) {
|
||||
this(QueueFactory.createQueue(MessageEvent.class), consolidateOnFlush);
|
||||
this(new ConcurrentLinkedQueue<MessageEvent>(), consolidateOnFlush);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -24,6 +24,7 @@ import java.nio.channels.DatagramChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.regex.Pattern;
|
||||
@ -54,7 +55,6 @@ import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.internal.DetectionUtil;
|
||||
import org.jboss.netty.util.internal.NonReentrantLock;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
|
||||
@ -199,7 +199,7 @@ public class SslHandler extends FrameDecoder
|
||||
int ignoreClosedChannelException;
|
||||
final Object ignoreClosedChannelExceptionLock = new Object();
|
||||
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
|
||||
private final Queue<MessageEvent> pendingEncryptedWrites = QueueFactory.createQueue(MessageEvent.class);
|
||||
private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
|
||||
private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
|
||||
|
||||
private volatile boolean issueHandshake;
|
||||
|
@ -20,6 +20,7 @@ import static org.jboss.netty.channel.Channels.*;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
@ -38,7 +39,6 @@ import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.internal.QueueFactory;
|
||||
|
||||
/**
|
||||
* A {@link ChannelHandler} that adds support for writing a large data stream
|
||||
@ -79,7 +79,7 @@ public class ChunkedWriteHandler
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
|
||||
|
||||
private final Queue<MessageEvent> queue = QueueFactory.createQueue(MessageEvent.class);
|
||||
private final Queue<MessageEvent> queue = new ConcurrentLinkedQueue<MessageEvent>();
|
||||
|
||||
private volatile ChannelHandlerContext ctx;
|
||||
private final AtomicBoolean flush = new AtomicBoolean(false);
|
||||
|
@ -24,6 +24,7 @@ import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
* This factory should be used to create the "optimal" {@link BlockingQueue}
|
||||
* instance for the running JVM.
|
||||
*/
|
||||
@Deprecated
|
||||
public final class QueueFactory {
|
||||
|
||||
private static final boolean useUnsafe = DetectionUtil.hasUnsafe();
|
||||
|
Loading…
Reference in New Issue
Block a user